From 95c01301f65379e7634e4619bf4c49186aa5be41 Mon Sep 17 00:00:00 2001 From: 疯狂的狮子Li <15040126243@163.com> Date: 星期五, 07 二月 2025 14:19:28 +0800 Subject: [PATCH] !644 同步修复一些问题 Merge pull request !644 from 疯狂的狮子Li/dev --- ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java | 71 +++++++++++++++++++++++------------ 1 files changed, 46 insertions(+), 25 deletions(-) diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java index 039e17f..64dfcff 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -1,20 +1,24 @@ package org.dromara.common.sse.core; -import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.map.MapUtil; import lombok.extern.slf4j.Slf4j; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.sse.dto.SseMessageDto; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; +/** + * 绠$悊 Server-Sent Events (SSE) 杩炴帴 + * + * @author Lion Li + */ @Slf4j public class SseEmitterManager { + /** * 璁㈤槄鐨勯閬� */ @@ -22,32 +26,59 @@ private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); + /** + * 寤虹珛涓庢寚瀹氱敤鎴风殑 SSE 杩炴帴 + * + * @param userId 鐢ㄦ埛鐨勫敮涓�鏍囪瘑绗︼紝鐢ㄤ簬鍖哄垎涓嶅悓鐢ㄦ埛鐨勮繛鎺� + * @param token 鐢ㄦ埛鐨勫敮涓�浠ょ墝锛岀敤浜庤瘑鍒叿浣撶殑杩炴帴 + * @return 杩斿洖涓�涓� SseEmitter 瀹炰緥锛屽鎴风鍙互閫氳繃璇ュ疄渚嬫帴鏀� SSE 浜嬩欢 + */ public SseEmitter connect(Long userId, String token) { + // 浠� USER_TOKEN_EMITTERS 涓幏鍙栨垨鍒涘缓褰撳墠鐢ㄦ埛鐨� SseEmitter 鏄犲皠琛紙ConcurrentHashMap锛� + // 姣忎釜鐢ㄦ埛鍙互鏈夊涓� SSE 杩炴帴锛岄�氳繃 token 杩涜鍖哄垎 Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); + + // 鍒涘缓涓�涓柊鐨� SseEmitter 瀹炰緥锛岃秴鏃舵椂闂磋缃负 0 琛ㄧず鏃犻檺鍒� SseEmitter emitter = new SseEmitter(0L); emitters.put(token, emitter); + // 褰� emitter 瀹屾垚銆佽秴鏃舵垨鍙戠敓閿欒鏃讹紝浠庢槧灏勮〃涓Щ闄ゅ搴旂殑 token emitter.onCompletion(() -> emitters.remove(token)); emitter.onTimeout(() -> emitters.remove(token)); emitter.onError((e) -> emitters.remove(token)); try { + // 鍚戝鎴风鍙戦�佷竴鏉¤繛鎺ユ垚鍔熺殑浜嬩欢 emitter.send(SseEmitter.event().comment("connected")); } catch (IOException e) { + // 濡傛灉鍙戦�佹秷鎭け璐ワ紝鍒欎粠鏄犲皠琛ㄤ腑绉婚櫎 emitter emitters.remove(token); } return emitter; } + /** + * 鏂紑鎸囧畾鐢ㄦ埛鐨� SSE 杩炴帴 + * + * @param userId 鐢ㄦ埛鐨勫敮涓�鏍囪瘑绗︼紝鐢ㄤ簬鍖哄垎涓嶅悓鐢ㄦ埛鐨勮繛鎺� + * @param token 鐢ㄦ埛鐨勫敮涓�浠ょ墝锛岀敤浜庤瘑鍒叿浣撶殑杩炴帴 + */ public void disconnect(Long userId, String token) { + if (userId == null || token == null) { + return; + } Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); - if (emitters != null) { + if (MapUtil.isNotEmpty(emitters)) { try { - emitters.get(token).send(SseEmitter.event().comment("disconnected")); + SseEmitter sseEmitter = emitters.get(token); + sseEmitter.send(SseEmitter.event().comment("disconnected")); + sseEmitter.complete(); } catch (Exception ignore) { } emitters.remove(token); + } else { + USER_TOKEN_EMITTERS.remove(userId); } } @@ -68,7 +99,7 @@ */ public void sendMessage(Long userId, String message) { Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); - if (emitters != null) { + if (MapUtil.isNotEmpty(emitters)) { for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) { try { entry.getValue().send(SseEmitter.event() @@ -78,6 +109,8 @@ emitters.remove(entry.getKey()); } } + } else { + USER_TOKEN_EMITTERS.remove(userId); } } @@ -98,25 +131,13 @@ * @param sseMessageDto 瑕佸彂甯冪殑SSE娑堟伅瀵硅薄 */ public void publishMessage(SseMessageDto sseMessageDto) { - List<Long> unsentUserIds = new ArrayList<>(); - // 褰撳墠鏈嶅姟鍐呯敤鎴�,鐩存帴鍙戦�佹秷鎭� - for (Long userId : sseMessageDto.getUserIds()) { - if (USER_TOKEN_EMITTERS.containsKey(userId)) { - sendMessage(userId, sseMessageDto.getMessage()); - continue; - } - unsentUserIds.add(userId); - } - // 涓嶅湪褰撳墠鏈嶅姟鍐呯敤鎴�,鍙戝竷璁㈤槄娑堟伅 - if (CollUtil.isNotEmpty(unsentUserIds)) { - SseMessageDto broadcastMessage = new SseMessageDto(); - broadcastMessage.setMessage(sseMessageDto.getMessage()); - broadcastMessage.setUserIds(unsentUserIds); - RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { - log.info("SSE鍙戦�佷富棰樿闃呮秷鎭痶opic:{} session keys:{} message:{}", - SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage()); - }); - } + SseMessageDto broadcastMessage = new SseMessageDto(); + broadcastMessage.setMessage(sseMessageDto.getMessage()); + broadcastMessage.setUserIds(sseMessageDto.getUserIds()); + RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { + log.info("SSE鍙戦�佷富棰樿闃呮秷鎭痶opic:{} session keys:{} message:{}", + SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage()); + }); } /** -- Gitblit v1.9.3