package com.zhitan.service.impl; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.zhitan.config.influxdb.InfluxdbConfig; import com.zhitan.model.IndexTemplate; import com.zhitan.model.entity.ElectricPower; import com.zhitan.influxdb.InfluxdbRepository; import com.zhitan.mapper.CommonMapper; import com.zhitan.model.entity.KtDataEntity; import com.zhitan.model.entity.PowerEntity; import com.zhitan.redis.RedisCache; import com.zhitan.service.IDataService; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.lang.reflect.Field; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; /** * 数据service */ @Slf4j @Service public class DataServiceImpl implements IDataService { private final String TAG = "tag"; private final String FIELD_VALUE = "value"; private final InfluxdbRepository repository; private final InfluxdbConfig influxdbConfig; private final CommonMapper commonMapper; private final RedisCache redisCache; @Autowired public DataServiceImpl(InfluxdbRepository repository, InfluxdbConfig influxdbConfig, CommonMapper commonMapper, RedisCache redisCache) { this.repository = repository; this.influxdbConfig = influxdbConfig; this.commonMapper = commonMapper; this.redisCache = redisCache; } /** * 写入时序数据 * * @param jsonString json数据 */ @Override public void writeTimeSeriesData(@NotNull String jsonString) { if (jsonString.isEmpty()) { return; } List points = new ArrayList<>(); JsonObject jsonObject = JsonParser.parseString(jsonString).getAsJsonObject(); for (Map.Entry entry : jsonObject.entrySet()) { String key = entry.getKey(); JsonElement value = entry.getValue(); JsonPrimitive primitive = value.getAsJsonPrimitive(); Point point = Point .measurement(influxdbConfig.getMeasurement()) .addTag(TAG, key) .time(Instant.now(), WritePrecision.S); if (primitive.isJsonPrimitive()) { if (primitive.isNumber()) { point.addField(FIELD_VALUE, value.getAsDouble()); points.add(point); } else if (primitive.isString()) { //point.addField(FIELD_VALUE, value.getAsString()); } else if (primitive.isBoolean()) { //point.addField(FIELD_VALUE, value.getAsBoolean()); } } } repository.writePoints(points); } /** * 写入电力相关数据-固定格式,可自定义修改 * * @param electricPower 固定格式的数据 */ @Override public void writeTimeSeriesData(@NotNull ElectricPower electricPower) { List templates = getIndexTemplate(); // 获取类中所有声明的字段 Field[] fields = electricPower.getClass().getDeclaredFields(); List points = new ArrayList<>(); for (Field field : fields) { IndexTemplate indexTemplate = templates.stream().filter(template -> field.getName().equalsIgnoreCase(template.getGatewayKey())) .findFirst().orElse(null); if (indexTemplate != null) { Point point = Point .measurement(influxdbConfig.getMeasurement()) .addTag(TAG, electricPower.getSn() + "_" + indexTemplate.getCode()) .time(Instant.now(), WritePrecision.S); // 设置字段可访问,允许访问私有字段 field.setAccessible(true); if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) { try { // 获取字段值 double value = field.getDouble(electricPower); point.addField(FIELD_VALUE, value); points.add(point); } catch (IllegalAccessException e) { log.error("获取属性值失败:{}", e.getMessage()); } } } } repository.writePoints(points); } /** * 写入电力相关数据-固定格式,可自定义修改 * * @param powerEntity 固定格式的数据 */ @Override public void writeTimeSeriesData(@NotNull PowerEntity powerEntity) { List templates = getIndexTemplate(); // 判断总有功是否为空,如果为空判断正向有功和反向有功是否为空,如果后两个任意一个不为空,则从时序数据库中获取为空的那一个,然后将总有功能电能赋值为正向有功能电能减去反向有功能电能 if (powerEntity.getEps() == null) { if (powerEntity.getEpsp() != null && powerEntity.getEpsn() != null) { powerEntity.setEps(powerEntity.getEpsp() - powerEntity.getEpsn()); } else if (powerEntity.getEpsp() != null) { double lastValue = repository.getLastPoint(influxdbConfig.getMeasurement(), TAG, powerEntity.getSn() + "_" + "Exp"); powerEntity.setEps(powerEntity.getEpsp() - lastValue); } else if (powerEntity.getEpsn() != null) { double lastValue = repository.getLastPoint(influxdbConfig.getMeasurement(), TAG, powerEntity.getSn() + "_" + "ActiveZT"); powerEntity.setEps(lastValue - powerEntity.getEpsn()); } } // 判断总无功是否为空,如果为空判断正向无功和反向无功是否为空,如果后两个任意一个不为空,则从时序数据库中获取为空的那一个,然后将总无功电能赋值为正向无功电能和反向无功电能的绝对值之和 if (powerEntity.getEqs() == null) { if (powerEntity.getEqsp() != null && powerEntity.getEqsn() != null) { powerEntity.setEqs(Math.abs(powerEntity.getEqsp()) + Math.abs(powerEntity.getEqsn())); } else if (powerEntity.getEqsp() != null) { double lastValue = repository.getLastPoint(influxdbConfig.getMeasurement(), TAG, powerEntity.getSn() + "_" + "ExpZN"); powerEntity.setEqs(Math.abs(lastValue) + Math.abs(powerEntity.getEqsp())); } else if (powerEntity.getEqsn() != null) { double lastValue = repository.getLastPoint(influxdbConfig.getMeasurement(), TAG, powerEntity.getSn() + "_" + "ActiveZN"); powerEntity.setEqs(Math.abs(lastValue) + Math.abs(powerEntity.getEqsn())); } } // 获取类中所有声明的字段 Field[] fields = powerEntity.getClass().getDeclaredFields(); List points = new ArrayList<>(); for (Field field : fields) { IndexTemplate indexTemplate = templates.stream().filter(template -> field.getName().equalsIgnoreCase(template.getGatewayKey())) .findFirst().orElse(null); if (indexTemplate != null) { Point point = Point .measurement(influxdbConfig.getMeasurement()) .addTag(TAG, powerEntity.getSn() + "_" + indexTemplate.getCode()) .time(Instant.now(), WritePrecision.S); // 设置字段可访问,允许访问私有字段 field.setAccessible(true); if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) { try { // 获取字段值 Object o = field.get(powerEntity); if (o==null) { // 查询出最后一次写入influxdb的数据 double lastValue = repository.getLastPoint(influxdbConfig.getMeasurement(), TAG, powerEntity.getSn() + "_" + indexTemplate.getCode()); if (lastValue>0) { log.info("查询出最后一次写入influxdb的数据:{}", lastValue); } point.addField(FIELD_VALUE, lastValue); } else { // 安全类型转换 if (o instanceof Number) { double value = ((Number) o).doubleValue(); point.addField(FIELD_VALUE, value); // 使用 value... } else { log.error("字段 {} 类型非法: {}", field.getName(), o.getClass()); } } points.add(point); } catch (IllegalAccessException e) { log.error("获取属性值失败:{}", e.getMessage()); } } } } // repository.writePoints(points); } /** * 写入KT数据 * * @param ktDataEntity 固定格式的数据 */ @Override public void writeTimeSeriesData(KtDataEntity ktDataEntity) { List templates = getIndexTemplate(); // 获取类中所有声明的字段 Field[] fields = ktDataEntity.getClass().getDeclaredFields(); List points = new ArrayList<>(); for (Field field : fields) { IndexTemplate indexTemplate = templates.stream().filter(template -> field.getName().equalsIgnoreCase(template.getGatewayKey())) .findFirst().orElse(null); if (indexTemplate != null) { Point point = Point .measurement(influxdbConfig.getMeasurement()) .addTag(TAG, ktDataEntity.getSn() + "_" + indexTemplate.getCode()) .time(Instant.now(), WritePrecision.S); field.setAccessible(true); if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) { try { // 获取字段值 Object o = field.get(ktDataEntity); if (o!=null) { // 安全类型转换 if (o instanceof Number) { double value = ((Number) o).doubleValue(); point.addField(FIELD_VALUE, value); // 使用 value... } else { log.error("字段 {} 类型非法: {}", field.getName(), o.getClass()); } } points.add(point); } catch (IllegalAccessException e) { throw new RuntimeException(e); } } } } repository.writePoints(points); } /** * 获取点位模板 */ protected List getIndexTemplate() { String TEMPLATE_KEY = "template"; List result = redisCache.getCacheList(TEMPLATE_KEY); if (result == null || result.isEmpty()) { result = commonMapper.getIndexTemplate(); redisCache.setCacheList(TEMPLATE_KEY, result, 120, TimeUnit.SECONDS); } return result; } }