package com.zhitan.service.impl; import com.influxdb.client.domain.WritePrecision; import com.influxdb.client.write.Point; import com.zhitan.config.influxdb.InfluxdbConfig; import com.zhitan.config.opc.OpcConfig; import com.zhitan.handler.OpcDataHandler; import com.zhitan.influxdb.InfluxdbRepository; import com.zhitan.model.entity.OpcData; import com.zhitan.service.IOpcService; import lombok.extern.slf4j.Slf4j; import org.eclipse.milo.opcua.sdk.client.OpcUaClient; import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig; import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfigBuilder; import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider; import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider; import org.eclipse.milo.opcua.stack.client.DiscoveryClient; import org.eclipse.milo.opcua.stack.core.AttributeId; import org.eclipse.milo.opcua.stack.core.UaException; import org.eclipse.milo.opcua.stack.core.types.builtin.*; import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger; import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn; import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription; import org.eclipse.milo.opcua.stack.core.types.structured.ReadResponse; import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.text.ParseException; import java.time.Instant; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; /** * OPC UA服务实现类 */ @Slf4j @Service public class OpcServiceImpl implements IOpcService { private final OpcConfig opcConfig; private final InfluxdbRepository influxdbRepository; private final InfluxdbConfig influxdbConfig; private final OpcDataHandler opcDataHandler; private OpcUaClient client; private final AtomicBoolean isRunning = new AtomicBoolean(false); private final Map nodeMap = new HashMap<>(); private final String TAG = "tag"; private final String FIELD_VALUE = "value"; @Autowired public OpcServiceImpl(OpcConfig opcConfig, InfluxdbRepository influxdbRepository, InfluxdbConfig influxdbConfig, OpcDataHandler opcDataHandler) { this.opcConfig = opcConfig; this.influxdbRepository = influxdbRepository; this.influxdbConfig = influxdbConfig; this.opcDataHandler = opcDataHandler; parseNodeConfig(); } /** * 解析节点配置 */ private void parseNodeConfig() { if (opcConfig.getNodes() != null) { for (String nodeConfig : opcConfig.getNodes()) { String[] parts = nodeConfig.split("=", 2); if (parts.length == 2) { String name = parts[0]; String nodeId = parts[1]; nodeMap.put(nodeId, name); log.info("已配置OPC UA节点: {} -> {}", name, nodeId); } } } } /** * 应用启动时自动连接并开始数据采集 */ @PostConstruct public void init() { if (opcConfig.isEnable()) { log.info("正在初始化OPC UA客户端..."); if (connect()) { startDataCollection(); } } else { log.info("OPC UA客户端已禁用"); } } /** * 应用关闭时断开连接 */ @PreDestroy public void destroy() { stopDataCollection(); disconnect(); } @Override public boolean connect() { if (client != null && isConnected()) { log.info("OPC UA客户端已连接"); return true; } try { log.info("正在连接OPC UA服务器: {}", opcConfig.getServerUrl()); // 发现端点 List endpoints = DiscoveryClient.getEndpoints(opcConfig.getServerUrl()).get(); EndpointDescription endpoint = endpoints.stream() .findFirst() .orElseThrow(() -> new Exception("未找到可用的OPC UA端点")); // 配置客户端 OpcUaClientConfigBuilder configBuilder = OpcUaClientConfig.builder() .setEndpoint(endpoint) .setRequestTimeout(UInteger.valueOf(opcConfig.getConnectionTimeout())); // 设置认证方式 if (opcConfig.getUsername() != null && !opcConfig.getUsername().isEmpty()) { configBuilder.setIdentityProvider(new UsernameProvider( opcConfig.getUsername(), opcConfig.getPassword() )); } else { configBuilder.setIdentityProvider(new AnonymousProvider()); } // 创建客户端并连接 client = OpcUaClient.create(configBuilder.build()); client.connect().get(); List namespaceUris = Arrays.asList(client.getNamespaceTable().toArray()); for (int i = 0; i < namespaceUris.size(); i++) { System.out.println("Namespace Index: " + i + ", URI: " + namespaceUris.get(i)); } log.info("OPC UA客户端连接成功"); return true; } catch (Exception e) { log.error("OPC UA客户端连接失败: {}", e.getMessage(), e); return false; } } @Override public void disconnect() { if (client != null) { try { client.disconnect().get(); log.info("OPC UA客户端已断开连接"); } catch (Exception e) { log.error("OPC UA客户端断开连接失败: {}", e.getMessage(), e); } finally { client = null; } } } @Override public OpcData readNode(String nodeId, String name) { log.info("正在读取节点: {},名称: {}", nodeId, name); OpcData data = new OpcData(); data.setNodeId(nodeId); data.setName(name); data.setTimestamp(Instant.now()); if (client == null || !isConnected()) { data.setErrorMessage("OPC UA客户端未连接"); return data; } try { // 创建读取请求 ReadValueId readValueId = new ReadValueId( NodeId.parse(nodeId), AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE ); // 执行读取 ReadResponse response = client.read( 0.0, TimestampsToReturn.Both, List.of(readValueId) ).get(); // 处理结果 DataValue[] results = response.getResults(); if (results == null || results.length == 0) { data.setStatusCode(-1); data.setErrorMessage("读取失败: 响应中无结果数据"); return data; } DataValue value = results[0]; data.setStatusCode((int) value.getStatusCode().getValue()); if (value.getStatusCode().isGood()) { Variant variant = value.getValue(); if (variant != null && variant.getValue() != null) { Object rawValue = variant.getValue(); data.setDataType(rawValue.getClass().getSimpleName()); // 转换为Double if (rawValue instanceof Number) { data.setValue(((Number) rawValue).doubleValue()); } else if (rawValue instanceof Boolean) { data.setValue((Boolean) rawValue ? 1.0 : 0.0); } else { data.setValue(null); data.setErrorMessage("不支持的数据类型: " + rawValue.getClass().getName()); } } } else { data.setErrorMessage("读取失败: " + value.getStatusCode().toString()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 恢复中断状态 data.setErrorMessage("读取异常: " + e.getMessage()); log.error("读取OPC UA节点 {} 被中断: {}", nodeId, e.getMessage(), e); } catch (ExecutionException e) { data.setErrorMessage("读取异常: " + e.getCause() != null ? e.getCause().getMessage() : e.getMessage()); log.error("读取OPC UA节点 {} 失败: {}", nodeId, e.getMessage(), e); } catch (Exception e) { data.setErrorMessage("未知异常: " + e.getMessage()); log.error("读取OPC UA节点 {} 发生未知异常", nodeId, e); } return data; } @Override public List readAllNodes() { List results = new ArrayList<>(); log.info("开始读取所有节点时间: {}", Instant.now()); if (nodeMap.isEmpty()) { log.warn("没有配置OPC UA节点"); return results; } try { // 构建批量读取请求 List readValueIds = new ArrayList<>(); Map> indexMap = new HashMap<>(); int index = 0; for (Map.Entry entry : nodeMap.entrySet()) { ReadValueId readValueId = new ReadValueId( NodeId.parse(entry.getKey()), AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE ); readValueIds.add(readValueId); indexMap.put(index++, entry); } // 执行批量读取 ReadResponse response = client.read(0.0, TimestampsToReturn.Both, readValueIds).get(); DataValue[] dataValues = response.getResults(); if (dataValues == null || dataValues.length != readValueIds.size()) { log.error("批量读取失败: 返回结果数量不匹配"); return results; } // 处理响应数据 for (int i = 0; i < dataValues.length; i++) { Map.Entry entry = indexMap.get(i); String nodeId = entry.getKey(); String name = entry.getValue(); DataValue value = dataValues[i]; OpcData data = new OpcData(); data.setNodeId(nodeId); data.setName(name); data.setTimestamp(Instant.now()); if (value.getStatusCode().isGood() && value.getValue() != null) { Variant variant = value.getValue(); Object rawValue = variant.getValue(); data.setDataType(rawValue.getClass().getSimpleName()); data.setStatusCode((int) value.getStatusCode().getValue()); if (rawValue instanceof Number) { data.setValue(((Number) rawValue).doubleValue()); } else if (rawValue instanceof Boolean) { data.setValue((Boolean) rawValue ? 1.0 : 0.0); } else { data.setValue(null); data.setErrorMessage("不支持的数据类型: " + rawValue.getClass().getName()); } } else { data.setStatusCode((int) value.getStatusCode().getValue()); data.setErrorMessage("读取失败: " + value.getStatusCode().toString()); } results.add(data); } } catch (Exception e) { log.error("批量读取OPC UA节点失败", e); } log.info("读取所有节点完成, 共计 {} 个节点, 时间: {}", results.size(), Instant.now()); return results; } @Override public void startDataCollection() { if (isRunning.compareAndSet(false, true)) { log.info("OPC UA数据采集已启动"); } } @Override public void stopDataCollection() { if (isRunning.compareAndSet(true, false)) { log.info("OPC UA数据采集已停止"); } } @Override public boolean isConnected() { if (client == null) { return false; } try { return client.getSession().get() != null; } catch (Exception e) { log.warn("获取OPC UA会话失败: {}", e.getMessage()); return false; } } /** * 定时采集数据并写入InfluxDB */ @Scheduled(fixedDelayString = "${opc.scan-rate}") public void collectAndStoreData() { if (!opcConfig.isEnable() || !isRunning.get()) { return; } if (!isConnected() && !connect()) { log.warn("OPC UA客户端未连接,无法采集数据"); return; } try { List dataList = readAllNodes(); List points = new ArrayList<>(); // 使用两种方式存储数据 // 1. 直接通过InfluxDB存储库写入 for (OpcData data : dataList) { if (data.getValue() != null) { // 如果是电压,四路共用,需要处理 1-1#yijidiankonggui_1_VoltageA if (data.getName().contains("_Voltage")) { String prefix = data.getName().split("_")[0]; String suffix = data.getName().split("_")[2]; // 循环四次 for (int i = 1; i <= 4; i++) { String name = prefix + "_" + i + "_" + suffix; Point point = Point .measurement(influxdbConfig.getMeasurement()) .addTag(TAG, name) .addField(FIELD_VALUE, data.getValue()) .time(data.getTimestamp(), WritePrecision.NS); points.add(point); } } else { Point point = Point .measurement(influxdbConfig.getMeasurement()) .addTag(TAG, data.getName()) .addField(FIELD_VALUE, data.getValue()) .time(data.getTimestamp(), WritePrecision.NS); points.add(point); } log.debug("采集OPC UA数据: {} = {}", data.getName(), data.getValue()); } else if (data.getErrorMessage() != null) { log.warn("采集OPC UA数据失败: {} - {}", data.getName(), data.getErrorMessage()); } } if (!points.isEmpty()) { influxdbRepository.writePoints(points); log.info("成功写入 {} 条OPC UA数据到InfluxDB", points.size()); } // 2. 通过数据处理器使用现有的数据服务接口写入 opcDataHandler.handleOpcData(dataList); } catch (Exception e) { log.error("OPC UA数据采集异常: {}", e.getMessage(), e); } } }