package com.zhitan.engine.influxdb;
|
|
import com.influxdb.LogLevel;
|
import com.influxdb.client.InfluxDBClient;
|
import com.influxdb.client.InfluxDBClientFactory;
|
import com.influxdb.client.QueryApi;
|
import com.influxdb.client.WriteApiBlocking;
|
import com.influxdb.client.domain.HealthCheck;
|
import com.influxdb.client.domain.WritePrecision;
|
import com.zhitan.engine.enums.CollectionModes;
|
import com.zhitan.engine.enums.GroupTimeType;
|
import com.zhitan.engine.enums.Quality;
|
import com.influxdb.query.FluxRecord;
|
import com.influxdb.query.FluxTable;
|
import com.zhitan.engine.config.InfluxDBConfig;
|
import com.zhitan.engine.entity.TagValue;
|
import com.zhitan.engine.entity.WritePoint;
|
import lombok.extern.slf4j.Slf4j;
|
import org.joda.time.DateTime;
|
import org.joda.time.Duration;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.stereotype.Repository;
|
|
import java.time.*;
|
import java.time.format.DateTimeFormatter;
|
import java.util.ArrayList;
|
import java.util.Collections;
|
import java.util.Date;
|
import java.util.List;
|
|
/**
|
* influxdb的基础服务
|
*
|
* @author Silence
|
* @version 1.0
|
*/
|
@Slf4j
|
@Repository
|
public class InfluxdbRepository {
|
|
protected InfluxDBConfig config;
|
protected InfluxDBClient client;
|
|
@Value("${influxdb.bucket}")
|
private String database;
|
|
public InfluxdbRepository(InfluxDBConfig config) {
|
this.config = config;
|
connectInfluxDB();
|
}
|
|
private void connectInfluxDB() {
|
char[] token = config.getToken().toCharArray();
|
if (client == null) {
|
client = InfluxDBClientFactory.create(config.getHost(), token, config.getOrg(),
|
config.getBucket());
|
}
|
log.error("--------------------实时数据库连接成功--------------------");
|
HealthCheck health = client.health();
|
if (health.getStatus() == HealthCheck.StatusEnum.FAIL) {
|
client.close();
|
client = InfluxDBClientFactory.create(config.getHost(), token, config.getOrg(),
|
config.getBucket());
|
}
|
}
|
|
public void store(List<TagValue> tagValues) {
|
WriteApiBlocking write = client.getWriteApiBlocking();
|
List<WritePoint> writePoints = new ArrayList<>();
|
tagValues.forEach(tagValue -> {
|
WritePoint point = new WritePoint(tagValue.getTagCode(), tagValue.getValue(),
|
Instant.ofEpochMilli(tagValue.getDataTime().getTime()));
|
writePoints.add(point);
|
});
|
write.writeMeasurements(WritePrecision.MS, writePoints);
|
}
|
|
public TagValue query(String tagCode, Date time) {
|
List<TagValue> values = query(Collections.singletonList(tagCode), time);
|
return !values.isEmpty() ? values.get(0) : new TagValue();
|
}
|
|
public TagValue query(String tagCode) {
|
List<TagValue> values = query(Collections.singletonList(tagCode));
|
return !values.isEmpty() ? values.get(0) : new TagValue();
|
}
|
|
public List<TagValue> query(List<String> tagCodes, Date time) {
|
DateTime endTime = new DateTime(time);
|
DateTime beginTime = endTime.plusDays(-1);
|
return getTagValues(beginTime, endTime, tagCodes);
|
}
|
|
public List<TagValue> query(List<String> tagCodes) {
|
DateTime beginTime = DateTime.now().plusDays(-1);
|
DateTime endTime = DateTime.now();
|
return getTagValues(beginTime, endTime, tagCodes);
|
}
|
//
|
// /**
|
// * 获取一个小时内所有数据
|
// *
|
// * @param tagCodes 测点编号
|
// * @param time 开始时间
|
// * @return 测点统计结果
|
// */
|
// public List<TagValue> queryOneHour(List<String> tagCodes, Date time) {
|
// DateTime beginTime = new DateTime(time);
|
// DateTime endTime = beginTime.plusHours(1);
|
// return getHistoryData(tagCodes, beginTime.toDate(), endTime.toDate());
|
// }
|
|
private List<TagValue> getHistoryData(List<String> tagCodes, LocalDateTime beginTime, LocalDateTime endTime) {
|
// 将 beginTime 转换为 java.util.Date 对象
|
Date o = Date.from(beginTime.atZone(ZoneId.systemDefault()).toInstant());
|
DateTime begin = new DateTime(o);
|
Date o1 = Date.from(endTime.atZone(ZoneId.systemDefault()).toInstant());
|
DateTime end = new DateTime(o1);
|
StringBuilder timeRange = new StringBuilder()
|
.append("|> range(start: ").append(Instant.ofEpochMilli(begin.getMillis()).toString())
|
.append(", stop: ").append(Instant.ofEpochMilli(end.getMillis()).toString())
|
.append(")");
|
StringBuilder fluxSql = getStringBuilder(timeRange,
|
tagCodes);
|
log.info(String.valueOf(fluxSql));
|
return getTagValues(fluxSql);
|
}
|
|
public List<TagValue> getHistoryData(List<String> tagCodes, Date beginTime, Date endTime,
|
long interval) {
|
DateTime begin = new DateTime(beginTime);
|
DateTime end = new DateTime(endTime);
|
StringBuilder timeRange = new StringBuilder()
|
.append("|> range(start: ").append(Instant.ofEpochMilli(begin.getMillis()).toString())
|
.append(", stop: ").append(Instant.ofEpochMilli(end.getMillis()).toString())
|
.append(")");
|
StringBuilder fluxSql = getStringBuilder(timeRange, tagCodes);
|
fluxSql.append("|> aggregateWindow(every: ").append(interval)
|
.append("s, fn: last, createEmpty: false)");
|
log.info(String.valueOf(fluxSql));
|
return getTagValues(fluxSql);
|
}
|
|
/**
|
* 获取指定标签代码在时间范围内的历史数据,并按指定点数计算数据间隔
|
*
|
* @param tagCodes 标签代码列表,用于标识需要获取历史数据的设备/变量
|
* @param beginTime 数据查询的起始时间(包含)
|
* @param endTime 数据查询的结束时间(包含)
|
* @param pointCount 期望获取的数据点数,用于自动计算采样间隔。实际点数可能受存储限制影响
|
* @return List<TagValue> 包含时间序列数据的对象集合,按时间顺序排列
|
*
|
* 方法逻辑:
|
* 1. 将Date类型参数转换为Joda-Time的DateTime类型以便计算时间间隔
|
* 2. 计算总时间跨度(毫秒),根据点数要求转换为秒级间隔
|
* 3. 调用底层方法获取按计算间隔采样的历史数据
|
*/
|
public List<TagValue> getHistoryData(List<String> tagCodes, Date beginTime, Date endTime,
|
int pointCount) {
|
// 转换时间对象为Joda-Time类型(兼容旧Date类型参数)
|
DateTime begin = new DateTime(beginTime);
|
DateTime end = new DateTime(endTime);
|
|
// 计算时间总跨度并转换为秒级采样间隔
|
long millis = new Duration(begin, end).getMillis();
|
long interval = millis / pointCount / 1000;
|
|
// 委托给核心方法执行实际数据查询
|
return getHistoryData(tagCodes, beginTime, endTime, interval);
|
}
|
|
|
private StringBuilder getStringBuilder(StringBuilder timeRange,
|
List<String> tagCodes) {
|
StringBuilder fluxSql = new StringBuilder();
|
fluxSql.append("from(bucket: \"").append(config.getBucket()).append("\")")
|
.append(timeRange).append("|> filter(fn: (r) => r[\"_measurement\"] == \"")
|
.append(config.getMeasurement()).append("\")");
|
fluxSql.append("|> filter(fn: (r) => r[\"_field\"] == \"value\")");
|
if (!tagCodes.isEmpty()) {
|
fluxSql.append("|> filter(fn: (r) => r[\"tag\"] =~ /");
|
List<String> filter = new ArrayList<>(tagCodes);
|
fluxSql.append(String.join("|", filter));
|
fluxSql.append("/)");
|
}
|
fluxSql.append("|> group(columns: [\"tag\"])");
|
log.info(String.valueOf(fluxSql));
|
return fluxSql;
|
}
|
|
private List<TagValue> getTagValues(StringBuilder fluxSql) {
|
QueryApi query = client.getQueryApi();
|
|
List<FluxTable> tables = query.query(fluxSql.toString());
|
|
List<TagValue> values = new ArrayList<>();
|
for (FluxTable fluxTable : tables) {
|
List<FluxRecord> records = fluxTable.getRecords();
|
for (FluxRecord fluxRecord : records) {
|
String tag = String.valueOf(fluxRecord.getValueByKey("tag"));
|
Double value = (Double) fluxRecord.getValueByKey("_value");
|
Date time = fluxRecord.getTime() == null ? null
|
: new DateTime(fluxRecord.getTime().toString()).toDate();
|
TagValue tagValue = new TagValue();
|
tagValue.setValue(value);
|
tagValue.setDataTime(time);
|
tagValue.setTagCode(tag);
|
tagValue.setQuality(Quality.GOOD);
|
values.add(tagValue);
|
}
|
}
|
return values;
|
}
|
|
public List<TagValue> statistics(List<String> tagCodes, Date beginTime, Date endTime,
|
CollectionModes queryType) {
|
DateTime begin = new DateTime(beginTime);
|
DateTime end = new DateTime(endTime);
|
return getTagValues(begin, end, tagCodes, queryType);
|
}
|
|
public List<TagValue> statistics(List<String> tagCodes, Date beginTime, Date endTime,
|
CollectionModes queryType, GroupTimeType timeType) {
|
DateTime begin = new DateTime(beginTime);
|
DateTime end = new DateTime(endTime);
|
StringBuilder timeRange = new StringBuilder()
|
.append("|> range(start: ").append(Instant.ofEpochMilli(begin.getMillis()).toString())
|
.append(", stop: ").append(Instant.ofEpochMilli(end.getMillis()).toString())
|
.append(")");
|
StringBuilder fluxSql = getStringBuilder(timeRange, tagCodes);
|
fluxSql.append("|> aggregateWindow(every: 1")
|
.append(timeType.name())
|
.append(", fn: ")
|
.append(queryType.name())
|
.append(", createEmpty: false)")
|
.append("|>yield(name: \"")
|
.append(queryType.name())
|
.append("\")");
|
log.info(String.valueOf(fluxSql));
|
return getTagValues(fluxSql);
|
}
|
|
private List<TagValue> getTagValues(DateTime begin, DateTime end, List<String> tagCodes) {
|
return getTagValues(begin, end, tagCodes, CollectionModes.last);
|
}
|
|
private List<TagValue> getTagValues(DateTime begin, DateTime end, List<String> tagCodes,
|
CollectionModes queryType) {
|
StringBuilder timeRange = new StringBuilder()
|
.append("|> range(start: ").append(Instant.ofEpochMilli(begin.getMillis()).toString())
|
.append(", stop: ").append(Instant.ofEpochMilli(end.getMillis()).toString())
|
.append(")");
|
StringBuilder fluxSql = getStringBuilder(timeRange, tagCodes);
|
fluxSql.append("|> ");
|
fluxSql.append(queryType.name());
|
fluxSql.append("()");
|
log.info(String.valueOf(fluxSql));
|
return getTagValues(fluxSql);
|
}
|
|
/**
|
* 解析查询结果中的数值
|
*
|
* @param tables 查询结果
|
* @return 数值列表
|
*/
|
public List<Double> getFieldValues(List<TagValue> tables) {
|
if (tables == null || tables.isEmpty()) {
|
return null;
|
}
|
|
List<Double> values = new ArrayList<>();
|
for (TagValue tagValue : tables) {
|
|
Double value = tagValue.getValue();
|
if (value != null) {
|
values.add(value);
|
}
|
|
}
|
|
return values.isEmpty() ? null : values;
|
}
|
|
/**
|
* 计算指定时间范围内的电能累计值
|
*
|
* @param field 字段 00601925032100026643_5_ActiveZT
|
* @param startTime 开始时间
|
* @param endTime 结束时间
|
* @return 累计值
|
*/
|
public Double calculateAccumulatedValue(String field, LocalDateTime startTime, LocalDateTime endTime) {
|
try {
|
List<String> tagCodes = new ArrayList<>();
|
tagCodes.add(field);
|
List<TagValue> tables = getHistoryData(tagCodes, startTime, endTime);
|
List<Double> values = getFieldValues(tables);
|
|
if (values == null || values.isEmpty()) {
|
log.warn("未查询到数据,返回默认值0.0,measurement={}, tag={},startTime={}, endTime={}",
|
config.getMeasurement(), field, startTime, endTime);
|
return 0.0;
|
}
|
|
// 计算累计值(最后一个值减去第一个值)
|
Object firstValue = values.get(0);
|
Object lastValue = values.get(values.size() - 1);
|
|
if (firstValue instanceof Number && lastValue instanceof Number) {
|
double result = ((Number) lastValue).doubleValue() - ((Number) firstValue).doubleValue();
|
log.info("计算累计值成功:measurement={}, tag={}, 起始值={}, 结束值={}, 累计值={}",
|
config.getMeasurement(), field, firstValue, lastValue, result);
|
return result;
|
} else {
|
log.warn("数据类型不是数值类型,返回默认值0.0,firstValue={}, lastValue={}", firstValue, lastValue);
|
return 0.0;
|
}
|
} catch (Exception e) {
|
log.error("计算累计值异常,返回默认值0.0,measurement={}, tag={}, error={}",
|
config.getMeasurement(), field, e.getMessage(), e);
|
return 0.0;
|
}
|
}
|
}
|