From 457e59e61c9d7dbf4ef299c34aadd2c00fc0f1c9 Mon Sep 17 00:00:00 2001
From: 疯狂的狮子Li <15040126243@163.com>
Date: 星期四, 22 八月 2024 17:53:55 +0800
Subject: [PATCH] fix 修复 多线程对同一个session发送ws消息报错问题

---
 ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java |   49 +++++++++++++++++++++++++++++--------------------
 1 files changed, 29 insertions(+), 20 deletions(-)

diff --git a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java
index 28679e4..8c4170a 100644
--- a/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java
+++ b/ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java
@@ -4,7 +4,6 @@
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.dromara.common.core.domain.model.LoginUser;
 import org.dromara.common.redis.utils.RedisUtils;
 import org.dromara.common.websocket.dto.WebSocketMessageDto;
 import org.dromara.common.websocket.holder.WebSocketSessionHolder;
@@ -18,7 +17,6 @@
 import java.util.List;
 import java.util.function.Consumer;
 
-import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
 import static org.dromara.common.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC;
 
 /**
@@ -31,10 +29,10 @@
 public class WebSocketUtils {
 
     /**
-     * 鍙戦�佹秷鎭�
+     * 鍚戞寚瀹氱殑WebSocket浼氳瘽鍙戦�佹秷鎭�
      *
-     * @param sessionKey session涓婚敭 涓�鑸负鐢ㄦ埛id
-     * @param message    娑堟伅鏂囨湰
+     * @param sessionKey 瑕佸彂閫佹秷鎭殑鐢ㄦ埛id
+     * @param message    瑕佸彂閫佺殑娑堟伅鍐呭
      */
     public static void sendMessage(Long sessionKey, String message) {
         WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
@@ -42,18 +40,18 @@
     }
 
     /**
-     * 璁㈤槄娑堟伅
+     * 璁㈤槄WebSocket娑堟伅涓婚锛屽苟鎻愪緵涓�涓秷璐硅�呭嚱鏁版潵澶勭悊鎺ユ敹鍒扮殑娑堟伅
      *
-     * @param consumer 鑷畾涔夊鐞�
+     * @param consumer 澶勭悊WebSocket娑堟伅鐨勬秷璐硅�呭嚱鏁�
      */
     public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) {
         RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer);
     }
 
     /**
-     * 鍙戝竷璁㈤槄鐨勬秷鎭�
+     * 鍙戝竷WebSocket璁㈤槄娑堟伅
      *
-     * @param webSocketMessage 娑堟伅瀵硅薄
+     * @param webSocketMessage 瑕佸彂甯冪殑WebSocket娑堟伅瀵硅薄
      */
     public static void publishMessage(WebSocketMessageDto webSocketMessage) {
         List<Long> unsentSessionKeys = new ArrayList<>();
@@ -78,38 +76,49 @@
     }
 
     /**
-     * 鍙戝竷璁㈤槄鐨勬秷鎭�(缇ゅ彂)
+     * 鍚戞墍鏈夌殑WebSocket浼氳瘽鍙戝竷璁㈤槄鐨勬秷鎭�(缇ゅ彂)
      *
-     * @param message 娑堟伅鍐呭
+     * @param message 瑕佸彂甯冪殑娑堟伅鍐呭
      */
     public static void publishAll(String message) {
-        WebSocketSessionHolder.getSessionsAll().forEach(key -> {
-            WebSocketUtils.sendMessage(key, message);
-        });
         WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
         broadcastMessage.setMessage(message);
         RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
-            log.info(" WebSocket鍙戦�佷富棰樿闃呮秷鎭痶opic:{} message:{}", WEB_SOCKET_TOPIC, message);
+            log.info("WebSocket鍙戦�佷富棰樿闃呮秷鎭痶opic:{} message:{}", WEB_SOCKET_TOPIC, message);
         });
     }
 
+    /**
+     * 鍚戞寚瀹氱殑WebSocket浼氳瘽鍙戦�丳ong娑堟伅
+     *
+     * @param session 瑕佸彂閫丳ong娑堟伅鐨刉ebSocket浼氳瘽
+     */
     public static void sendPongMessage(WebSocketSession session) {
         sendMessage(session, new PongMessage());
     }
 
+    /**
+     * 鍚戞寚瀹氱殑WebSocket浼氳瘽鍙戦�佹枃鏈秷鎭�
+     *
+     * @param session WebSocket浼氳瘽
+     * @param message 瑕佸彂閫佺殑鏂囨湰娑堟伅鍐呭
+     */
     public static void sendMessage(WebSocketSession session, String message) {
         sendMessage(session, new TextMessage(message));
     }
 
-    private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
+    /**
+     * 鍚戞寚瀹氱殑WebSocket浼氳瘽鍙戦�乄ebSocket娑堟伅瀵硅薄
+     *
+     * @param session WebSocket浼氳瘽
+     * @param message 瑕佸彂閫佺殑WebSocket娑堟伅瀵硅薄
+     */
+    private synchronized static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
         if (session == null || !session.isOpen()) {
-            log.error("[send] session浼氳瘽宸茬粡鍏抽棴");
+            log.warn("[send] session浼氳瘽宸茬粡鍏抽棴");
         } else {
             try {
-                // 鑾峰彇褰撳墠浼氳瘽涓殑鐢ㄦ埛
-                LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
                 session.sendMessage(message);
-                log.info("[send] sessionId: {},userId:{},userType:{},message:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType(), message);
             } catch (IOException e) {
                 log.error("[send] session({}) 鍙戦�佹秷鎭�({}) 寮傚父", session, message, e);
             }

--
Gitblit v1.9.3