baoshiwei
2025-06-18 318ebac926b9627a683c4ab90d4e2b7451b1e573
feat(engine): 新增全厂总电量统计功能

- 在 DataCleaningService 接口中添加 calculateTotalElectricity 方法
- 在 DataCleaningServiceImpl 类中实现该方法,统计全厂总电量
- 在 DataCleaningScheduler 类中添加定时任务,定期执行总电量统计
- 更新 InfluxdbRepository 接口,增加写入单个点位和查询最后一点数据的方法
已修改6个文件
204 ■■■■■ 文件已修改
pom.xml 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/engine/influxdb/InfluxdbRepository.java 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/engine/scheduler/DataCleaningScheduler.java 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/engine/service/DataCleaningService.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/zhitan/engine/service/impl/DataCleaningServiceImpl.java 101 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/application.yml 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pom.xml
@@ -120,6 +120,14 @@
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>9</source>
                    <target>9</target>
                </configuration>
            </plugin>
        </plugins>
        <finalName>${project.artifactId}</finalName>
    </build>
src/main/java/com/zhitan/engine/influxdb/InfluxdbRepository.java
@@ -7,6 +7,7 @@
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.HealthCheck;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.zhitan.engine.enums.CollectionModes;
import com.zhitan.engine.enums.GroupTimeType;
import com.zhitan.engine.enums.Quality;
@@ -76,6 +77,16 @@
        write.writeMeasurements(WritePrecision.MS, writePoints);
    }
    /**
     * 写入单个点位
     */
    public void writePoint(Point point) {
        if (null == point) {
            return;
        }
        WriteApiBlocking writeApi = client.getWriteApiBlocking();
        writeApi.writePoint(point);
    }
    public TagValue query(String tagCode, Date time) {
        List<TagValue> values = query(Collections.singletonList(tagCode), time);
        return !values.isEmpty() ? values.get(0) : new TagValue();
@@ -303,6 +314,9 @@
                        config.getMeasurement(), field, startTime, endTime);
                return 0.0;
            }
            if (field.contains("total_A")) {
                System.out.println(values);
            }
            // 计算累计值(最后一个值减去第一个值)
            Object firstValue = values.get(0);
@@ -323,4 +337,32 @@
            return 0.0;
        }
    }
    public double getLastPoint(String measurement, String tag, String s) {
        if (client == null || !config.isEnable()) {
            log.warn("InfluxDB client is not initialized or disabled.");
            return 0;
        }
        String query = String.format("from(bucket: \"%s\") " +
                "|> range(start: -1h) " +
                "|> filter(fn: (r) => r._measurement == \"%s\" and r._field == \"value\") " +
                "|> filter(fn: (r) => r.tag == \"%s\") " +
                "|> last()", config.getBucket(), measurement, s);
        List<FluxTable> tables = client.getQueryApi().query(query, config.getOrg());
        if (tables != null && !tables.isEmpty()) {
            List<FluxRecord> records = tables.get(0).getRecords();
            if (records != null && !records.isEmpty()) {
                FluxRecord record = records.get(0);
                double value = (double) record.getValue();
                return value;
            }
        }
        log.warn("No data found for measurement: {}, tag: {}, field: {}", measurement, tag, s);
        return 0;
    }
}
src/main/java/com/zhitan/engine/scheduler/DataCleaningScheduler.java
@@ -98,4 +98,42 @@
            log.error("手动触发{}统计任务执行失败:{}", timeType, e.getMessage(), e);
        }
    }
    @Scheduled(cron = "0 * * * * ?")
    public void totalElectricityTask() {
        try {
            LocalDateTime now = LocalDateTime.now();
            log.info("开始执行全厂总电量统计任务,处理时间:{}", now);
            dataCleaningService.calculateTotalElectricity(now);
            log.info("全厂总电量统计任务执行完成");
        } catch (Exception e) {
            log.error("全厂总电量统计任务执行失败:{}", e.getMessage(), e);
        }
    }
    /**
     * 每分钟执行一次,统计当前小时、当天、当月和当年的用电量
     */
    @Scheduled(cron = "0 * * * * ?")
    public void periodicElectricityStatisticsTask() {
        try {
            LocalDateTime now = LocalDateTime.now();
            log.info("开始执行周期性用电量统计任务,处理时间:{}", now);
            // 统计当前小时用电量
            dataCleaningService.calculateHourlyElectricity(now);
            // 统计当天用电量
            dataCleaningService.calculateDailyElectricity(now);
            // 统计当月用电量
            dataCleaningService.calculateMonthlyElectricity(now);
            // 统计当年用电量
            dataCleaningService.calculateYearlyElectricity(now);
            log.info("周期性用电量统计任务执行完成");
        } catch (Exception e) {
            log.error("周期性用电量统计任务执行失败:{}", e.getMessage(), e);
        }
    }
}
src/main/java/com/zhitan/engine/service/DataCleaningService.java
@@ -51,4 +51,11 @@
     * @param dateTime 统计时间点
     */
    void calculatePeakValleyElectricity(String indexId, LocalDateTime dateTime);
    /**
     * 全厂总电量统计
     * @param dateTime 统计时间点
     */
    void calculateTotalElectricity(LocalDateTime dateTime);
}
src/main/java/com/zhitan/engine/service/impl/DataCleaningServiceImpl.java
@@ -1,5 +1,8 @@
package com.zhitan.engine.service.impl;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.zhitan.engine.config.InfluxDBConfig;
import com.zhitan.engine.influxdb.InfluxdbRepository;
import com.zhitan.engine.entity.*;
import com.zhitan.engine.repository.*;
@@ -20,13 +23,18 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.List.*;
/**
 * 数据清洗服务实现类
 */
@Slf4j
@Service
public class DataCleaningServiceImpl implements DataCleaningService {
    private final String TAG = "tag";
    private final String FIELD_VALUE = "value";
    @Autowired
    private InfluxDBConfig influxDBConfig;
    @Autowired
    private InfluxdbRepository influxdbRepository;
@@ -164,6 +172,97 @@
    }
    @Override
    public void calculateTotalElectricity(LocalDateTime dateTime) {
        log.info("开始执行全厂总电量统计,时间点:{}", dateTime);
        // 定义需要统计的电表网关列表
        List<String> gatewayCodes = new java.util.ArrayList<>(List.of("00601925032100028432", "00601925032100026840", "00601925032100028685", "00601925032100027392", "00601925032100028050", "00601925032100028751"));
        gatewayCodes.add("00601925032100026275");
        gatewayCodes.add("00601925032100028117");
        gatewayCodes.add("00601925032100028814");
        gatewayCodes.add("00601925032100028990");
        gatewayCodes.add("00601925032100031307");
        // 定义需要统计的参数类型及其统计方式
        Map<String, String> parameterTypes = new java.util.HashMap<>(Map.of(
                "ActiveZT", "SUM" // 正向总有功电能 - 求和
                , "ActiveZN", "SUM" // 正向总无功电能 - 求和
                , "ActiveElectricity", "SUM" // 有功电能 - 求和
                , "NoActiveElectricity", "SUM" // 无功功率 - 求和
                , "ExpZN", "SUM" // 反向总无功电能 - 求和
                , "Exp", "SUM" // 反向有功电能 - 求和
                , "ActivePow", "SUM" // 正向有功功率 - 求和
                , "VoltageUAB", "AVG" // 总电压 - 平均值
                , "VoltageUBC", "AVG" // 总电压 - 平均值
                , "VoltageUCA", "AVG" // 总电压 - 平均值
//            ,"CurrentA", "SUM" // 总电流 - 求和
//            ,"CurrentB", "SUM" // 总电流 - 求和
//            ,"CurrentC", "SUM" // 总电流 - 求和
//            ,"Io", "SUM" // 总电流 - 求和
//            ,"VoltageA", "AVG" // 总电压 - 平均值
//            ,"VoltageB", "AVG" // 总电压 - 平均值
//            ,"VoltageC", "AVG"
        ));
        parameterTypes.put("CurrentA", "SUM");
        parameterTypes.put("CurrentB", "SUM");
        parameterTypes.put("CurrentC", "SUM");
        parameterTypes.put("Io", "SUM");
        parameterTypes.put("VoltageA", "AVG");
        parameterTypes.put("VoltageB", "AVG");
        parameterTypes.put("VoltageC", "AVG");
        // 计算时间范围(当前分钟)
        LocalDateTime startTime = dateTime.withSecond(0).withNano(0);
        LocalDateTime endTime = startTime.plusMinutes(1);
        // 遍历所有参数类型进行统计
        parameterTypes.forEach((paramType, calcType) -> {
            try {
                double totalValue = 0;
                int meterCount = 0;
                // 遍历所有网关下的电表
                for (String gatewayCode : gatewayCodes) {
                    // 遍历每个网关下的8个电表
                    for (int i = 1; i <= 8; i++) {
                        // 构建tag名称格式:网关编号_电表序号_参数类型
                        // 示例:00601925032100028432_1_ActiveZT
                        String tag = String.format("%s_%d_%s", gatewayCode, i, paramType);
                        // 查询最后一次保存数据
                        Double value = influxdbRepository.getLastPoint(influxDBConfig.getMeasurement(),TAG,tag);
                        if (value != null) {
                            totalValue +=  value ;
                            meterCount++;
                        }
                    }
                }
                // 计算最终值
                double finalValue = "SUM".equals(calcType) ? totalValue : (meterCount > 0 ? totalValue / meterCount : 0);
                // 保存总表数据
                String totalTag = "total_" + paramType;
                Point point = Point
                        .measurement(influxDBConfig.getMeasurement())
                        .addTag(TAG, totalTag)
                        .addField(FIELD_VALUE, finalValue)
                        .time(Instant.now(), WritePrecision.S);
                influxdbRepository.writePoint(point);
                log.info("统计完成:{}={} ({}个电表)", totalTag, finalValue, meterCount);
            } catch (Exception e) {
                log.error("统计参数{}失败:{}", paramType, e.getMessage(), e);
            }
        });
        log.info("全厂总电量统计完成,时间点:{}", dateTime);
    }
    @Override
    @Transactional
    public void calculatePeakValleyElectricity(String indexId, LocalDateTime dateTime) {
        log.info("开始计算尖峰平谷用电量,索引ID:{},时间点:{}", indexId, dateTime);
src/main/resources/application.yml
@@ -5,7 +5,7 @@
  
  # 数据库配置
  datasource:
    url: jdbc:postgresql://localhost:5432/postgres
    url: jdbc:postgresql://192.168.0.24:5432/postgres
    username: postgres
    password: 123456
    driver-class-name: org.postgresql.Driver
@@ -22,19 +22,19 @@
# InfluxDB配置
influxdb:
  host: "http://localhost:8086"
  host: "http://192.168.0.24:8086"
  #修改为自己的时序库访问org
  org: "lanbao"
  #修改为自己的时序库bucket
  bucket: "nygl"
  #修改为自己的时序库访问token
  token: "AminQagYp5rjb09mFPYvriK0T0vlF-zmwboqtUzdcq3nkXNuhnEpMuG_Ht5vtfWC4xBIVOThvoxy5reTer9XcQ=="
  token: "i8WwVZz3RvkEVF3qGaY8uIDXTFEe2PzjgrKebDzcxlYGKnR-kOK5Hf1S5G4z3p-lc9UO7MQS4qKGL4lIeHSw1A=="
  measurement: data
  enable: true
# 服务器配置
server:
  port: 8082
  port: 8090
  servlet:
    context-path: /