From 098d3347a0df808908aab8c554cd7c4febc5e6d9 Mon Sep 17 00:00:00 2001
From: 疯狂的狮子Li <15040126243@163.com>
Date: 星期一, 26 八月 2024 11:43:59 +0800
Subject: [PATCH] !577 发布 5.2.2 正式版 安全性提升 Merge pull request !577 from 疯狂的狮子Li/dev

---
 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java |  160 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 160 insertions(+), 0 deletions(-)

diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
new file mode 100644
index 0000000..1d37a27
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
@@ -0,0 +1,160 @@
+package org.dromara.common.sse.core;
+
+import cn.hutool.core.collection.CollUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.redis.utils.RedisUtils;
+import org.dromara.common.sse.dto.SseMessageDto;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+/**
+ * 绠$悊 Server-Sent Events (SSE) 杩炴帴
+ *
+ * @author Lion Li
+ */
+@Slf4j
+public class SseEmitterManager {
+
+    /**
+     * 璁㈤槄鐨勯閬�
+     */
+    private final static String SSE_TOPIC = "global:sse";
+
+    private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
+
+    /**
+     * 寤虹珛涓庢寚瀹氱敤鎴风殑 SSE 杩炴帴
+     *
+     * @param userId 鐢ㄦ埛鐨勫敮涓�鏍囪瘑绗︼紝鐢ㄤ簬鍖哄垎涓嶅悓鐢ㄦ埛鐨勮繛鎺�
+     * @param token  鐢ㄦ埛鐨勫敮涓�浠ょ墝锛岀敤浜庤瘑鍒叿浣撶殑杩炴帴
+     * @return 杩斿洖涓�涓� SseEmitter 瀹炰緥锛屽鎴风鍙互閫氳繃璇ュ疄渚嬫帴鏀� SSE 浜嬩欢
+     */
+    public SseEmitter connect(Long userId, String token) {
+        // 浠� USER_TOKEN_EMITTERS 涓幏鍙栨垨鍒涘缓褰撳墠鐢ㄦ埛鐨� SseEmitter 鏄犲皠琛紙ConcurrentHashMap锛�
+        // 姣忎釜鐢ㄦ埛鍙互鏈夊涓� SSE 杩炴帴锛岄�氳繃 token 杩涜鍖哄垎
+        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
+
+        // 鍒涘缓涓�涓柊鐨� SseEmitter 瀹炰緥锛岃秴鏃舵椂闂磋缃负 0 琛ㄧず鏃犻檺鍒�
+        SseEmitter emitter = new SseEmitter(0L);
+
+        emitters.put(token, emitter);
+
+        // 褰� emitter 瀹屾垚銆佽秴鏃舵垨鍙戠敓閿欒鏃讹紝浠庢槧灏勮〃涓Щ闄ゅ搴旂殑 token
+        emitter.onCompletion(() -> emitters.remove(token));
+        emitter.onTimeout(() -> emitters.remove(token));
+        emitter.onError((e) -> emitters.remove(token));
+
+        try {
+            // 鍚戝鎴风鍙戦�佷竴鏉¤繛鎺ユ垚鍔熺殑浜嬩欢
+            emitter.send(SseEmitter.event().comment("connected"));
+        } catch (IOException e) {
+            // 濡傛灉鍙戦�佹秷鎭け璐ワ紝鍒欎粠鏄犲皠琛ㄤ腑绉婚櫎 emitter
+            emitters.remove(token);
+        }
+        return emitter;
+    }
+
+    /**
+     * 鏂紑鎸囧畾鐢ㄦ埛鐨� SSE 杩炴帴
+     *
+     * @param userId 鐢ㄦ埛鐨勫敮涓�鏍囪瘑绗︼紝鐢ㄤ簬鍖哄垎涓嶅悓鐢ㄦ埛鐨勮繛鎺�
+     * @param token  鐢ㄦ埛鐨勫敮涓�浠ょ墝锛岀敤浜庤瘑鍒叿浣撶殑杩炴帴
+     */
+    public void disconnect(Long userId, String token) {
+        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
+        if (emitters != null) {
+            try {
+                emitters.get(token).send(SseEmitter.event().comment("disconnected"));
+            } catch (Exception ignore) {
+            }
+            emitters.remove(token);
+        }
+    }
+
+    /**
+     * 璁㈤槄SSE娑堟伅涓婚锛屽苟鎻愪緵涓�涓秷璐硅�呭嚱鏁版潵澶勭悊鎺ユ敹鍒扮殑娑堟伅
+     *
+     * @param consumer 澶勭悊SSE娑堟伅鐨勬秷璐硅�呭嚱鏁�
+     */
+    public void subscribeMessage(Consumer<SseMessageDto> consumer) {
+        RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer);
+    }
+
+    /**
+     * 鍚戞寚瀹氱殑鐢ㄦ埛浼氳瘽鍙戦�佹秷鎭�
+     *
+     * @param userId  瑕佸彂閫佹秷鎭殑鐢ㄦ埛id
+     * @param message 瑕佸彂閫佺殑娑堟伅鍐呭
+     */
+    public void sendMessage(Long userId, String message) {
+        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
+        if (emitters != null) {
+            for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
+                try {
+                    entry.getValue().send(SseEmitter.event()
+                        .name("message")
+                        .data(message));
+                } catch (Exception e) {
+                    emitters.remove(entry.getKey());
+                }
+            }
+        }
+    }
+
+    /**
+     * 鏈満鍏ㄧ敤鎴蜂細璇濆彂閫佹秷鎭�
+     *
+     * @param message 瑕佸彂閫佺殑娑堟伅鍐呭
+     */
+    public void sendMessage(String message) {
+        for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
+            sendMessage(userId, message);
+        }
+    }
+
+    /**
+     * 鍙戝竷SSE璁㈤槄娑堟伅
+     *
+     * @param sseMessageDto 瑕佸彂甯冪殑SSE娑堟伅瀵硅薄
+     */
+    public void publishMessage(SseMessageDto sseMessageDto) {
+        List<Long> unsentUserIds = new ArrayList<>();
+        // 褰撳墠鏈嶅姟鍐呯敤鎴�,鐩存帴鍙戦�佹秷鎭�
+        for (Long userId : sseMessageDto.getUserIds()) {
+            if (USER_TOKEN_EMITTERS.containsKey(userId)) {
+                sendMessage(userId, sseMessageDto.getMessage());
+                continue;
+            }
+            unsentUserIds.add(userId);
+        }
+        // 涓嶅湪褰撳墠鏈嶅姟鍐呯敤鎴�,鍙戝竷璁㈤槄娑堟伅
+        if (CollUtil.isNotEmpty(unsentUserIds)) {
+            SseMessageDto broadcastMessage = new SseMessageDto();
+            broadcastMessage.setMessage(sseMessageDto.getMessage());
+            broadcastMessage.setUserIds(unsentUserIds);
+            RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
+                log.info("SSE鍙戦�佷富棰樿闃呮秷鎭痶opic:{} session keys:{} message:{}",
+                    SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage());
+            });
+        }
+    }
+
+    /**
+     * 鍚戞墍鏈夌殑鐢ㄦ埛鍙戝竷璁㈤槄鐨勬秷鎭�(缇ゅ彂)
+     *
+     * @param message 瑕佸彂甯冪殑娑堟伅鍐呭
+     */
+    public void publishAll(String message) {
+        SseMessageDto broadcastMessage = new SseMessageDto();
+        broadcastMessage.setMessage(message);
+        RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
+            log.info("SSE鍙戦�佷富棰樿闃呮秷鎭痶opic:{} message:{}", SSE_TOPIC, message);
+        });
+    }
+}

--
Gitblit v1.9.3