package com.dingzhuo.energy.dataservice.data.influxdb;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import okhttp3.OkHttpClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.InfluxDBMapper;
import org.joda.time.DateTime;
import org.joda.time.Duration;

/**
 * Thin access layer over an InfluxDB database that stores point ("tag") values in the
 * {@code daq} measurement, with fields {@code value} and {@code quality} and tag {@code code}.
 *
 * <p>Read methods split the requested tag codes into groups of {@value #PARTITION_SIZE}, run one
 * InfluxQL query per group on a short-lived thread pool and merge the mapped rows. Query and
 * mapping failures are logged and produce no rows; they are never thrown to the caller.
 */
public class FastDB {

  private static final Logger logger = LogManager.getLogger(FastDB.class);

  /** Measurement that holds every stored sample. */
  private static final String TABLE_NAME = "daq";

  /** Number of tag codes placed in the WHERE clause of a single query. */
  private static final int PARTITION_SIZE = 10;

  private final String dataBaseName;
  private final String host;
  private final int port;

  private InfluxDB influxDB;

  /**
   * Maps query results to {@code RtdbResult}. Held per instance so that a second {@code FastDB}
   * cannot replace the mapper of the first one.
   */
  private InfluxDBMapper resultMapper;

  /**
   * Creates the client and immediately tries to connect. A connection failure is logged, not
   * thrown; in that case later queries return empty lists.
   *
   * @param host         InfluxDB host name or address
   * @param port         InfluxDB HTTP port
   * @param dataBaseName database used for all reads and writes
   */
  public FastDB(String host, int port, String dataBaseName) {
    this.dataBaseName = dataBaseName;
    this.host = host;
    this.port = port;
    this.open();
  }

  /**
   * Opens the connection, enables batching and gzip, pings the server and creates the mapper.
   */
  private void open() {
    try {
      OkHttpClient.Builder builder = new OkHttpClient.Builder()
          .connectTimeout(30, TimeUnit.SECONDS)
          .readTimeout(60, TimeUnit.SECONDS)
          .retryOnConnectionFailure(true);
      influxDB = InfluxDBFactory.connect("http://" + this.host + ":" + this.port, builder);
      influxDB.enableBatch(
          BatchOptions.DEFAULTS.jitterDuration(500).actions(2000).flushDuration(100));
      influxDB.enableGzip();
      influxDB.ping();
      resultMapper = new InfluxDBMapper(influxDB);
      logger.info("实时库连接成功!");
    } catch (Exception ex) {
      // Pass the exception as the throwable argument so the stack trace is kept.
      logger.fatal("实时库连接失败!", ex);
    }
  }

  /**
   * Closes the connection. Does nothing when the connection was never established.
   */
  public void close() {
    if (influxDB != null) {
      influxDB.close();
    }
  }

  /**
   * Returns the most recent value of each tag within the last 20 minutes.
   *
   * @param tagCodes tag codes to read
   * @return one row per tag code that has data in the window; empty when nothing was found
   */
  public List<RtdbResult> snapShot(List<String> tagCodes) {
    final String sql = "SELECT LAST(value) as value, quality FROM %s WHERE %s and "
        + "time<=(now()) and time>=(now()-20m) GROUP BY code";
    List<String> statements = new ArrayList<>();
    for (List<String> tags : partition(tagCodes)) {
      statements.add(String.format(sql, TABLE_NAME, formatCode(tags)));
    }
    return executeQueries(statements);
  }

  /**
   * Creates the pool used for one read call: 10 core threads, up to 50 threads, a queue of 1024
   * tasks. When the pool is saturated the submitting thread runs the task itself instead of the
   * submission failing with {@code RejectedExecutionException}.
   */
  private ExecutorService customerThreadPool() {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("fast-db-thread-pool-%d").build();
    return new ThreadPoolExecutor(10, 50, 30, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1024), namedThreadFactory,
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  /**
   * Returns, for each tag, the last value recorded in the 24 hours ending at {@code dataTime}.
   *
   * @param tagCodes tag codes to read
   * @param dataTime end of the look-back window
   * @param timeCode time-type code; currently not used by the query, kept for callers
   * @return one row per tag code that has data in the window
   */
  public List<RtdbResult> getDataByTime(List<String> tagCodes, Date dataTime, String timeCode) {
    DateTime begin = new DateTime(dataTime).minusDays(1);
    DateTime end = new DateTime(dataTime);
    final String sql = "SELECT LAST(value) as value, quality FROM %s WHERE %s AND time >=%dms AND "
        + "time<=%dms GROUP BY code";
    long fromMillis = begin.getMillis();
    // +999 ms: presumably so a second-precision timestamp covers its whole second.
    long toMillis = end.getMillis() + 999;
    List<String> statements = new ArrayList<>();
    for (List<String> tags : partition(tagCodes)) {
      statements.add(String.format(sql, TABLE_NAME, formatCode(tags), fromMillis, toMillis));
    }
    return executeQueries(statements);
  }

  /**
   * Returns history data for a time range, either raw or down-sampled.
   *
   * <p>When {@code queryType} is not {@code FULL} and {@code pointCount} is positive, the range
   * is divided into {@code pointCount} equal buckets and {@code queryType.name()} is applied as
   * the InfluxQL function per bucket, filling empty buckets with the previous value. Otherwise
   * every stored sample in the range is returned.
   *
   * @param tagCodes   tag codes to read
   * @param beginTime  start of the range
   * @param endTime    end of the range
   * @param queryType  query mode
   * @param pointCount number of buckets requested for down-sampled queries
   * @return matching rows; empty when nothing was found
   */
  public List<RtdbResult> getHistoryData(List<String> tagCodes, Date beginTime, Date endTime,
      QueryType queryType, int pointCount) {
    DateTime begin = new DateTime(beginTime);
    DateTime end = new DateTime(endTime);
    long millis = new Duration(begin, end).getMillis();
    final String sqlFull = "select value,quality FROM %s WHERE %s AND time >=%dms AND time <=%dms "
        + "GROUP BY code";
    final String sql = "select %s(value) as value, quality FROM %s WHERE %s AND time >=%dms "
        + "AND" + " time <=%dms GROUP BY code,time(%dms) fill(previous)";

    boolean sampled = queryType != QueryType.FULL && pointCount > 0;
    if (sampled && queryType == null) {
      // No function name can be derived; the result was already empty in this case.
      logger.warn("getHistoryData called with a null queryType; returning no data");
      return new ArrayList<>();
    }
    long fromMillis = begin.getMillis();
    long toMillis = end.getMillis() + 999;
    // A zero or negative bucket width is not a valid GROUP BY time() interval.
    long interval = sampled ? Math.max(1L, millis / pointCount) : 0L;

    List<String> statements = new ArrayList<>();
    for (List<String> tags : partition(tagCodes)) {
      String codes = formatCode(tags);
      if (sampled) {
        statements.add(String.format(sql, queryType.name(), TABLE_NAME, codes, fromMillis,
            toMillis, interval));
      } else {
        statements.add(String.format(sqlFull, TABLE_NAME, codes, fromMillis, toMillis));
      }
    }
    return executeQueries(statements);
  }

  /**
   * Aggregates each tag over a time range using {@code queryType.name()} as the InfluxQL
   * function.
   *
   * @param tagCodes  tag codes to aggregate
   * @param beginTime start of the range
   * @param endTime   end of the range
   * @param queryType aggregate to apply; {@code FULL} (or {@code null}) yields an empty list
   * @return one row per tag code that has data in the range
   * @throws ExecutionException   declared for existing callers; not thrown by this implementation
   * @throws InterruptedException declared for existing callers; not thrown by this implementation
   */
  public List<RtdbResult> statistics(List<String> tagCodes, Date beginTime, Date endTime,
      QueryType queryType) throws ExecutionException, InterruptedException {
    if (queryType == null || queryType == QueryType.FULL) {
      return new ArrayList<>();
    }
    DateTime begin = new DateTime(beginTime);
    DateTime end = new DateTime(endTime);
    // The statement previously contained "FROM FROM", which InfluxDB rejects.
    final String sql = "select %s(value) as value, quality FROM %s WHERE %s AND time >=%dms "
        + "AND" + " time <=%dms GROUP BY code fill(previous)";
    long fromMillis = begin.getMillis();
    long toMillis = end.getMillis() + 999;
    List<String> statements = new ArrayList<>();
    for (List<String> tags : partition(tagCodes)) {
      statements.add(String.format(sql, queryType.name(), TABLE_NAME, formatCode(tags),
          fromMillis, toMillis));
    }
    return executeQueries(statements);
  }

  /**
   * Stores the given values stamped with the current time. The time of every element in
   * {@code tagValues} is overwritten.
   *
   * @param tagValues values to store
   * @return {@code true} once the batch has been handed to the client
   */
  public Boolean storeData(List<RtdbResult> tagValues) {
    Instant now = Instant.now();
    for (RtdbResult tagValue : tagValues) {
      tagValue.setTime(now);
    }
    return saveData(tagValues);
  }

  /**
   * Stores the given values using the time already set on each element.
   *
   * @param tagValues values to store
   * @return {@code true} once the batch has been handed to the client
   */
  public Boolean insertData(List<RtdbResult> tagValues) {
    return saveData(tagValues);
  }

  /**
   * Deletes a tag. Not implemented: always returns {@code false}.
   *
   * @param tagCode tag code to delete
   * @return always {@code false}
   */
  public Boolean deleteTag(String tagCode) {
    return false;
  }

  /** Writes all values as one batch with second precision into the "autogen" policy. */
  private Boolean saveData(List<RtdbResult> tagValues) {
    BatchPoints.Builder builder = BatchPoints
        .database(dataBaseName)
        .retentionPolicy("autogen")
        .precision(TimeUnit.SECONDS);
    for (RtdbResult tagValue : tagValues) {
      Point point = Point
          .measurement(TABLE_NAME)
          .addField("value", tagValue.getValue())
          .addField("quality", tagValue.getQuality())
          .tag("code", tagValue.getTagCode())
          .time(tagValue.getTime().getEpochSecond(), TimeUnit.SECONDS)
          .build();
      builder.point(point);
    }
    influxDB.write(builder.build());
    return true;
  }

  /** Splits the tag codes into query-sized groups; a null or empty input gives no groups. */
  private List<List<String>> partition(List<String> tagCodes) {
    if (tagCodes == null || tagCodes.isEmpty()) {
      return new ArrayList<>();
    }
    return Lists.partition(tagCodes, PARTITION_SIZE);
  }

  /**
   * Runs every statement on a pool created for this call and merges the mapped rows. The pool is
   * always shut down, including when a submission fails.
   */
  private List<RtdbResult> executeQueries(List<String> statements) {
    if (statements.isEmpty()) {
      return new ArrayList<>();
    }
    ListeningExecutorService service = MoreExecutors.listeningDecorator(customerThreadPool());
    try {
      List<ListenableFuture<QueryResult>> taskList = new ArrayList<>(statements.size());
      for (String statement : statements) {
        ListenableFuture<QueryResult> futureTask = service.submit(() -> {
          return builderResultValue(statement);
        });
        taskList.add(futureTask);
      }
      return buildResult(taskList);
    } finally {
      service.shutdown();
    }
  }

  /**
   * Executes one statement. On failure the error is logged and an empty {@code QueryResult} is
   * returned so that the other statements of the same call are unaffected.
   */
  private QueryResult builderResultValue(String sql) {
    try {
      Query query = new Query(sql, dataBaseName);
      return influxDB.query(query);
    } catch (RuntimeException e) {
      logger.error("InfluxDB query failed: " + sql, e);
      return new QueryResult();
    }
  }

  /**
   * Waits for all queries and maps their results to {@code RtdbResult}. Results that are missing,
   * empty or cannot be mapped are skipped.
   */
  private List<RtdbResult> buildResult(List<ListenableFuture<QueryResult>> taskList) {
    List<RtdbResult> results = new ArrayList<>();
    if (resultMapper == null) {
      logger.error("InfluxDB connection was not established; no results can be mapped");
      return results;
    }
    List<QueryResult> queryResults;
    try {
      queryResults = Futures.successfulAsList(taskList).get();
    } catch (InterruptedException e) {
      // Restore the flag so the caller can observe the interruption.
      Thread.currentThread().interrupt();
      logger.warn("Interrupted while waiting for InfluxDB query results", e);
      return results;
    } catch (ExecutionException e) {
      logger.error("Failed to collect InfluxDB query results", e);
      return results;
    }
    for (QueryResult queryResult : queryResults) {
      // null: the task itself failed; null results: the query failed and was replaced by an
      // empty QueryResult.
      if (queryResult == null || queryResult.getResults() == null) {
        continue;
      }
      try {
        // Lock retained from the earlier implementation; it serialises mapping between
        // concurrent callers of this instance.
        synchronized (this) {
          results.addAll(resultMapper.toPOJO(queryResult, RtdbResult.class));
        }
      } catch (RuntimeException e) {
        logger.error("Failed to map InfluxDB query result", e);
      }
    }
    return results;
  }

  /**
   * Builds the {@code (code='a' or code='b')} predicate for a group of tag codes. Backslashes and
   * single quotes are escaped so that a tag code cannot terminate the string literal.
   */
  private String formatCode(List<String> codeList) {
    List<String> escaped = new ArrayList<>(codeList.size());
    for (String code : codeList) {
      escaped.add(String.valueOf(code).replace("\\", "\\\\").replace("'", "\\'"));
    }
    return "(code='" + String.join("' or code='", escaped) + "')";
  }
}