package com.zhitan.influxdb;
|
|
import com.influxdb.LogLevel;
|
import com.influxdb.client.InfluxDBClient;
|
import com.influxdb.client.InfluxDBClientFactory;
|
import com.influxdb.client.WriteApiBlocking;
|
import com.influxdb.client.write.Point;
|
import com.influxdb.query.FluxRecord;
|
import com.influxdb.query.FluxTable;
|
import com.zhitan.config.influxdb.InfluxdbConfig;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Repository;
|
|
import java.util.List;
|
|
/**
|
* influxdb的基础服务
|
*
|
* @author Silence
|
* @version 1.0
|
*/
|
@Slf4j
|
@Repository
|
public class InfluxdbRepository {
|
|
protected InfluxdbConfig config;
|
protected InfluxDBClient client;
|
|
@Autowired
|
public InfluxdbRepository(InfluxdbConfig config) {
|
this.config = config;
|
init();
|
}
|
|
/**
|
* 初始化
|
*/
|
private void init() {
|
if (config.isEnable()) {
|
if (null == client) {
|
client = InfluxDBClientFactory.create(config.getHost(), config.getToken().toCharArray(),
|
config.getOrg(), config.getBucket())
|
.enableGzip()
|
.setLogLevel(LogLevel.BASIC);
|
}
|
if (!client.ping()) {
|
log.error("实时库连接失败");
|
} else {
|
log.info("实时库连接成功");
|
}
|
} else {
|
log.debug("时序库不可用");
|
}
|
}
|
|
/**
|
* 写入单个点位
|
*/
|
public void writePoint(Point point) {
|
if (null == point) {
|
return;
|
}
|
WriteApiBlocking writeApi = client.getWriteApiBlocking();
|
writeApi.writePoint(point);
|
}
|
|
/**
|
* 写入多个点位
|
*/
|
public void writePoints(List<Point> points) {
|
if (null == points || points.isEmpty()) {
|
return;
|
}
|
WriteApiBlocking writeApi = client.getWriteApiBlocking();
|
writeApi.writePoints(points);
|
}
|
|
public double getLastPoint(String measurement, String tag, String s) {
|
if (client == null || !config.isEnable()) {
|
log.warn("InfluxDB client is not initialized or disabled.");
|
return 0;
|
}
|
|
String query = String.format("from(bucket: \"%s\") " +
|
"|> range(start: -1h) " +
|
"|> filter(fn: (r) => r._measurement == \"%s\" and r._field == \"value\") " +
|
"|> filter(fn: (r) => r.tag == \"%s\") " +
|
"|> last()", config.getBucket(), measurement, s);
|
|
List<FluxTable> tables = client.getQueryApi().query(query, config.getOrg());
|
|
if (tables != null && !tables.isEmpty()) {
|
List<FluxRecord> records = tables.get(0).getRecords();
|
if (records != null && !records.isEmpty()) {
|
FluxRecord record = records.get(0);
|
double value = (double) record.getValue();
|
|
return value;
|
}
|
}
|
|
log.warn("No data found for measurement: {}, tag: {}, field: {}", measurement, tag, s);
|
return 0;
|
}
|
}
|