package com.zhitan.engine.service.impl; import com.zhitan.engine.influxdb.InfluxdbRepository; import com.zhitan.engine.entity.*; import com.zhitan.engine.repository.*; import com.zhitan.engine.service.DataCleaningService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StringUtils; import java.math.BigDecimal; import java.time.*; import java.time.format.DateTimeFormatter; import java.util.Date; import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; /** * 数据清洗服务实现类 */ @Slf4j @Service public class DataCleaningServiceImpl implements DataCleaningService { @Autowired private InfluxdbRepository influxdbRepository; @Autowired private IndexStorageRepository indexStorageRepository; @Autowired private DataItemRepository dataItemRepository; @Autowired private ElectricityDataItemRepository electricityDataItemRepository; @Autowired private ElectricityPriceRepository electricityPriceRepository; @Autowired private ElectricityPriceDateRepository electricityPriceDateRepository; private static final Pattern CALC_TEXT_PATTERN = Pattern.compile("accumulate\\('([^']+)'\\)"); @Override public void calculateHourlyElectricity(LocalDateTime dateTime) { calculateElectricityByTimeType("HOUR", dateTime); } @Override public void calculateDailyElectricity(LocalDateTime dateTime) { calculateElectricityByTimeType("DAY", dateTime); } @Override public void calculateMonthlyElectricity(LocalDateTime dateTime) { calculateElectricityByTimeType("MONTH", dateTime); } @Override public void calculateYearlyElectricity(LocalDateTime dateTime) { calculateElectricityByTimeType("YEAR", dateTime); } @Override @Transactional public void calculateElectricityByTimeType(String timeType, LocalDateTime dateTime) { log.info("开始{}统计,时间点:{}", timeType, dateTime); // 获取指定时间类型的统计规则 List indexStorages = indexStorageRepository.findByTimeType(timeType); for (IndexStorage indexStorage : indexStorages) { try { // 解析计算表达式 if ("CALC".equals(indexStorage.getCalcType()) && StringUtils.hasText(indexStorage.getCalcText())) { processCalcRule(indexStorage, dateTime); } // 如果是按小时统计且需要计算尖峰平谷 if ("HOUR".equals(timeType) && indexStorage.getIsPvCalc() != null && indexStorage.getIsPvCalc() == 1) { calculatePeakValleyElectricity(indexStorage.getIndexId(), dateTime); } } catch (Exception e) { log.error("处理索引{}的{}统计失败:{}", indexStorage.getIndexId(), timeType, e.getMessage(), e); } } log.info("完成{}统计,时间点:{}", timeType, dateTime); } /** * 处理计算规则 * * @param indexStorage 索引存储配置 * @param dateTime 统计时间点 */ private void processCalcRule(IndexStorage indexStorage, LocalDateTime dateTime) { String calcText = indexStorage.getCalcText(); String timeType = indexStorage.getTimeType(); // 解析计算表达式:accumulate('00601925032100026643_5_ActiveZT') Matcher matcher = CALC_TEXT_PATTERN.matcher(calcText); if (matcher.find()) { String tag = matcher.group(1); // 计算时间范围 LocalDateTime[] timeRange = calculateTimeRange(timeType, dateTime); LocalDateTime startTime = timeRange[0]; LocalDateTime endTime = timeRange[1]; // 生成时间编码 String timeCode = generateTimeCode(timeType, dateTime); // 从InfluxDB查询数据并计算累计值 Double accumulatedValue = influxdbRepository.calculateAccumulatedValue(tag, startTime, endTime); double finalValue = accumulatedValue; // 保存到数据库 saveDataItem(indexStorage.getIndexId(), timeCode, startTime, endTime, finalValue, timeType); } else { log.warn("无法解析计算表达式:{}", calcText); } } /** * 保存数据项 * * @param indexId 索引ID * @param timeCode 时间编码 * @param beginTime 开始时间 * @param endTime 结束时间 * @param value 值 * @param timeType 时间类型 */ private void saveDataItem(String indexId, String timeCode, LocalDateTime beginTime, LocalDateTime endTime, double value, String timeType) { // 查询是否已存在 DataItem existingItem = dataItemRepository.findByIndexIdAndTimeCode(indexId, timeCode); if (existingItem == null) { existingItem = new DataItem(); existingItem.setIndexId(indexId); existingItem.setTimeCode(timeCode); existingItem.setCreateTime(LocalDateTime.now()); } existingItem.setBeginTime(beginTime); existingItem.setEndTime(endTime); existingItem.setValue(value); existingItem.setQuality("GOOD"); existingItem.setUpdateTime(LocalDateTime.now()); existingItem.setTimeType(timeType); existingItem.setDataTime(beginTime); dataItemRepository.save(existingItem); log.info("保存数据项成功:indexId={}, timeCode={}, value={}", indexId, timeCode, value); } @Override @Transactional public void calculatePeakValleyElectricity(String indexId, LocalDateTime dateTime) { log.info("开始计算尖峰平谷用电量,索引ID:{},时间点:{}", indexId, dateTime); try { // 查询当前生效的电价配置 List priceDates = electricityPriceDateRepository.findEffectiveByDate(dateTime.toLocalDate()); if (priceDates.isEmpty()) { log.warn("未找到{}生效的电价配置", dateTime.toLocalDate()); return; } // 获取电价配置 ElectricityPriceDate priceDate = priceDates.get(0); List prices = electricityPriceRepository.findByParentId(priceDate.getId()); if (prices.isEmpty()) { log.warn("未找到电价配置,parentId={}", priceDate.getId()); return; } // 按电价类型分组,如果存在重复的电价类型,保留最后一个配置 Map priceMap = prices.stream() .collect(Collectors.toMap( ElectricityPrice::getType, price -> price, (existing, replacement) -> replacement )); // 生成时间编码 String timeCode = generateTimeCode("HOUR", dateTime); // 查询该小时的用电量 DataItem dataItem = dataItemRepository.findByIndexIdAndTimeCode(indexId, timeCode); if (dataItem == null || dataItem.getValue() == null) { log.warn("未找到用电量数据,indexId={},timeCode={}", indexId, timeCode); return; } // 确定当前小时属于哪个电价类型 LocalTime currentTime = dateTime.toLocalTime(); String electricityType = determineElectricityType(currentTime, prices); if (electricityType == null) { log.warn("无法确定电价类型,时间:{}", currentTime); return; } // 获取电价 ElectricityPrice price = priceMap.get(electricityType); // 注意:effecticityPrice是实体类中的属性名,虽然拼写看起来不太规范 BigDecimal priceValue = null; try { priceValue = price.getEffecticityPrice(); if (priceValue == null) { priceValue = BigDecimal.ZERO; log.warn("电价值为空,使用默认值0,电价类型:{}", electricityType); } } catch (Exception e) { priceValue = BigDecimal.ZERO; log.error("获取电价失败,使用默认值0,电价类型:{},错误:{}", electricityType, e.getMessage(), e); } // 计算电费 BigDecimal electricity = BigDecimal.valueOf(dataItem.getValue()); BigDecimal cost = electricity.multiply(priceValue); // 保存尖峰平谷数据 saveElectricityDataItem(indexId, timeCode, electricityType, dataItem.getDataTime(), dataItem.getBeginTime(), dataItem.getEndTime(), electricity, cost, priceValue); // 同时更新日、月、年的尖峰平谷统计数据 // updatePeakValleyStatistics(indexId, dateTime, electricityType, electricity, cost); log.info("计算尖峰平谷用电量完成,索引ID:{},时间点:{},类型:{},用电量:{},电费:{}", indexId, dateTime, electricityType, electricity, cost); } catch (Exception e) { log.error("计算尖峰平谷用电量失败:{}", e.getMessage(), e); } } /** * 确定电价类型 * * @param time 当前时间 * @param prices 电价配置映射 * @return 电价类型 */ private String determineElectricityType(LocalTime time, List prices) { for (ElectricityPrice price : prices) { if (isTimeInRange(time, price.getStartTime(), price.getStopTime())) { return price.getType(); } } return null; } /** * 判断时间是否在范围内 * * @param time 当前时间 * @param startTime 开始时间 * @param stopTime 结束时间 * @return 是否在范围内 */ private boolean isTimeInRange(LocalTime time, LocalTime startTime, LocalTime stopTime) { // 处理跨天的情况 if (stopTime.isBefore(startTime)) { return !time.isBefore(startTime) || !time.isAfter(stopTime); } else { return !time.isBefore(startTime) && !time.isAfter(stopTime); } } /** * 保存尖峰平谷数据 * * @param indexId 索引ID * @param timeCode 时间编码 * @param electricityType 用电类型 * @param dataTime 数据时间 * @param beginTime 开始时间 * @param endTime 结束时间 * @param electricity 用电量 * @param cost 电费 * @param price 电价 */ @Autowired private EnergyIndexRepository energyIndexRepository; private void saveElectricityDataItem(String indexId, String timeCode, String electricityType, LocalDateTime dataTime, LocalDateTime beginTime, LocalDateTime endTime, BigDecimal electricity, BigDecimal cost, BigDecimal price) { // 查询是否已存在 ElectricityDataItem existingItem = electricityDataItemRepository .findByIndexIdAndTimeCodeAndElectricityType(indexId, timeCode, electricityType); // 查询指标code String indexCode = null; try { EnergyIndex energyIndex = energyIndexRepository.findByIndexId(indexId); if (energyIndex != null) { indexCode = energyIndex.getCode(); } } catch (Exception e) { log.error("查询指标code失败:{}", e.getMessage(), e); } if (existingItem == null) { existingItem = new ElectricityDataItem(); existingItem.setIndexId(indexId); existingItem.setTimeCode(timeCode); existingItem.setElectricityType(electricityType); existingItem.setCreateTime(LocalDateTime.now()); } // 设置指标code existingItem.setIndexCode(indexCode); existingItem.setDataTime(dataTime); existingItem.setBeginTime(beginTime); existingItem.setEndTime(endTime); existingItem.setElectricity(electricity); existingItem.setCost(cost); existingItem.setPrice(price); existingItem.setTimeType("HOUR"); existingItem.setUpdateTime(LocalDateTime.now()); electricityDataItemRepository.save(existingItem); } /** * 更新日、月、年的尖峰平谷统计数据 * * @param indexId 索引ID * @param dateTime 数据时间 * @param electricityType 用电类型 * @param electricity 用电量 * @param cost 电费 */ private void updatePeakValleyStatistics(String indexId, LocalDateTime dateTime, String electricityType, BigDecimal electricity, BigDecimal cost) { try { // 更新日统计 updateDailyPeakValleyStatistics(indexId, dateTime, electricityType, electricity, cost); // 更新月统计 updateMonthlyPeakValleyStatistics(indexId, dateTime, electricityType, electricity, cost); // 更新年统计 updateYearlyPeakValleyStatistics(indexId, dateTime, electricityType, electricity, cost); } catch (Exception e) { log.error("更新尖峰平谷统计数据失败:{}", e.getMessage(), e); } } /** * 更新日尖峰平谷统计数据 */ private void updateDailyPeakValleyStatistics(String indexId, LocalDateTime dateTime, String electricityType, BigDecimal electricity, BigDecimal cost) { // 生成日时间编码 String timeCode = generateTimeCode("DAY", dateTime); // 计算时间范围 LocalDateTime[] timeRange = calculateTimeRange("DAY", dateTime); // 更新或创建统计数据 updateElectricityDataItem(indexId, timeCode, electricityType, dateTime, timeRange[0], timeRange[1], electricity, cost, "DAY"); } /** * 更新月尖峰平谷统计数据 */ private void updateMonthlyPeakValleyStatistics(String indexId, LocalDateTime dateTime, String electricityType, BigDecimal electricity, BigDecimal cost) { // 生成月时间编码 String timeCode = generateTimeCode("MONTH", dateTime); // 计算时间范围 LocalDateTime[] timeRange = calculateTimeRange("MONTH", dateTime); // 更新或创建统计数据 updateElectricityDataItem(indexId, timeCode, electricityType, dateTime, timeRange[0], timeRange[1], electricity, cost, "MONTH"); } /** * 更新年尖峰平谷统计数据 */ private void updateYearlyPeakValleyStatistics(String indexId, LocalDateTime dateTime, String electricityType, BigDecimal electricity, BigDecimal cost) { // 生成年时间编码 String timeCode = generateTimeCode("YEAR", dateTime); // 计算时间范围 LocalDateTime[] timeRange = calculateTimeRange("YEAR", dateTime); // 更新或创建统计数据 updateElectricityDataItem(indexId, timeCode, electricityType, dateTime, timeRange[0], timeRange[1], electricity, cost, "YEAR"); } /** * 更新或创建尖峰平谷统计数据 */ private void updateElectricityDataItem(String indexId, String timeCode, String electricityType, LocalDateTime dataTime, LocalDateTime beginTime, LocalDateTime endTime, BigDecimal electricity, BigDecimal cost, String timeType) { // 查询是否已存在 ElectricityDataItem existingItem = electricityDataItemRepository .findByIndexIdAndTimeCodeAndElectricityType(indexId, timeCode, electricityType); if (existingItem == null) { // 创建新记录 existingItem = new ElectricityDataItem(); existingItem.setIndexId(indexId); existingItem.setTimeCode(timeCode); existingItem.setElectricityType(electricityType); existingItem.setCreateTime(LocalDateTime.now()); existingItem.setElectricity(electricity); existingItem.setCost(cost); // 避免除零异常 if (electricity.compareTo(BigDecimal.ZERO) > 0) { existingItem.setPrice(cost.divide(electricity, 2, BigDecimal.ROUND_HALF_UP)); } else { existingItem.setPrice(BigDecimal.ZERO); log.warn("用电量为零,无法计算电价,设置为默认值0,索引ID={},时间编码={},类型={}", indexId, timeCode, electricityType); } } else { // 更新已有记录,累加电量和电费 existingItem.setElectricity(existingItem.getElectricity().add(electricity)); existingItem.setCost(existingItem.getCost().add(cost)); // 重新计算平均电价 if (existingItem.getElectricity().compareTo(BigDecimal.ZERO) > 0) { existingItem.setPrice(existingItem.getCost().divide(existingItem.getElectricity(), 2, BigDecimal.ROUND_HALF_UP)); } } existingItem.setDataTime(dataTime); existingItem.setBeginTime(beginTime); existingItem.setEndTime(endTime); existingItem.setTimeType(timeType); existingItem.setUpdateTime(LocalDateTime.now()); electricityDataItemRepository.save(existingItem); log.info("更新{}尖峰平谷统计数据成功:indexId={}, timeCode={}, type={}, electricity={}, cost={}", timeType, indexId, timeCode, electricityType, electricity, cost); } /** * 计算时间范围 * * @param timeType 时间类型 * @param dateTime 统计时间点 * @return 时间范围[开始时间, 结束时间] */ private LocalDateTime[] calculateTimeRange(String timeType, LocalDateTime dateTime) { LocalDateTime startTime; LocalDateTime endTime; switch (timeType) { case "HOUR": startTime = dateTime.withMinute(0).withSecond(0).withNano(0); endTime = startTime.plusHours(1).minusNanos(1); break; case "DAY": startTime = dateTime.withHour(0).withMinute(0).withSecond(0).withNano(0); endTime = startTime.plusDays(1).minusNanos(1); break; case "MONTH": startTime = dateTime.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0); endTime = startTime.plusMonths(1).minusNanos(1); break; case "YEAR": startTime = dateTime.withDayOfYear(1).withHour(0).withMinute(0).withSecond(0).withNano(0); endTime = startTime.plusYears(1).minusNanos(1); break; default: throw new IllegalArgumentException("不支持的时间类型:" + timeType); } return new LocalDateTime[]{startTime, endTime}; } /** * 生成时间编码 * * @param timeType 时间类型 * @param dateTime 统计时间点 * @return 时间编码 */ private String generateTimeCode(String timeType, LocalDateTime dateTime) { DateTimeFormatter formatter; switch (timeType) { case "HOUR": formatter = DateTimeFormatter.ofPattern("yyyyMMddHH"); return "H" + dateTime.format(formatter); case "DAY": formatter = DateTimeFormatter.ofPattern("yyyyMMdd"); return "D" + dateTime.format(formatter); case "MONTH": formatter = DateTimeFormatter.ofPattern("yyyyMM"); return "M" + dateTime.format(formatter); case "YEAR": formatter = DateTimeFormatter.ofPattern("yyyy"); return "Y" + dateTime.format(formatter); default: throw new IllegalArgumentException("不支持的时间类型:" + timeType); } } }