baoshiwei
2025-06-21 d9714be7130e14e063e6499637e1cc5241ff9dd3
src/main/java/com/zhitan/engine/service/impl/DataCleaningServiceImpl.java
@@ -1,5 +1,8 @@
package com.zhitan.engine.service.impl;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.zhitan.engine.config.InfluxDBConfig;
import com.zhitan.engine.influxdb.InfluxdbRepository;
import com.zhitan.engine.entity.*;
import com.zhitan.engine.repository.*;
@@ -20,13 +23,18 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.List.*;
/**
 * 数据清洗服务实现类
 */
@Slf4j
@Service
public class DataCleaningServiceImpl implements DataCleaningService {
    private final String TAG = "tag";
    private final String FIELD_VALUE = "value";
    @Autowired
    private InfluxDBConfig influxDBConfig;
    @Autowired
    private InfluxdbRepository influxdbRepository;
@@ -164,6 +172,112 @@
    }
    @Override
    public void calculateTotalElectricity(LocalDateTime dateTime) {
        log.info("开始执行全厂总电量统计,时间点:{}", dateTime);
        // 定义需要统计的电表网关列表
        List<String> oldGatewayList = List.of("00601925032100028432", "00601925032100026840");
        List<String> sevenGatewayList = List.of("00601925032100028685", "00601925032100027392", "00601925032100028050", "00601925032100028751","00601925032100026275","00601925032100028117","00601925032100028814","00601925032100028990");
        List<String> outGatewayList = List.of("00601925032100031307");
        // 定义需要统计的参数类型及其统计方式
        Map<String, String> parameterTypes = new java.util.HashMap<>(Map.of(
                "ActiveZT", "SUM" // 正向总有功电能 - 求和
                , "ActiveZN", "SUM" // 正向总无功电能 - 求和
                , "ActiveElectricity", "SUM" // 有功电能 - 求和
                , "NoActiveElectricity", "SUM" // 无功功率 - 求和
                , "ExpZN", "SUM" // 反向总无功电能 - 求和
                , "Exp", "SUM" // 反向有功电能 - 求和
                , "ActivePow", "SUM" // 正向有功功率 - 求和
                , "VoltageUAB", "AVG" // 总电压 - 平均值
                , "VoltageUBC", "AVG" // 总电压 - 平均值
                , "VoltageUCA", "AVG" // 总电压 - 平均值
        ));
        parameterTypes.put("CurrentA", "SUM");
        parameterTypes.put("CurrentB", "SUM");
        parameterTypes.put("CurrentC", "SUM");
        parameterTypes.put("Io", "SUM");
        parameterTypes.put("VoltageA", "AVG");
        parameterTypes.put("VoltageB", "AVG");
        parameterTypes.put("VoltageC", "AVG");
        // 保存总表数据
        String old = "old_";
        String seven = "seven_";
        String out = "out_";
        String total = "total_";
        // 遍历所有参数类型进行统计
        parameterTypes.forEach((paramType, calcType) -> {
            TotalCount totalCount = new TotalCount();
            statistics(paramType, calcType, oldGatewayList, old, totalCount);
            statistics(paramType, calcType, sevenGatewayList, seven, totalCount);
            statistics(paramType, calcType, outGatewayList, out, totalCount);
            clacAndSave(paramType, calcType,total,  totalCount.totalValue, totalCount.totalCount);
        });
        log.info("全厂总电量统计完成,时间点:{}", dateTime);
    }
    private static final int METER_COUNT_PER_GATEWAY = 8;
    private static class TotalCount {
        public int totalCount;
        public double totalValue;
        public TotalCount() {
            totalCount = 0;
            totalValue = 0;
        }
    }
private void statistics(String paramType, String calcType, List<String> gatewayCodes, String area, TotalCount totalCount) {
    try {
        double sumValue = 0;
        int meterCount = 0;
        // 遍历老楼车间所有网关下的电表
        for (String gatewayCode : gatewayCodes) {
            // 遍历每个网关下的8个电表
            for (int i = 1; i <= METER_COUNT_PER_GATEWAY; i++) {
                // 构建tag名称格式:网关编号_电表序号_参数类型
                // 示例:00601925032100028432_1_ActiveZT
                String tag = String.format("%s_%d_%s", gatewayCode, i, paramType);
                // 查询最后一次保存数据
                Double value = influxdbRepository.getLastPoint(influxDBConfig.getMeasurement(),TAG, tag);
                if (value != null) {
                    sumValue += value;
                    meterCount++;
                }
            }
        }
        totalCount.totalCount += meterCount;
        totalCount.totalValue += sumValue;
        clacAndSave(paramType, calcType, area, sumValue, meterCount);
    } catch (Exception e) {
        log.error("统计参数{}失败:{}", paramType, e.getMessage(), e);
    }
}
    private void clacAndSave(String paramType, String calcType, String area, double sumValue, int meterCount) {
        // 计算最终值
        double finalValue = "SUM".equals(calcType) ? sumValue : (meterCount > 0 ? sumValue / meterCount : 0);
        String sumTag = area + paramType;
        Point point = Point
                .measurement(influxDBConfig.getMeasurement())
                .addTag(TAG, sumTag)
                .addField(FIELD_VALUE, finalValue)
                .time(Instant.now(), WritePrecision.S);
        influxdbRepository.writePoint(point);
        log.info("统计完成:{}={} ({}个电表)", sumTag, finalValue, meterCount);
    }
    @Override
    @Transactional
    public void calculatePeakValleyElectricity(String indexId, LocalDateTime dateTime) {
        log.info("开始计算尖峰平谷用电量,索引ID:{},时间点:{}", indexId, dateTime);