pom.xml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/zhitan/engine/influxdb/InfluxdbRepository.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/zhitan/engine/scheduler/DataCleaningScheduler.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/zhitan/engine/service/DataCleaningService.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/zhitan/engine/service/impl/DataCleaningServiceImpl.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/resources/application.yml | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
pom.xml
@@ -120,6 +120,14 @@ </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>9</source> <target>9</target> </configuration> </plugin> </plugins> <finalName>${project.artifactId}</finalName> </build> src/main/java/com/zhitan/engine/influxdb/InfluxdbRepository.java
@@ -7,6 +7,7 @@ import com.influxdb.client.WriteApiBlocking; import com.influxdb.client.domain.HealthCheck; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.zhitan.engine.enums.CollectionModes; import com.zhitan.engine.enums.GroupTimeType; import com.zhitan.engine.enums.Quality; @@ -76,6 +77,16 @@ write.writeMeasurements(WritePrecision.MS, writePoints); } /** * 写入单个点位 */ public void writePoint(Point point) { if (null == point) { return; } WriteApiBlocking writeApi = client.getWriteApiBlocking(); writeApi.writePoint(point); } public TagValue query(String tagCode, Date time) { List<TagValue> values = query(Collections.singletonList(tagCode), time); return !values.isEmpty() ? values.get(0) : new TagValue(); @@ -303,6 +314,9 @@ config.getMeasurement(), field, startTime, endTime); return 0.0; } if (field.contains("total_A")) { System.out.println(values); } // 计算累计值(最后一个值减去第一个值) Object firstValue = values.get(0); @@ -323,4 +337,32 @@ return 0.0; } } public double getLastPoint(String measurement, String tag, String s) { if (client == null || !config.isEnable()) { log.warn("InfluxDB client is not initialized or disabled."); return 0; } String query = String.format("from(bucket: \"%s\") " + "|> range(start: -1h) " + "|> filter(fn: (r) => r._measurement == \"%s\" and r._field == \"value\") " + "|> filter(fn: (r) => r.tag == \"%s\") " + "|> last()", config.getBucket(), measurement, s); List<FluxTable> tables = client.getQueryApi().query(query, config.getOrg()); if (tables != null && !tables.isEmpty()) { List<FluxRecord> records = tables.get(0).getRecords(); if (records != null && !records.isEmpty()) { FluxRecord record = records.get(0); double value = (double) record.getValue(); return value; } } log.warn("No data found for measurement: {}, tag: {}, field: {}", measurement, tag, s); return 0; } } src/main/java/com/zhitan/engine/scheduler/DataCleaningScheduler.java
@@ -98,4 +98,42 @@ log.error("手动触发{}统计任务执行失败:{}", timeType, e.getMessage(), e); } } @Scheduled(cron = "0 * * * * ?") public void totalElectricityTask() { try { LocalDateTime now = LocalDateTime.now(); log.info("开始执行全厂总电量统计任务,处理时间:{}", now); dataCleaningService.calculateTotalElectricity(now); log.info("全厂总电量统计任务执行完成"); } catch (Exception e) { log.error("全厂总电量统计任务执行失败:{}", e.getMessage(), e); } } /** * 每分钟执行一次,统计当前小时、当天、当月和当年的用电量 */ @Scheduled(cron = "0 * * * * ?") public void periodicElectricityStatisticsTask() { try { LocalDateTime now = LocalDateTime.now(); log.info("开始执行周期性用电量统计任务,处理时间:{}", now); // 统计当前小时用电量 dataCleaningService.calculateHourlyElectricity(now); // 统计当天用电量 dataCleaningService.calculateDailyElectricity(now); // 统计当月用电量 dataCleaningService.calculateMonthlyElectricity(now); // 统计当年用电量 dataCleaningService.calculateYearlyElectricity(now); log.info("周期性用电量统计任务执行完成"); } catch (Exception e) { log.error("周期性用电量统计任务执行失败:{}", e.getMessage(), e); } } } src/main/java/com/zhitan/engine/service/DataCleaningService.java
@@ -51,4 +51,11 @@ * @param dateTime 统计时间点 */ void calculatePeakValleyElectricity(String indexId, LocalDateTime dateTime); /** * 全厂总电量统计 * @param dateTime 统计时间点 */ void calculateTotalElectricity(LocalDateTime dateTime); } src/main/java/com/zhitan/engine/service/impl/DataCleaningServiceImpl.java
@@ -1,5 +1,8 @@ package com.zhitan.engine.service.impl; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.zhitan.engine.config.InfluxDBConfig; import com.zhitan.engine.influxdb.InfluxdbRepository; import com.zhitan.engine.entity.*; import com.zhitan.engine.repository.*; @@ -20,13 +23,18 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; import static java.util.List.*; /** * 数据清洗服务实现类 */ @Slf4j @Service public class DataCleaningServiceImpl implements DataCleaningService { private final String TAG = "tag"; private final String FIELD_VALUE = "value"; @Autowired private InfluxDBConfig influxDBConfig; @Autowired private InfluxdbRepository influxdbRepository; @@ -164,6 +172,97 @@ } @Override public void calculateTotalElectricity(LocalDateTime dateTime) { log.info("开始执行全厂总电量统计,时间点:{}", dateTime); // 定义需要统计的电表网关列表 List<String> gatewayCodes = new java.util.ArrayList<>(List.of("00601925032100028432", "00601925032100026840", "00601925032100028685", "00601925032100027392", "00601925032100028050", "00601925032100028751")); gatewayCodes.add("00601925032100026275"); gatewayCodes.add("00601925032100028117"); gatewayCodes.add("00601925032100028814"); gatewayCodes.add("00601925032100028990"); gatewayCodes.add("00601925032100031307"); // 定义需要统计的参数类型及其统计方式 Map<String, String> parameterTypes = new java.util.HashMap<>(Map.of( "ActiveZT", "SUM" // 正向总有功电能 - 求和 , "ActiveZN", "SUM" // 正向总无功电能 - 求和 , "ActiveElectricity", "SUM" // 有功电能 - 求和 , "NoActiveElectricity", "SUM" // 无功功率 - 求和 , "ExpZN", "SUM" // 反向总无功电能 - 求和 , "Exp", "SUM" // 反向有功电能 - 求和 , "ActivePow", "SUM" // 正向有功功率 - 求和 , "VoltageUAB", "AVG" // 总电压 - 平均值 , "VoltageUBC", "AVG" // 总电压 - 平均值 , "VoltageUCA", "AVG" // 总电压 - 平均值 // ,"CurrentA", "SUM" // 总电流 - 求和 // ,"CurrentB", "SUM" // 总电流 - 求和 // ,"CurrentC", "SUM" // 总电流 - 求和 // ,"Io", "SUM" // 总电流 - 求和 // ,"VoltageA", "AVG" // 总电压 - 平均值 // ,"VoltageB", "AVG" // 总电压 - 平均值 // ,"VoltageC", "AVG" )); parameterTypes.put("CurrentA", "SUM"); parameterTypes.put("CurrentB", "SUM"); parameterTypes.put("CurrentC", "SUM"); parameterTypes.put("Io", "SUM"); parameterTypes.put("VoltageA", "AVG"); parameterTypes.put("VoltageB", "AVG"); parameterTypes.put("VoltageC", "AVG"); // 计算时间范围(当前分钟) LocalDateTime startTime = dateTime.withSecond(0).withNano(0); LocalDateTime endTime = startTime.plusMinutes(1); // 遍历所有参数类型进行统计 parameterTypes.forEach((paramType, calcType) -> { try { double totalValue = 0; int meterCount = 0; // 遍历所有网关下的电表 for (String gatewayCode : gatewayCodes) { // 遍历每个网关下的8个电表 for (int i = 1; i <= 8; i++) { // 构建tag名称格式:网关编号_电表序号_参数类型 // 示例:00601925032100028432_1_ActiveZT String tag = String.format("%s_%d_%s", gatewayCode, i, paramType); // 查询最后一次保存数据 Double value = influxdbRepository.getLastPoint(influxDBConfig.getMeasurement(),TAG,tag); if (value != null) { totalValue += value ; meterCount++; } } } // 计算最终值 double finalValue = "SUM".equals(calcType) ? totalValue : (meterCount > 0 ? totalValue / meterCount : 0); // 保存总表数据 String totalTag = "total_" + paramType; Point point = Point .measurement(influxDBConfig.getMeasurement()) .addTag(TAG, totalTag) .addField(FIELD_VALUE, finalValue) .time(Instant.now(), WritePrecision.S); influxdbRepository.writePoint(point); log.info("统计完成:{}={} ({}个电表)", totalTag, finalValue, meterCount); } catch (Exception e) { log.error("统计参数{}失败:{}", paramType, e.getMessage(), e); } }); log.info("全厂总电量统计完成,时间点:{}", dateTime); } @Override @Transactional public void calculatePeakValleyElectricity(String indexId, LocalDateTime dateTime) { log.info("开始计算尖峰平谷用电量,索引ID:{},时间点:{}", indexId, dateTime); src/main/resources/application.yml
@@ -5,7 +5,7 @@ # 数据库配置 datasource: url: jdbc:postgresql://localhost:5432/postgres url: jdbc:postgresql://192.168.0.24:5432/postgres username: postgres password: 123456 driver-class-name: org.postgresql.Driver @@ -22,19 +22,19 @@ # InfluxDB配置 influxdb: host: "http://localhost:8086" host: "http://192.168.0.24:8086" #修改为自己的时序库访问org org: "lanbao" #修改为自己的时序库bucket bucket: "nygl" #修改为自己的时序库访问token token: "AminQagYp5rjb09mFPYvriK0T0vlF-zmwboqtUzdcq3nkXNuhnEpMuG_Ht5vtfWC4xBIVOThvoxy5reTer9XcQ==" token: "i8WwVZz3RvkEVF3qGaY8uIDXTFEe2PzjgrKebDzcxlYGKnR-kOK5Hf1S5G4z3p-lc9UO7MQS4qKGL4lIeHSw1A==" measurement: data enable: true # 服务器配置 server: port: 8082 port: 8090 servlet: context-path: /