From 07df3edd01d77d77d2653a6a9ee325e53e607955 Mon Sep 17 00:00:00 2001
From: bsw215583320 <baoshiwei121@163.com>
Date: 星期一, 04 十二月 2023 08:31:09 +0800
Subject: [PATCH] 增加opc设备维护和控制

---
 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/websocket/DrySocket.java |  296 ++++++++++++++++++++++------------------------------------
 1 files changed, 114 insertions(+), 182 deletions(-)

diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/websocket/DrySocket.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/websocket/DrySocket.java
index 6103217..8b908b6 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/websocket/DrySocket.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/websocket/DrySocket.java
@@ -1,214 +1,146 @@
 package org.jeecg.modules.dry.websocket;
 
-import cn.hutool.core.bean.BeanUtil;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
 import lombok.extern.slf4j.Slf4j;
-import org.jeecg.common.constant.DrySocketConst;
-import org.jeecg.common.util.SpringContextUtils;
-import org.jeecg.modules.dry.service.IDryRealTimeDataService;
-import org.jeecg.modules.dry.vo.RealTimeDataVo;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.jeecg.common.constant.WebsocketConst;
 import org.springframework.stereotype.Component;
 
-import javax.websocket.OnClose;
-import javax.websocket.OnMessage;
-import javax.websocket.OnOpen;
-import javax.websocket.Session;
+import javax.websocket.*;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
-import java.util.Collection;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
-@Slf4j
+/**
+ * @Author scott
+ * @Date 2019/11/29 9:41
+ * @Description: 姝ゆ敞瑙g浉褰撲簬璁剧疆璁块棶URL
+ */
 @Component
-@ServerEndpoint("/drySocket/{tenantId}/{machineId}")
+@Slf4j
+@ServerEndpoint("/drySocket/{tenantId}")
 public class DrySocket {
+    
+    /**绾跨▼瀹夊叏Map*/
+    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
 
-    /**
-     * 褰撳墠 session
-     */
-    private Session session;
-    /**
-     * 褰撳墠绉熸埛id
-     */
-    private String tenantId;
-    /**
-     * 璁惧id锛岀敤浜庢爣璇嗗悓涓�绉熸埛锛屼笉鍚岃澶囩殑鏁版嵁
-     */
-    private String machineId;
-    /**
-     * 褰撳墠socket鍞竴id
-     */
-    private String socketId;
 
-    /**
-     * 绉熸埛杩炴帴姹狅紝鍖呭惈鍗曚釜绉熸埛鐨勬墍鏈塻ocket杩炴帴锛�
-     * 鍥犱负涓�涓鎴峰彲鑳芥墦寮�澶氫釜璁惧锛屽涓澶囧氨浼氭湁澶氫釜杩炴帴锛�
-     * key鏄痶enantId锛寁alue鏄疢ap瀵硅薄锛涘瓙Map鐨刱ey鏄痬achineId锛寁alue鏄痙rySocket瀵硅薄
-     */
-    private static Map<String, Map<String, DrySocket>> tenantPool = new HashMap<>();
-    /**
-     * 杩炴帴姹狅紝鍖呭惈鎵�鏈塛ebSocket杩炴帴锛�
-     * key鏄痵ocketId锛寁alue鏄痙rySocket瀵硅薄
-     */
-    private static Map<String, DrySocket> machinePool = new HashMap<>();
 
-    /**
-     * 鑾峰彇鏌愪釜绉熸埛鎵�鏈夌殑璁惧
-     */
-    public static Map<String, DrySocket> getTenantPool(String tenantId) {
-        return tenantPool.computeIfAbsent(tenantId, k -> new HashMap<>(5));
-    }
 
-    /**
-     * 鍚戝綋鍓嶇鎴峰彂閫佹秷鎭�
-     *
-     * @param message 娑堟伅鍐呭
-     */
-    public void sendMessage(String message) {
-        try {
-            this.session.getAsyncRemote().sendText(message);
-        } catch (Exception e) {
-            log.error("銆恉rySocket銆戞秷鎭彂閫佸け璐ワ細" + e.getMessage());
-        }
-    }
-
-    /**
-     * 灏佽娑堟伅json
-     *
-     * @param data 娑堟伅鍐呭
-     */
-    public static String packageMessage(String type, Object data) {
-        JSONObject message = new JSONObject();
-        message.put(DrySocketConst.TYPE, type);
-        message.put(DrySocketConst.DATA, data);
-        return message.toJSONString();
-    }
-
-    /**
-     * 鍚戞寚瀹氱鎴风殑鎵�鏈夎澶囧彂閫佹秷鎭�
-     *
-     * @param tenantId  鎺ユ敹娑堟伅鐨勭鎴稩D
-     * @param message 娑堟伅鍐呭
-     */
-    public static void sendMessageTo(String tenantId, String message) {
-        Collection<DrySocket> values = getTenantPool(tenantId).values();
-        if (values.size() > 0) {
-            for (DrySocket socketItem : values) {
-                socketItem.sendMessage(message);
-            }
-        } else {
-            log.warn("銆恉rySocket銆戞秷鎭彂閫佸け璐ワ細tenantId\"" + tenantId + "\"涓嶅瓨鍦ㄦ垨鏈湪绾匡紒");
-        }
-    }
-
-    /**
-     * 鍚戞寚瀹氱鎴风殑鎸囧畾璁惧鍙戦�佹秷鎭�
-     *
-     * @param tenantId  鎺ユ敹娑堟伅鐨勭鎴稩D
-     * @param message 娑堟伅鍐呭
-     */
-    public static void sendMessageTo(String tenantId, String machineId, String message) {
-        DrySocket socketItem = getTenantPool(tenantId).get(machineId);
-        if (socketItem != null) {
-            socketItem.sendMessage(message);
-        } else {
-            log.warn("銆恉rySocket銆戞秷鎭彂閫佸け璐ワ細tenantId\"" + tenantId + "\"鐨刴achineId\"" + machineId + "\"涓嶅瓨鍦ㄦ垨鏈湪绾匡紒");
-        }
-    }
-
-    /**
-     * 鍚戝涓鎴风殑鎵�鏈夎澶囧彂閫佹秷鎭�
-     *
-     * @param tenantIds 鎺ユ敹娑堟伅鐨勭鎴稩D鏁扮粍
-     * @param message 娑堟伅鍐呭
-     */
-    public static void sendMessageTo(String[] tenantIds, String message) {
-        for (String tenantId : tenantIds) {
-            DrySocket.sendMessageTo(tenantId, message);
-        }
-    }
-
-    /**
-     * 鍚戞墍鏈夌鎴风殑鎵�鏈夎澶囧彂閫佹秷鎭�
-     *
-     * @param message 娑堟伅鍐呭
-     */
-    public static void sendMessageToAll(String message) {
-        for (DrySocket socketItem : machinePool.values()) {
-            socketItem.sendMessage(message);
-        }
-    }
-
-    /**
-     * websocket 寮�鍚繛鎺�
-     */
+    //==========銆恮ebsocket鎺ュ彈銆佹帹閫佹秷鎭瓑鏂规硶 鈥斺�� 鍏蜂綋鏈嶅姟鑺傜偣鎺ㄩ�亀s娑堟伅銆�========================================================================================
     @OnOpen
-    public void onOpen(Session session, @PathParam("tenantId") String tenantId, @PathParam("machineId") String machineId) {
+    public void onOpen(Session session, @PathParam(value = "tenantId") String tenantId) {
         try {
-            this.tenantId = tenantId;
-            this.machineId = machineId;
-            this.socketId = tenantId + machineId;
-            this.session = session;
-
-            machinePool.put(this.socketId, this);
-            getTenantPool(tenantId).put(this.machineId, this);
-
-            log.info("銆恉rySocket銆戞湁鏂扮殑杩炴帴锛屾�绘暟涓�:" + machinePool.size());
-        } catch (Exception ignored) {
+            sessionPool.put(tenantId, session);
+            log.info("銆愮郴缁� WebSocket銆戞湁鏂扮殑杩炴帴锛屾�绘暟涓�:" + sessionPool.size());
+        } catch (Exception e) {
         }
     }
 
-    /**
-     * websocket 鏂紑杩炴帴
-     */
     @OnClose
-    public void onClose() {
+    public void onClose(@PathParam("tenantId") String tenantId) {
         try {
-            machinePool.remove(this.socketId);
-            getTenantPool(this.tenantId).remove(this.machineId);
-
-            log.info("銆恉rySocket銆戣繛鎺ユ柇寮�锛屾�绘暟涓�:" + machinePool.size());
-        } catch (Exception ignored) {
+            sessionPool.remove(tenantId);
+            log.info("銆愮郴缁� WebSocket銆戣繛鎺ユ柇寮�锛屾�绘暟涓�:" + sessionPool.size());
+        } catch (Exception e) {
+            e.printStackTrace();
         }
     }
 
     /**
-     * websocket 鏀跺埌娑堟伅
+     * ws鎺ㄩ�佹秷鎭�
+     *
+     * @param tenantId
+     * @param message
+     */
+    public void pushMessage(String tenantId, String message) {
+        for (Map.Entry<String, Session> item : sessionPool.entrySet()) {
+            //userId key鍊�= {鐢ㄦ埛id + "_"+ 鐧诲綍token鐨刴d5涓瞹
+            //TODO vue2鏈敼key鏂拌鍒欙紝鏆傛椂涓嶅奖鍝嶉�昏緫
+            if (item.getKey().contains(tenantId)) {
+                Session session = item.getValue();
+                try {
+                    //update-begin-author:taoyan date:20211012 for: websocket鎶ラ敊 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
+                    synchronized (session){
+                        log.info("銆愮郴缁� WebSocket銆戞帹閫佸崟浜烘秷鎭�:" + message);
+                        session.getBasicRemote().sendText(message);
+                    }
+                    //update-end-author:taoyan date:20211012 for: websocket鎶ラ敊 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
+                } catch (Exception e) {
+                    log.error(e.getMessage(),e);
+                }
+            }
+        }
+    }
+
+    /**
+     * ws閬嶅巻缇ゅ彂娑堟伅
+     */
+    public void pushMessage(String message) {
+        try {
+            for (Map.Entry<String, Session> item : sessionPool.entrySet()) {
+                try {
+                    item.getValue().getAsyncRemote().sendText(message);
+                } catch (Exception e) {
+                    log.error(e.getMessage(), e);
+                }
+            }
+            log.info("銆愮郴缁� WebSocket銆戠兢鍙戞秷鎭�:" + message);
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+
+
+    /**
+     * ws鎺ュ彈瀹㈡埛绔秷鎭�
      */
     @OnMessage
-    public void onMessage(String message) {
-        log.info("銆恉rySocket銆憃nMessage:" + message);
-        IDryRealTimeDataService realTimeDataService = SpringContextUtils.getBean(IDryRealTimeDataService.class);
-        JSONObject json;
-        try {
-            json = JSON.parseObject(message);
-        } catch (Exception e) {
-            log.warn("銆恉rySocket銆戞敹鍒颁笉鍚堟硶鐨勬秷鎭�:" + message);
-            return;
+    public void onMessage(String message, @PathParam(value = "tenantId") String tenantId) {
+        if(!"ping".equals(message) && !WebsocketConst.CMD_CHECK.equals(message)){
+            log.info("銆愮郴缁� WebSocket銆戞敹鍒板鎴风娑堟伅:" + message);
+        }else{
+            log.debug("銆愮郴缁� WebSocket銆戞敹鍒板鎴风娑堟伅:" + message);
         }
-        String type = json.getString(DrySocketConst.TYPE);
-        switch (type) {
-            // 蹇冭烦妫�娴�
-            case DrySocketConst.TYPE_HB:
-                this.sendMessage(DrySocket.packageMessage(type, true));
-                break;
-            // 瀹炴椂鏁版嵁澶勭悊
-            case DrySocketConst.TYPE_RDT:
-                Object o = json.get(DrySocketConst.DATA);
-                RealTimeDataVo realTimeDataVo = BeanUtil.toBean(o, RealTimeDataVo.class);
-                realTimeDataService.realTimeDataHandle(realTimeDataVo);
-                break;
-
-            default:
-                log.warn("銆恉rySocket銆戞敹鍒颁笉璇嗗埆鐨勬秷鎭被鍨�:" + type);
-                break;
-        }
-
-
+        
+//        //------------------------------------------------------------------------------
+//        JSONObject obj = new JSONObject();
+//        //涓氬姟绫诲瀷
+//        obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
+//        //娑堟伅鍐呭
+//        obj.put(WebsocketConst.MSG_TXT, "蹇冭烦鍝嶅簲");
+//        this.pushMessage(userId, obj.toJSONString());
+//        //------------------------------------------------------------------------------
     }
 
+    /**
+     * 閰嶇疆閿欒淇℃伅澶勭悊
+     *
+     * @param session
+     * @param t
+     */
+    @OnError
+    public void onError(Session session, Throwable t) {
+        log.warn("銆愮郴缁� WebSocket銆戞秷鎭嚭鐜伴敊璇�");
+        t.printStackTrace();
+    }
+    //==========銆愮郴缁� WebSocket鎺ュ彈銆佹帹閫佹秷鎭瓑鏂规硶 鈥斺�� 鍏蜂綋鏈嶅姟鑺傜偣鎺ㄩ�亀s娑堟伅銆�========================================================================================
+    
 
-}
+    //==========銆愰噰鐢╮edis鍙戝竷璁㈤槄妯″紡鈥斺�旀帹閫佹秷鎭��========================================================================================
+
+
+
+    /**
+     * 姝や负鍗曠偣娑堟伅(澶氫汉) redis
+     *
+     * @param userIds
+     * @param message
+     */
+//    public void sendMessage(String[] userIds, String message) {
+//        for (String userId : userIds) {
+//            sendMessage(userId, message);
+//        }
+//    }
+    //=======銆愰噰鐢╮edis鍙戝竷璁㈤槄妯″紡鈥斺�旀帹閫佹秷鎭��==========================================================================================
+    
+}
\ No newline at end of file

--
Gitblit v1.9.3