package com.zhitan.influxdb; import com.influxdb.LogLevel; import com.influxdb.client.InfluxDBClient; import com.influxdb.client.InfluxDBClientFactory; import com.influxdb.client.WriteApiBlocking; import com.influxdb.client.write.Point; import com.influxdb.query.FluxRecord; import com.influxdb.query.FluxTable; import com.zhitan.config.influxdb.InfluxdbConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Repository; import java.util.List; /** * influxdb的基础服务 * * @author Silence * @version 1.0 */ @Slf4j @Repository public class InfluxdbRepository { protected InfluxdbConfig config; protected InfluxDBClient client; @Autowired public InfluxdbRepository(InfluxdbConfig config) { this.config = config; init(); } /** * 初始化 */ private void init() { if (config.isEnable()) { if (null == client) { client = InfluxDBClientFactory.create(config.getHost(), config.getToken().toCharArray(), config.getOrg(), config.getBucket()) .enableGzip() .setLogLevel(LogLevel.BASIC); } if (!client.ping()) { log.error("实时库连接失败"); } else { log.info("实时库连接成功"); } } else { log.debug("时序库不可用"); } } /** * 写入单个点位 */ public void writePoint(Point point) { if (null == point) { return; } WriteApiBlocking writeApi = client.getWriteApiBlocking(); writeApi.writePoint(point); } /** * 写入多个点位 */ public void writePoints(List points) { if (null == points || points.isEmpty()) { return; } WriteApiBlocking writeApi = client.getWriteApiBlocking(); writeApi.writePoints(points); } public double getLastPoint(String measurement, String tag, String s) { if (client == null || !config.isEnable()) { log.warn("InfluxDB client is not initialized or disabled."); return 0; } String query = String.format("from(bucket: \"%s\") " + "|> range(start: -1h) " + "|> filter(fn: (r) => r._measurement == \"%s\" and r._field == \"value\") " + "|> filter(fn: (r) => r.tag == \"%s\") " + "|> last()", config.getBucket(), measurement, s); List tables = client.getQueryApi().query(query, config.getOrg()); if (tables != null && !tables.isEmpty()) { List records = tables.get(0).getRecords(); if (records != null && !records.isEmpty()) { FluxRecord record = records.get(0); double value = (double) record.getValue(); return value; } } log.warn("No data found for measurement: {}, tag: {}, field: {}", measurement, tag, s); return 0; } }