package com.zhitan.service.impl;
|
|
import com.influxdb.client.domain.WritePrecision;
|
import com.influxdb.client.write.Point;
|
import com.zhitan.config.influxdb.InfluxdbConfig;
|
import com.zhitan.config.opc.OpcConfig;
|
import com.zhitan.handler.OpcDataHandler;
|
import com.zhitan.influxdb.InfluxdbRepository;
|
import com.zhitan.model.entity.OpcData;
|
import com.zhitan.service.IOpcService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
|
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
|
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder;
|
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
|
import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
|
import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
|
import org.eclipse.milo.opcua.stack.core.AttributeId;
|
import org.eclipse.milo.opcua.stack.core.UaException;
|
import org.eclipse.milo.opcua.stack.core.types.builtin.*;
|
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
|
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
|
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
|
import org.eclipse.milo.opcua.stack.core.types.structured.ReadResponse;
|
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Service;
|
|
import javax.annotation.PostConstruct;
|
import javax.annotation.PreDestroy;
|
import java.text.ParseException;
|
import java.time.Instant;
|
import java.util.*;
|
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
/**
|
* OPC UA服务实现类
|
*/
|
@Slf4j
|
@Service
|
public class OpcServiceImpl implements IOpcService {
|
|
private final OpcConfig opcConfig;
|
private final InfluxdbRepository influxdbRepository;
|
private final InfluxdbConfig influxdbConfig;
|
private final OpcDataHandler opcDataHandler;
|
|
private OpcUaClient client;
|
private final AtomicBoolean isRunning = new AtomicBoolean(false);
|
private final Map<String, String> nodeMap = new HashMap<>();
|
private final String TAG = "tag";
|
private final String FIELD_VALUE = "value";
|
|
@Autowired
|
public OpcServiceImpl(OpcConfig opcConfig, InfluxdbRepository influxdbRepository,
|
InfluxdbConfig influxdbConfig, OpcDataHandler opcDataHandler) {
|
this.opcConfig = opcConfig;
|
this.influxdbRepository = influxdbRepository;
|
this.influxdbConfig = influxdbConfig;
|
this.opcDataHandler = opcDataHandler;
|
parseNodeConfig();
|
}
|
|
/**
|
* 解析节点配置
|
*/
|
private void parseNodeConfig() {
|
if (opcConfig.getNodes() != null) {
|
for (String nodeConfig : opcConfig.getNodes()) {
|
String[] parts = nodeConfig.split("=", 2);
|
if (parts.length == 2) {
|
String name = parts[0];
|
String nodeId = parts[1];
|
nodeMap.put(nodeId, name);
|
log.info("已配置OPC UA节点: {} -> {}", name, nodeId);
|
}
|
}
|
}
|
}
|
|
/**
|
* 应用启动时自动连接并开始数据采集
|
*/
|
@PostConstruct
|
public void init() {
|
if (opcConfig.isEnable()) {
|
log.info("正在初始化OPC UA客户端...");
|
if (connect()) {
|
startDataCollection();
|
}
|
} else {
|
log.info("OPC UA客户端已禁用");
|
}
|
}
|
|
/**
|
* 应用关闭时断开连接
|
*/
|
@PreDestroy
|
public void destroy() {
|
stopDataCollection();
|
disconnect();
|
}
|
|
@Override
|
public boolean connect() {
|
if (client != null && isConnected()) {
|
log.info("OPC UA客户端已连接");
|
return true;
|
}
|
|
try {
|
log.info("正在连接OPC UA服务器: {}", opcConfig.getServerUrl());
|
|
// 发现端点
|
List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(opcConfig.getServerUrl()).get();
|
EndpointDescription endpoint = endpoints.stream()
|
.findFirst()
|
.orElseThrow(() -> new Exception("未找到可用的OPC UA端点"));
|
|
// 配置客户端
|
OpcUaClientConfigBuilder configBuilder = OpcUaClientConfig.builder()
|
.setEndpoint(endpoint)
|
.setRequestTimeout(UInteger.valueOf(opcConfig.getConnectionTimeout()));
|
|
// 设置认证方式
|
if (opcConfig.getUsername() != null && !opcConfig.getUsername().isEmpty()) {
|
configBuilder.setIdentityProvider(new UsernameProvider(
|
opcConfig.getUsername(),
|
opcConfig.getPassword()
|
));
|
} else {
|
configBuilder.setIdentityProvider(new AnonymousProvider());
|
}
|
|
// 创建客户端并连接
|
client = OpcUaClient.create(configBuilder.build());
|
client.connect().get();
|
List<String> namespaceUris = Arrays.asList(client.getNamespaceTable().toArray());
|
for (int i = 0; i < namespaceUris.size(); i++) {
|
System.out.println("Namespace Index: " + i + ", URI: " + namespaceUris.get(i));
|
}
|
log.info("OPC UA客户端连接成功");
|
return true;
|
} catch (Exception e) {
|
log.error("OPC UA客户端连接失败: {}", e.getMessage(), e);
|
return false;
|
}
|
}
|
|
@Override
|
public void disconnect() {
|
if (client != null) {
|
try {
|
client.disconnect().get();
|
log.info("OPC UA客户端已断开连接");
|
} catch (Exception e) {
|
log.error("OPC UA客户端断开连接失败: {}", e.getMessage(), e);
|
} finally {
|
client = null;
|
}
|
}
|
}
|
|
@Override
|
public OpcData readNode(String nodeId, String name) {
|
log.info("正在读取节点: {},名称: {}", nodeId, name);
|
OpcData data = new OpcData();
|
data.setNodeId(nodeId);
|
data.setName(name);
|
data.setTimestamp(Instant.now());
|
|
if (client == null || !isConnected()) {
|
data.setErrorMessage("OPC UA客户端未连接");
|
return data;
|
}
|
|
try {
|
// 创建读取请求
|
ReadValueId readValueId = new ReadValueId(
|
NodeId.parse(nodeId),
|
AttributeId.Value.uid(),
|
null,
|
QualifiedName.NULL_VALUE
|
);
|
|
// 执行读取
|
ReadResponse response = client.read(
|
0.0,
|
TimestampsToReturn.Both,
|
List.of(readValueId)
|
).get();
|
|
// 处理结果
|
DataValue[] results = response.getResults();
|
if (results == null || results.length == 0) {
|
data.setStatusCode(-1);
|
data.setErrorMessage("读取失败: 响应中无结果数据");
|
return data;
|
}
|
|
DataValue value = results[0];
|
data.setStatusCode((int) value.getStatusCode().getValue());
|
|
if (value.getStatusCode().isGood()) {
|
Variant variant = value.getValue();
|
if (variant != null && variant.getValue() != null) {
|
Object rawValue = variant.getValue();
|
data.setDataType(rawValue.getClass().getSimpleName());
|
|
// 转换为Double
|
if (rawValue instanceof Number) {
|
data.setValue(((Number) rawValue).doubleValue());
|
} else if (rawValue instanceof Boolean) {
|
data.setValue((Boolean) rawValue ? 1.0 : 0.0);
|
} else {
|
data.setValue(null);
|
data.setErrorMessage("不支持的数据类型: " + rawValue.getClass().getName());
|
}
|
}
|
} else {
|
data.setErrorMessage("读取失败: " + value.getStatusCode().toString());
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt(); // 恢复中断状态
|
data.setErrorMessage("读取异常: " + e.getMessage());
|
log.error("读取OPC UA节点 {} 被中断: {}", nodeId, e.getMessage(), e);
|
} catch (ExecutionException e) {
|
data.setErrorMessage("读取异常: " + e.getCause() != null ? e.getCause().getMessage() : e.getMessage());
|
log.error("读取OPC UA节点 {} 失败: {}", nodeId, e.getMessage(), e);
|
} catch (Exception e) {
|
data.setErrorMessage("未知异常: " + e.getMessage());
|
log.error("读取OPC UA节点 {} 发生未知异常", nodeId, e);
|
}
|
|
return data;
|
}
|
|
|
@Override
|
public List<OpcData> readAllNodes() {
|
List<OpcData> results = new ArrayList<>();
|
log.info("开始读取所有节点时间: {}", Instant.now());
|
if (nodeMap.isEmpty()) {
|
log.warn("没有配置OPC UA节点");
|
return results;
|
}
|
|
try {
|
// 构建批量读取请求
|
List<ReadValueId> readValueIds = new ArrayList<>();
|
Map<Integer, Map.Entry<String, String>> indexMap = new HashMap<>();
|
|
int index = 0;
|
for (Map.Entry<String, String> entry : nodeMap.entrySet()) {
|
ReadValueId readValueId = new ReadValueId(
|
NodeId.parse(entry.getKey()),
|
AttributeId.Value.uid(),
|
null,
|
QualifiedName.NULL_VALUE
|
);
|
readValueIds.add(readValueId);
|
indexMap.put(index++, entry);
|
}
|
|
// 执行批量读取
|
ReadResponse response = client.read(0.0, TimestampsToReturn.Both, readValueIds).get();
|
DataValue[] dataValues = response.getResults();
|
|
if (dataValues == null || dataValues.length != readValueIds.size()) {
|
log.error("批量读取失败: 返回结果数量不匹配");
|
return results;
|
}
|
|
// 处理响应数据
|
for (int i = 0; i < dataValues.length; i++) {
|
Map.Entry<String, String> entry = indexMap.get(i);
|
String nodeId = entry.getKey();
|
String name = entry.getValue();
|
DataValue value = dataValues[i];
|
|
OpcData data = new OpcData();
|
data.setNodeId(nodeId);
|
data.setName(name);
|
data.setTimestamp(Instant.now());
|
|
if (value.getStatusCode().isGood() && value.getValue() != null) {
|
Variant variant = value.getValue();
|
Object rawValue = variant.getValue();
|
data.setDataType(rawValue.getClass().getSimpleName());
|
data.setStatusCode((int) value.getStatusCode().getValue());
|
|
if (rawValue instanceof Number) {
|
data.setValue(((Number) rawValue).doubleValue());
|
} else if (rawValue instanceof Boolean) {
|
data.setValue((Boolean) rawValue ? 1.0 : 0.0);
|
} else {
|
data.setValue(null);
|
data.setErrorMessage("不支持的数据类型: " + rawValue.getClass().getName());
|
}
|
} else {
|
data.setStatusCode((int) value.getStatusCode().getValue());
|
data.setErrorMessage("读取失败: " + value.getStatusCode().toString());
|
}
|
|
results.add(data);
|
}
|
|
} catch (Exception e) {
|
log.error("批量读取OPC UA节点失败", e);
|
}
|
log.info("读取所有节点完成, 共计 {} 个节点, 时间: {}", results.size(), Instant.now());
|
return results;
|
}
|
|
|
@Override
|
public void startDataCollection() {
|
if (isRunning.compareAndSet(false, true)) {
|
log.info("OPC UA数据采集已启动");
|
}
|
}
|
|
@Override
|
public void stopDataCollection() {
|
if (isRunning.compareAndSet(true, false)) {
|
log.info("OPC UA数据采集已停止");
|
}
|
}
|
@Override
|
public boolean isConnected() {
|
if (client == null) {
|
return false;
|
}
|
try {
|
return client.getSession().get() != null;
|
} catch (Exception e) {
|
log.warn("获取OPC UA会话失败: {}", e.getMessage());
|
return false;
|
}
|
}
|
|
|
|
/**
|
* 定时采集数据并写入InfluxDB
|
*/
|
@Scheduled(fixedDelayString = "${opc.scan-rate}")
|
public void collectAndStoreData() {
|
if (!opcConfig.isEnable() || !isRunning.get()) {
|
return;
|
}
|
|
if (!isConnected() && !connect()) {
|
log.warn("OPC UA客户端未连接,无法采集数据");
|
return;
|
}
|
|
try {
|
List<OpcData> dataList = readAllNodes();
|
List<Point> points = new ArrayList<>();
|
|
// 使用两种方式存储数据
|
// 1. 直接通过InfluxDB存储库写入
|
for (OpcData data : dataList) {
|
if (data.getValue() != null) {
|
// 如果是电压,四路共用,需要处理 1-1#yijidiankonggui_1_VoltageA
|
if (data.getName().contains("_Voltage")) {
|
String prefix = data.getName().split("_")[0];
|
String suffix = data.getName().split("_")[2];
|
// 循环四次
|
for (int i = 1; i <= 4; i++) {
|
String name = prefix + "_" + i + "_" + suffix;
|
Point point = Point
|
.measurement(influxdbConfig.getMeasurement())
|
.addTag(TAG, name)
|
.addField(FIELD_VALUE, data.getValue())
|
.time(data.getTimestamp(), WritePrecision.NS);
|
points.add(point);
|
}
|
} else {
|
Point point = Point
|
.measurement(influxdbConfig.getMeasurement())
|
.addTag(TAG, data.getName())
|
.addField(FIELD_VALUE, data.getValue())
|
.time(data.getTimestamp(), WritePrecision.NS);
|
points.add(point);
|
}
|
|
log.debug("采集OPC UA数据: {} = {}", data.getName(), data.getValue());
|
} else if (data.getErrorMessage() != null) {
|
log.warn("采集OPC UA数据失败: {} - {}", data.getName(), data.getErrorMessage());
|
}
|
}
|
|
if (!points.isEmpty()) {
|
influxdbRepository.writePoints(points);
|
log.info("成功写入 {} 条OPC UA数据到InfluxDB", points.size());
|
}
|
|
// 2. 通过数据处理器使用现有的数据服务接口写入
|
opcDataHandler.handleOpcData(dataList);
|
} catch (Exception e) {
|
log.error("OPC UA数据采集异常: {}", e.getMessage(), e);
|
}
|
}
|
}
|