package com.zhitan.engine.influxdb; import com.influxdb.LogLevel; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.QueryApi; import com.influxdb.client.WriteApiBlocking; import com.influxdb.client.domain.HealthCheck; import com.influxdb.client.domain.WritePrecision; import com.zhitan.engine.enums.CollectionModes; import com.zhitan.engine.enums.GroupTimeType; import com.zhitan.engine.enums.Quality; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; import com.zhitan.engine.config.InfluxDBConfig; import com.zhitan.engine.entity.TagValue; import com.zhitan.engine.entity.WritePoint; import lombok.extern.slf4j.Slf4j; import org.joda.time.DateTime; import org.joda.time.Duration; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Repository; import java.time.*; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; /** * influxdb的基础服务 * * @author Silence * @version 1.0 */ @Slf4j @Repository public class InfluxdbRepository { protected InfluxDBConfig config; protected InfluxDBClient client; @Value("${influxdb.bucket}") private String database; public InfluxdbRepository(InfluxDBConfig config) { this.config = config; connectInfluxDB(); } private void connectInfluxDB() { char[] token = config.getToken().toCharArray(); if (client == null) { client = InfluxDBClientFactory.create(config.getHost(), token, config.getOrg(), config.getBucket()); } log.error("--------------------实时数据库连接成功--------------------"); HealthCheck health = client.health(); if (health.getStatus() == HealthCheck.StatusEnum.FAIL) { client.close(); client = InfluxDBClientFactory.create(config.getHost(), token, config.getOrg(), config.getBucket()); } } public void store(List tagValues) { WriteApiBlocking write = client.getWriteApiBlocking(); List writePoints = new ArrayList<>(); tagValues.forEach(tagValue -> { WritePoint point = new WritePoint(tagValue.getTagCode(), tagValue.getValue(), Instant.ofEpochMilli(tagValue.getDataTime().getTime())); writePoints.add(point); }); write.writeMeasurements(WritePrecision.MS, writePoints); } public TagValue query(String tagCode, Date time) { List values = query(Collections.singletonList(tagCode), time); return !values.isEmpty() ? values.get(0) : new TagValue(); } public TagValue query(String tagCode) { List values = query(Collections.singletonList(tagCode)); return !values.isEmpty() ? values.get(0) : new TagValue(); } public List query(List tagCodes, Date time) { DateTime endTime = new DateTime(time); DateTime beginTime = endTime.plusDays(-1); return getTagValues(beginTime, endTime, tagCodes); } public List query(List tagCodes) { DateTime beginTime = DateTime.now().plusDays(-1); DateTime endTime = DateTime.now(); return getTagValues(beginTime, endTime, tagCodes); } // // /** // * 获取一个小时内所有数据 // * // * @param tagCodes 测点编号 // * @param time 开始时间 // * @return 测点统计结果 // */ // public List queryOneHour(List tagCodes, Date time) { // DateTime beginTime = new DateTime(time); // DateTime endTime = beginTime.plusHours(1); // return getHistoryData(tagCodes, beginTime.toDate(), endTime.toDate()); // } private List getHistoryData(List tagCodes, LocalDateTime beginTime, LocalDateTime endTime) { // 将 beginTime 转换为 java.util.Date 对象 Date o = Date.from(beginTime.atZone(ZoneId.systemDefault()).toInstant()); DateTime begin = new DateTime(o); Date o1 = Date.from(endTime.atZone(ZoneId.systemDefault()).toInstant()); DateTime end = new DateTime(o1); StringBuilder timeRange = new StringBuilder() .append("|> range(start: ").append(Instant.ofEpochMilli(begin.getMillis()).toString()) .append(", stop: ").append(Instant.ofEpochMilli(end.getMillis()).toString()) .append(")"); StringBuilder fluxSql = getStringBuilder(timeRange, tagCodes); log.info(String.valueOf(fluxSql)); return getTagValues(fluxSql); } public List getHistoryData(List tagCodes, Date beginTime, Date endTime, long interval) { DateTime begin = new DateTime(beginTime); DateTime end = new DateTime(endTime); StringBuilder timeRange = new StringBuilder() .append("|> range(start: ").append(Instant.ofEpochMilli(begin.getMillis()).toString()) .append(", stop: ").append(Instant.ofEpochMilli(end.getMillis()).toString()) .append(")"); StringBuilder fluxSql = getStringBuilder(timeRange, tagCodes); fluxSql.append("|> aggregateWindow(every: ").append(interval) .append("s, fn: last, createEmpty: false)"); log.info(String.valueOf(fluxSql)); return getTagValues(fluxSql); } /** * 获取指定标签代码在时间范围内的历史数据,并按指定点数计算数据间隔 * * @param tagCodes 标签代码列表,用于标识需要获取历史数据的设备/变量 * @param beginTime 数据查询的起始时间(包含) * @param endTime 数据查询的结束时间(包含) * @param pointCount 期望获取的数据点数,用于自动计算采样间隔。实际点数可能受存储限制影响 * @return List 包含时间序列数据的对象集合,按时间顺序排列 * * 方法逻辑: * 1. 将Date类型参数转换为Joda-Time的DateTime类型以便计算时间间隔 * 2. 计算总时间跨度(毫秒),根据点数要求转换为秒级间隔 * 3. 调用底层方法获取按计算间隔采样的历史数据 */ public List getHistoryData(List tagCodes, Date beginTime, Date endTime, int pointCount) { // 转换时间对象为Joda-Time类型(兼容旧Date类型参数) DateTime begin = new DateTime(beginTime); DateTime end = new DateTime(endTime); // 计算时间总跨度并转换为秒级采样间隔 long millis = new Duration(begin, end).getMillis(); long interval = millis / pointCount / 1000; // 委托给核心方法执行实际数据查询 return getHistoryData(tagCodes, beginTime, endTime, interval); } private StringBuilder getStringBuilder(StringBuilder timeRange, List tagCodes) { StringBuilder fluxSql = new StringBuilder(); fluxSql.append("from(bucket: \"").append(config.getBucket()).append("\")") .append(timeRange).append("|> filter(fn: (r) => r[\"_measurement\"] == \"") .append(config.getMeasurement()).append("\")"); fluxSql.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")"); if (!tagCodes.isEmpty()) { fluxSql.append("|> filter(fn: (r) => r[\"tag\"] =~ /"); List filter = new ArrayList<>(tagCodes); fluxSql.append(String.join("|", filter)); fluxSql.append("/)"); } fluxSql.append("|> group(columns: [\"tag\"])"); log.info(String.valueOf(fluxSql)); return fluxSql; } private List getTagValues(StringBuilder fluxSql) { QueryApi query = client.getQueryApi(); List tables = query.query(fluxSql.toString()); List values = new ArrayList<>(); for (FluxTable fluxTable : tables) { List records = fluxTable.getRecords(); for (FluxRecord fluxRecord : records) { String tag = String.valueOf(fluxRecord.getValueByKey("tag")); Double value = (Double) fluxRecord.getValueByKey("_value"); Date time = fluxRecord.getTime() == null ? null : new DateTime(fluxRecord.getTime().toString()).toDate(); TagValue tagValue = new TagValue(); tagValue.setValue(value); tagValue.setDataTime(time); tagValue.setTagCode(tag); tagValue.setQuality(Quality.GOOD); values.add(tagValue); } } return values; } public List statistics(List tagCodes, Date beginTime, Date endTime, CollectionModes queryType) { DateTime begin = new DateTime(beginTime); DateTime end = new DateTime(endTime); return getTagValues(begin, end, tagCodes, queryType); } public List statistics(List tagCodes, Date beginTime, Date endTime, CollectionModes queryType, GroupTimeType timeType) { DateTime begin = new DateTime(beginTime); DateTime end = new DateTime(endTime); StringBuilder timeRange = new StringBuilder() .append("|> range(start: ").append(Instant.ofEpochMilli(begin.getMillis()).toString()) .append(", stop: ").append(Instant.ofEpochMilli(end.getMillis()).toString()) .append(")"); StringBuilder fluxSql = getStringBuilder(timeRange, tagCodes); fluxSql.append("|> aggregateWindow(every: 1") .append(timeType.name()) .append(", fn: ") .append(queryType.name()) .append(", createEmpty: false)") .append("|>yield(name: \"") .append(queryType.name()) .append("\")"); log.info(String.valueOf(fluxSql)); return getTagValues(fluxSql); } private List getTagValues(DateTime begin, DateTime end, List tagCodes) { return getTagValues(begin, end, tagCodes, CollectionModes.last); } private List getTagValues(DateTime begin, DateTime end, List tagCodes, CollectionModes queryType) { StringBuilder timeRange = new StringBuilder() .append("|> range(start: ").append(Instant.ofEpochMilli(begin.getMillis()).toString()) .append(", stop: ").append(Instant.ofEpochMilli(end.getMillis()).toString()) .append(")"); StringBuilder fluxSql = getStringBuilder(timeRange, tagCodes); fluxSql.append("|> "); fluxSql.append(queryType.name()); fluxSql.append("()"); log.info(String.valueOf(fluxSql)); return getTagValues(fluxSql); } /** * 解析查询结果中的数值 * * @param tables 查询结果 * @return 数值列表 */ public List getFieldValues(List tables) { if (tables == null || tables.isEmpty()) { return null; } List values = new ArrayList<>(); for (TagValue tagValue : tables) { Double value = tagValue.getValue(); if (value != null) { values.add(value); } } return values.isEmpty() ? null : values; } /** * 计算指定时间范围内的电能累计值 * * @param field 字段 00601925032100026643_5_ActiveZT * @param startTime 开始时间 * @param endTime 结束时间 * @return 累计值 */ public Double calculateAccumulatedValue(String field, LocalDateTime startTime, LocalDateTime endTime) { try { List tagCodes = new ArrayList<>(); tagCodes.add(field); List tables = getHistoryData(tagCodes, startTime, endTime); List values = getFieldValues(tables); if (values == null || values.isEmpty()) { log.warn("未查询到数据,返回默认值0.0,measurement={}, tag={},startTime={}, endTime={}", config.getMeasurement(), field, startTime, endTime); return 0.0; } // 计算累计值(最后一个值减去第一个值) Object firstValue = values.get(0); Object lastValue = values.get(values.size() - 1); if (firstValue instanceof Number && lastValue instanceof Number) { double result = ((Number) lastValue).doubleValue() - ((Number) firstValue).doubleValue(); log.info("计算累计值成功:measurement={}, tag={}, 起始值={}, 结束值={}, 累计值={}", config.getMeasurement(), field, firstValue, lastValue, result); return result; } else { log.warn("数据类型不是数值类型,返回默认值0.0,firstValue={}, lastValue={}", firstValue, lastValue); return 0.0; } } catch (Exception e) { log.error("计算累计值异常,返回默认值0.0,measurement={}, tag={}, error={}", config.getMeasurement(), field, e.getMessage(), e); return 0.0; } } }