package com.zhitan.service.impl; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.zhitan.config.influxdb.InfluxdbConfig; import com.zhitan.model.IndexTemplate; import com.zhitan.model.entity.ElectricPower; import com.zhitan.influxdb.InfluxdbRepository; import com.zhitan.mapper.CommonMapper; import com.zhitan.redis.RedisCache; import com.zhitan.service.IDataService; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.lang.reflect.Field; import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * 数据service */ @Slf4j @Service public class DataServiceImpl implements IDataService { private final String TAG = "tag"; private final String FIELD_VALUE = "value"; private final InfluxdbRepository repository; private final InfluxdbConfig influxdbConfig; private final CommonMapper commonMapper; private final RedisCache redisCache; @Autowired public DataServiceImpl(InfluxdbRepository repository, InfluxdbConfig influxdbConfig, CommonMapper commonMapper, RedisCache redisCache) { this.repository = repository; this.influxdbConfig = influxdbConfig; this.commonMapper = commonMapper; this.redisCache = redisCache; } /** * 写入时序数据 * * @param jsonString json数据 */ @Override public void writeTimeSeriesData(@NotNull String jsonString) { if (jsonString.isEmpty()) { return; } List points = new ArrayList<>(); JsonObject jsonObject = JsonParser.parseString(jsonString).getAsJsonObject(); for (Map.Entry entry : jsonObject.entrySet()) { String key = entry.getKey(); JsonElement value = entry.getValue(); JsonPrimitive primitive = value.getAsJsonPrimitive(); Point point = Point .measurement(influxdbConfig.getMeasurement()) .addTag(TAG, key) .time(Instant.now(), WritePrecision.S); if (primitive.isJsonPrimitive()) { if (primitive.isNumber()) { point.addField(FIELD_VALUE, value.getAsDouble()); points.add(point); } else if (primitive.isString()) { //point.addField(FIELD_VALUE, value.getAsString()); } else if (primitive.isBoolean()) { //point.addField(FIELD_VALUE, value.getAsBoolean()); } } } repository.writePoints(points); } /** * 写入电力相关数据-固定格式,可自定义修改 * * @param electricPower 固定格式的数据 */ @Override public void writeTimeSeriesData(@NotNull ElectricPower electricPower) { List templates = getIndexTemplate(); // 获取类中所有声明的字段 Field[] fields = electricPower.getClass().getDeclaredFields(); List points = new ArrayList<>(); for (Field field : fields) { IndexTemplate indexTemplate = templates.stream().filter(template -> field.getName().equalsIgnoreCase(template.getGatewayKey())) .findFirst().orElse(null); if (indexTemplate != null) { Point point = Point .measurement(influxdbConfig.getMeasurement()) .addTag(TAG, electricPower.getSn() + "_" + indexTemplate.getCode()) .time(Instant.now(), WritePrecision.S); // 设置字段可访问,允许访问私有字段 field.setAccessible(true); if (Number.class.isAssignableFrom(field.getType()) || field.getType().isPrimitive()) { try { // 获取字段值 double value = field.getDouble(electricPower); point.addField(FIELD_VALUE, value); points.add(point); } catch (IllegalAccessException e) { log.error("获取属性值失败:{}", e.getMessage()); } } } } repository.writePoints(points); } /** * 获取点位模板 */ protected List getIndexTemplate() { String TEMPLATE_KEY = "template"; List result = redisCache.getCacheList(TEMPLATE_KEY); if (result == null || result.isEmpty()) { result = commonMapper.getIndexTemplate(); redisCache.setCacheList(TEMPLATE_KEY, result); } return result; } }