baoshiwei
2025-06-18 318ebac926b9627a683c4ab90d4e2b7451b1e573
src/main/java/com/zhitan/engine/service/impl/DataCleaningServiceImpl.java
@@ -1,5 +1,8 @@
package com.zhitan.engine.service.impl;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.zhitan.engine.config.InfluxDBConfig;
import com.zhitan.engine.influxdb.InfluxdbRepository;
import com.zhitan.engine.entity.*;
import com.zhitan.engine.repository.*;
@@ -20,13 +23,18 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.List.*;
/**
 * 数据清洗服务实现类
 */
@Slf4j
@Service
public class DataCleaningServiceImpl implements DataCleaningService {
    private final String TAG = "tag";
    private final String FIELD_VALUE = "value";
    @Autowired
    private InfluxDBConfig influxDBConfig;
    @Autowired
    private InfluxdbRepository influxdbRepository;
@@ -164,6 +172,97 @@
    }
    @Override
    public void calculateTotalElectricity(LocalDateTime dateTime) {
        log.info("开始执行全厂总电量统计,时间点:{}", dateTime);
        // 定义需要统计的电表网关列表
        List<String> gatewayCodes = new java.util.ArrayList<>(List.of("00601925032100028432", "00601925032100026840", "00601925032100028685", "00601925032100027392", "00601925032100028050", "00601925032100028751"));
        gatewayCodes.add("00601925032100026275");
        gatewayCodes.add("00601925032100028117");
        gatewayCodes.add("00601925032100028814");
        gatewayCodes.add("00601925032100028990");
        gatewayCodes.add("00601925032100031307");
        // 定义需要统计的参数类型及其统计方式
        Map<String, String> parameterTypes = new java.util.HashMap<>(Map.of(
                "ActiveZT", "SUM" // 正向总有功电能 - 求和
                , "ActiveZN", "SUM" // 正向总无功电能 - 求和
                , "ActiveElectricity", "SUM" // 有功电能 - 求和
                , "NoActiveElectricity", "SUM" // 无功功率 - 求和
                , "ExpZN", "SUM" // 反向总无功电能 - 求和
                , "Exp", "SUM" // 反向有功电能 - 求和
                , "ActivePow", "SUM" // 正向有功功率 - 求和
                , "VoltageUAB", "AVG" // 总电压 - 平均值
                , "VoltageUBC", "AVG" // 总电压 - 平均值
                , "VoltageUCA", "AVG" // 总电压 - 平均值
//            ,"CurrentA", "SUM" // 总电流 - 求和
//            ,"CurrentB", "SUM" // 总电流 - 求和
//            ,"CurrentC", "SUM" // 总电流 - 求和
//            ,"Io", "SUM" // 总电流 - 求和
//            ,"VoltageA", "AVG" // 总电压 - 平均值
//            ,"VoltageB", "AVG" // 总电压 - 平均值
//            ,"VoltageC", "AVG"
        ));
        parameterTypes.put("CurrentA", "SUM");
        parameterTypes.put("CurrentB", "SUM");
        parameterTypes.put("CurrentC", "SUM");
        parameterTypes.put("Io", "SUM");
        parameterTypes.put("VoltageA", "AVG");
        parameterTypes.put("VoltageB", "AVG");
        parameterTypes.put("VoltageC", "AVG");
        // 计算时间范围(当前分钟)
        LocalDateTime startTime = dateTime.withSecond(0).withNano(0);
        LocalDateTime endTime = startTime.plusMinutes(1);
        // 遍历所有参数类型进行统计
        parameterTypes.forEach((paramType, calcType) -> {
            try {
                double totalValue = 0;
                int meterCount = 0;
                // 遍历所有网关下的电表
                for (String gatewayCode : gatewayCodes) {
                    // 遍历每个网关下的8个电表
                    for (int i = 1; i <= 8; i++) {
                        // 构建tag名称格式:网关编号_电表序号_参数类型
                        // 示例:00601925032100028432_1_ActiveZT
                        String tag = String.format("%s_%d_%s", gatewayCode, i, paramType);
                        // 查询最后一次保存数据
                        Double value = influxdbRepository.getLastPoint(influxDBConfig.getMeasurement(),TAG,tag);
                        if (value != null) {
                            totalValue +=  value ;
                            meterCount++;
                        }
                    }
                }
                // 计算最终值
                double finalValue = "SUM".equals(calcType) ? totalValue : (meterCount > 0 ? totalValue / meterCount : 0);
                // 保存总表数据
                String totalTag = "total_" + paramType;
                Point point = Point
                        .measurement(influxDBConfig.getMeasurement())
                        .addTag(TAG, totalTag)
                        .addField(FIELD_VALUE, finalValue)
                        .time(Instant.now(), WritePrecision.S);
                influxdbRepository.writePoint(point);
                log.info("统计完成:{}={} ({}个电表)", totalTag, finalValue, meterCount);
            } catch (Exception e) {
                log.error("统计参数{}失败:{}", paramType, e.getMessage(), e);
            }
        });
        log.info("全厂总电量统计完成,时间点:{}", dateTime);
    }
    @Override
    @Transactional
    public void calculatePeakValleyElectricity(String indexId, LocalDateTime dateTime) {
        log.info("开始计算尖峰平谷用电量,索引ID:{},时间点:{}", indexId, dateTime);