baoshiwei
2025-06-26 55e0a19f3b1c113732f8ee56ac964e5208ec7afd
src/main/java/com/zhitan/service/impl/DataServiceImpl.java
@@ -11,6 +11,8 @@
import com.zhitan.model.entity.ElectricPower;
import com.zhitan.influxdb.InfluxdbRepository;
import com.zhitan.mapper.CommonMapper;
import com.zhitan.model.entity.KtDataEntity;
import com.zhitan.model.entity.PowerEntity;
import com.zhitan.redis.RedisCache;
import com.zhitan.service.IDataService;
import lombok.extern.slf4j.Slf4j;
@@ -23,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * 数据service
@@ -118,6 +121,138 @@
        repository.writePoints(points);
    }
    /**
     * 写入电力相关数据-固定格式,可自定义修改
     *
     * @param powerEntity 固定格式的数据
     */
    @Override
    public void writeTimeSeriesData(@NotNull PowerEntity powerEntity) {
        List<IndexTemplate> templates = getIndexTemplate();
        // 判断总有功是否为空,如果为空判断正向有功和反向有功是否为空,如果后两个任意一个不为空,则从时序数据库中获取为空的那一个,然后将总有功能电能赋值为正向有功能电能减去反向有功能电能
        if (powerEntity.getEps() == null) {
            if (powerEntity.getEpsp() != null && powerEntity.getEpsn() != null) {
                powerEntity.setEps(powerEntity.getEpsp() - powerEntity.getEpsn());
            } else if (powerEntity.getEpsp() != null) {
                double lastValue = repository.getLastPoint(influxdbConfig.getMeasurement(),
                        TAG, powerEntity.getSn() + "_" + "Exp");
                powerEntity.setEps(powerEntity.getEpsp() - lastValue);
            } else if (powerEntity.getEpsn() != null) {
                double lastValue = repository.getLastPoint(influxdbConfig.getMeasurement(),
                        TAG, powerEntity.getSn() + "_" + "ActiveZT");
                powerEntity.setEps(lastValue - powerEntity.getEpsn());
            }
        }
        // 判断总无功是否为空,如果为空判断正向无功和反向无功是否为空,如果后两个任意一个不为空,则从时序数据库中获取为空的那一个,然后将总无功电能赋值为正向无功电能和反向无功电能的绝对值之和
        if (powerEntity.getEqs() == null) {
            if (powerEntity.getEqsp() != null && powerEntity.getEqsn() != null) {
                powerEntity.setEqs(Math.abs(powerEntity.getEqsp()) + Math.abs(powerEntity.getEqsn()));
            } else if (powerEntity.getEqsp() != null) {
                double lastValue = repository.getLastPoint(influxdbConfig.getMeasurement(),
                        TAG, powerEntity.getSn() + "_" + "ExpZN");
                powerEntity.setEqs(Math.abs(lastValue) + Math.abs(powerEntity.getEqsp()));
            } else if (powerEntity.getEqsn() != null) {
                double lastValue = repository.getLastPoint(influxdbConfig.getMeasurement(),
                        TAG, powerEntity.getSn() + "_" + "ActiveZN");
                powerEntity.setEqs(Math.abs(lastValue) + Math.abs(powerEntity.getEqsn()));
            }
        }
        // 获取类中所有声明的字段
        Field[] fields = powerEntity.getClass().getDeclaredFields();
        List<Point> points = new ArrayList<>();
        for (Field field : fields) {
            IndexTemplate indexTemplate = templates.stream().filter(template ->
                            field.getName().equalsIgnoreCase(template.getGatewayKey()))
                    .findFirst().orElse(null);
            if (indexTemplate != null) {
                Point point = Point
                        .measurement(influxdbConfig.getMeasurement())
                        .addTag(TAG, powerEntity.getSn() + "_" + indexTemplate.getCode())
                        .time(Instant.now(), WritePrecision.S);
                // 设置字段可访问,允许访问私有字段
                field.setAccessible(true);
                if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) {
                    try {
                        // 获取字段值
                        Object o = field.get(powerEntity);
                        if (o==null) {
                            // 查询出最后一次写入influxdb的数据
                            double lastValue = repository.getLastPoint(influxdbConfig.getMeasurement(),
                                    TAG, powerEntity.getSn() + "_" + indexTemplate.getCode());
                            if (lastValue>0) {
                                log.info("查询出最后一次写入influxdb的数据:{}", lastValue);
                            }
                            point.addField(FIELD_VALUE, lastValue);
                        } else {
                            // 安全类型转换
                            if (o instanceof Number) {
                                double value = ((Number) o).doubleValue();
                                point.addField(FIELD_VALUE, value);
                                // 使用 value...
                            } else {
                                log.error("字段 {} 类型非法: {}", field.getName(), o.getClass());
                            }
                        }
                        points.add(point);
                    } catch (IllegalAccessException e) {
                        log.error("获取属性值失败:{}", e.getMessage());
                    }
                }
            }
        }
        repository.writePoints(points);
    }
    /**
     * 写入KT数据
     *
     * @param ktDataEntity 固定格式的数据
     */
    @Override
    public void writeTimeSeriesData(KtDataEntity ktDataEntity) {
        List<IndexTemplate> templates = getIndexTemplate();
        // 获取类中所有声明的字段
        Field[] fields = ktDataEntity.getClass().getDeclaredFields();
        List<Point> points = new ArrayList<>();
        for (Field field : fields) {
            IndexTemplate indexTemplate = templates.stream().filter(template ->
                            field.getName().equalsIgnoreCase(template.getGatewayKey()))
                    .findFirst().orElse(null);
                    if (indexTemplate != null) {
                        Point point = Point
                                .measurement(influxdbConfig.getMeasurement())
                                .addTag(TAG, ktDataEntity.getSn() + "_" + indexTemplate.getCode())
                                .time(Instant.now(), WritePrecision.S);
                        field.setAccessible(true);
                        if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) {
                            try {
                                // 获取字段值
                                Object o = field.get(ktDataEntity);
                                if (o!=null) {
                                    // 安全类型转换
                                    if (o instanceof Number) {
                                        double value = ((Number) o).doubleValue();
                                        point.addField(FIELD_VALUE, value);
                                        // 使用 value...
                                    } else {
                                        log.error("字段 {} 类型非法: {}", field.getName(), o.getClass());
                                    }
                                }
                                points.add(point);
                            } catch (IllegalAccessException e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }
        }
        repository.writePoints(points);
    }
    /**
     * 获取点位模板
     */
@@ -126,7 +261,7 @@
        List<IndexTemplate> result = redisCache.getCacheList(TEMPLATE_KEY);
        if (result == null || result.isEmpty()) {
            result = commonMapper.getIndexTemplate();
            redisCache.setCacheList(TEMPLATE_KEY, result);
            redisCache.setCacheList(TEMPLATE_KEY, result, 120, TimeUnit.SECONDS);
        }
        return result;
    }