From ee3525cfb26be97b126e07c1322c41fcc8b5014c Mon Sep 17 00:00:00 2001 From: 疯狂的狮子Li <15040126243@163.com> Date: 星期五, 26 七月 2024 16:05:35 +0800 Subject: [PATCH] add 增加 ruoyi-common-sse 模块 支持SSE推送 比ws更轻量更稳定的推送 --- ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports | 1 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java | 21 +++ ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java | 134 +++++++++++++++++++ ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java | 48 ++++++ ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java | 58 ++++++++ ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java | 28 ++++ ruoyi-common/ruoyi-common-bom/pom.xml | 7 + ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java | 52 +++++++ ruoyi-modules/ruoyi-system/pom.xml | 5 ruoyi-common/ruoyi-common-sse/pom.xml | 36 +++++ ruoyi-common/pom.xml | 1 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java | 29 ++++ 12 files changed, 420 insertions(+), 0 deletions(-) diff --git a/ruoyi-common/pom.xml b/ruoyi-common/pom.xml index 45493d3..2930fd0 100644 --- a/ruoyi-common/pom.xml +++ b/ruoyi-common/pom.xml @@ -33,6 +33,7 @@ <module>ruoyi-common-encrypt</module> <module>ruoyi-common-tenant</module> <module>ruoyi-common-websocket</module> + <module>ruoyi-common-sse</module> </modules> <artifactId>ruoyi-common</artifactId> diff --git a/ruoyi-common/ruoyi-common-bom/pom.xml b/ruoyi-common/ruoyi-common-bom/pom.xml index 5388d8c..19ca420 100644 --- a/ruoyi-common/ruoyi-common-bom/pom.xml +++ b/ruoyi-common/ruoyi-common-bom/pom.xml @@ -172,6 +172,13 @@ <version>${revision}</version> </dependency> + <!-- SSE妯″潡 --> + <dependency> + <groupId>org.dromara</groupId> + <artifactId>ruoyi-common-sse</artifactId> + <version>${revision}</version> + </dependency> + </dependencies> </dependencyManagement> diff --git a/ruoyi-common/ruoyi-common-sse/pom.xml b/ruoyi-common/ruoyi-common-sse/pom.xml new file mode 100644 index 0000000..ae44c98 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/pom.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.dromara</groupId> + <artifactId>ruoyi-common</artifactId> + <version>${revision}</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>ruoyi-common-sse</artifactId> + + <description> + ruoyi-common-sse 妯″潡 + </description> + + <dependencies> + <dependency> + <groupId>org.dromara</groupId> + <artifactId>ruoyi-common-core</artifactId> + </dependency> + <dependency> + <groupId>org.dromara</groupId> + <artifactId>ruoyi-common-redis</artifactId> + </dependency> + <dependency> + <groupId>org.dromara</groupId> + <artifactId>ruoyi-common-satoken</artifactId> + </dependency> + <dependency> + <groupId>org.dromara</groupId> + <artifactId>ruoyi-common-json</artifactId> + </dependency> + </dependencies> +</project> diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java new file mode 100644 index 0000000..de5afa9 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java @@ -0,0 +1,28 @@ +package org.dromara.common.sse.config; + +import org.dromara.common.sse.core.SseEmitterManager; +import org.dromara.common.sse.listener.SseTopicListener; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; + +/** + * @author Lion Li + */ +@AutoConfiguration +@ConditionalOnProperty(value = "sse.enabled", havingValue = "true") +@EnableConfigurationProperties(SseProperties.class) +public class SseAutoConfiguration { + + @Bean + public SseEmitterManager sseEmitterManager() { + return new SseEmitterManager(); + } + + @Bean + public SseTopicListener sseTopicListener() { + return new SseTopicListener(); + } + +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java new file mode 100644 index 0000000..ce4e173 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java @@ -0,0 +1,21 @@ +package org.dromara.common.sse.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * SSE 閰嶇疆椤� + * + * @author Lion Li + */ +@Data +@ConfigurationProperties("sse") +public class SseProperties { + + private Boolean enabled; + + /** + * 璺緞 + */ + private String path; +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java new file mode 100644 index 0000000..57c7c1e --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java @@ -0,0 +1,52 @@ +package org.dromara.common.sse.controller; + +import cn.dev33.satoken.stp.StpUtil; +import lombok.RequiredArgsConstructor; +import org.dromara.common.core.domain.R; +import org.dromara.common.satoken.utils.LoginHelper; +import org.dromara.common.sse.core.SseEmitterManager; +import org.dromara.common.sse.dto.SseMessageDto; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.util.List; + +@RestController +@RequiredArgsConstructor +public class SseController { + + private final SseEmitterManager sseEmitterManager; + + @GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public SseEmitter connect() { + String tokenValue = StpUtil.getTokenValue(); + Long userId = LoginHelper.getUserId(); + return sseEmitterManager.connect(userId, tokenValue); + } + + @GetMapping(value = "${sse.path}/close") + public R<Void> close() { + String tokenValue = StpUtil.getTokenValue(); + Long userId = LoginHelper.getUserId(); + sseEmitterManager.disconnect(userId, tokenValue); + return R.ok(); + } + + @GetMapping(value = "${sse.path}/send") + public R<Void> send(Long userId, String msg) { + SseMessageDto dto = new SseMessageDto(); + dto.setUserIds(List.of(userId)); + dto.setMessage(msg); + sseEmitterManager.publishMessage(dto); + return R.ok(); + } + + @GetMapping(value = "${sse.path}/sendAll") + public R<Void> send(String msg) { + sseEmitterManager.publishAll(msg); + return R.ok(); + } + +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java new file mode 100644 index 0000000..4b56b69 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -0,0 +1,134 @@ +package org.dromara.common.sse.core; + +import cn.hutool.core.collection.CollUtil; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.redis.utils.RedisUtils; +import org.dromara.common.sse.dto.SseMessageDto; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +@Slf4j +public class SseEmitterManager { + /** + * 璁㈤槄鐨勯閬� + */ + private final static String SSE_TOPIC = "global:sse"; + + private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); + + public SseEmitter connect(Long userId, String token) { + Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); + SseEmitter emitter = new SseEmitter(0L); + + emitters.put(token, emitter); + + emitter.onCompletion(() -> emitters.remove(token)); + emitter.onTimeout(() -> emitters.remove(token)); + + try { + emitter.send(SseEmitter.event().comment("connected")); + } catch (IOException e) { + emitters.remove(token); + } + return emitter; + } + + public void disconnect(Long userId, String token) { + Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); + if (emitters != null) { + try { + emitters.get(token).send(SseEmitter.event().comment("disconnected")); + } catch (IOException ignore) { + } + emitters.remove(token); + } + } + + /** + * 璁㈤槄SSE娑堟伅涓婚锛屽苟鎻愪緵涓�涓秷璐硅�呭嚱鏁版潵澶勭悊鎺ユ敹鍒扮殑娑堟伅 + * + * @param consumer 澶勭悊SSE娑堟伅鐨勬秷璐硅�呭嚱鏁� + */ + public void subscribeMessage(Consumer<SseMessageDto> consumer) { + RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer); + } + + /** + * 鍚戞寚瀹氱殑鐢ㄦ埛浼氳瘽鍙戦�佹秷鎭� + * + * @param userId 瑕佸彂閫佹秷鎭殑鐢ㄦ埛id + * @param message 瑕佸彂閫佺殑娑堟伅鍐呭 + */ + public void sendMessage(Long userId, String message) { + Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); + if (emitters != null) { + for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) { + try { + entry.getValue().send(SseEmitter.event() + .name("message") + .reconnectTime(10000L) + .data(message)); + } catch (Exception e) { + emitters.remove(entry.getKey()); + } + } + } + } + + /** + * 鏈満鍏ㄧ敤鎴蜂細璇濆彂閫佹秷鎭� + * + * @param message 瑕佸彂閫佺殑娑堟伅鍐呭 + */ + public void sendMessage(String message) { + for (Long userId : USER_TOKEN_EMITTERS.keySet()) { + sendMessage(userId, message); + } + } + + /** + * 鍙戝竷SSE璁㈤槄娑堟伅 + * + * @param sseMessageDto 瑕佸彂甯冪殑SSE娑堟伅瀵硅薄 + */ + public void publishMessage(SseMessageDto sseMessageDto) { + List<Long> unsentUserIds = new ArrayList<>(); + // 褰撳墠鏈嶅姟鍐呯敤鎴�,鐩存帴鍙戦�佹秷鎭� + for (Long userId : sseMessageDto.getUserIds()) { + if (USER_TOKEN_EMITTERS.containsKey(userId)) { + sendMessage(userId, sseMessageDto.getMessage()); + continue; + } + unsentUserIds.add(userId); + } + // 涓嶅湪褰撳墠鏈嶅姟鍐呯敤鎴�,鍙戝竷璁㈤槄娑堟伅 + if (CollUtil.isNotEmpty(unsentUserIds)) { + SseMessageDto broadcastMessage = new SseMessageDto(); + broadcastMessage.setMessage(sseMessageDto.getMessage()); + broadcastMessage.setUserIds(unsentUserIds); + RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { + log.info("SSE鍙戦�佷富棰樿闃呮秷鎭痶opic:{} session keys:{} message:{}", + SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage()); + }); + } + } + + /** + * 鍚戞墍鏈夌殑鐢ㄦ埛鍙戝竷璁㈤槄鐨勬秷鎭�(缇ゅ彂) + * + * @param message 瑕佸彂甯冪殑娑堟伅鍐呭 + */ + public void publishAll(String message) { + SseMessageDto broadcastMessage = new SseMessageDto(); + broadcastMessage.setMessage(message); + RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { + log.info("SSE鍙戦�佷富棰樿闃呮秷鎭痶opic:{} message:{}", SSE_TOPIC, message); + }); + } +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java new file mode 100644 index 0000000..a2e1210 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java @@ -0,0 +1,29 @@ +package org.dromara.common.sse.dto; + +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; +import java.util.List; + +/** + * 娑堟伅鐨刣to + * + * @author zendwang + */ +@Data +public class SseMessageDto implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** + * 闇�瑕佹帹閫佸埌鐨剆ession key 鍒楄〃 + */ + private List<Long> userIds; + + /** + * 闇�瑕佸彂閫佺殑娑堟伅 + */ + private String message; +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java new file mode 100644 index 0000000..7a4dff1 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java @@ -0,0 +1,48 @@ +package org.dromara.common.sse.listener; + +import cn.hutool.core.collection.CollUtil; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.sse.core.SseEmitterManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.Ordered; + +/** + * SSE 涓婚璁㈤槄鐩戝惉鍣� + * + * @author Lion Li + */ +@Slf4j +public class SseTopicListener implements ApplicationRunner, Ordered { + + @Autowired + private SseEmitterManager sseEmitterManager; + + /** + * 鍦⊿pring Boot搴旂敤绋嬪簭鍚姩鏃跺垵濮嬪寲SSE涓婚璁㈤槄鐩戝惉鍣� + * + * @param args 搴旂敤绋嬪簭鍙傛暟 + * @throws Exception 鍒濆鍖栬繃绋嬩腑鍙兘鎶涘嚭鐨勫紓甯� + */ + @Override + public void run(ApplicationArguments args) throws Exception { + sseEmitterManager.subscribeMessage((message) -> { + log.info("SSE涓婚璁㈤槄鏀跺埌娑堟伅session keys={} message={}", message.getUserIds(), message.getMessage()); + // 濡傛灉key涓嶄负绌哄氨鎸夌収key鍙戞秷鎭� 濡傛灉涓虹┖灏辩兢鍙� + if (CollUtil.isNotEmpty(message.getUserIds())) { + message.getUserIds().forEach(key -> { + sseEmitterManager.sendMessage(key, message.getMessage()); + }); + } else { + sseEmitterManager.sendMessage(message.getMessage()); + } + }); + log.info("鍒濆鍖朣SE涓婚璁㈤槄鐩戝惉鍣ㄦ垚鍔�"); + } + + @Override + public int getOrder() { + return -1; + } +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java new file mode 100644 index 0000000..4334e98 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java @@ -0,0 +1,58 @@ +package org.dromara.common.sse.utils; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.utils.SpringUtils; +import org.dromara.common.sse.core.SseEmitterManager; +import org.dromara.common.sse.dto.SseMessageDto; + +/** + * 宸ュ叿绫� + * + * @author Lion Li + */ +@Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class SseMessageUtils { + + private final static SseEmitterManager MANAGER = SpringUtils.getBean(SseEmitterManager.class); + + /** + * 鍚戞寚瀹氱殑WebSocket浼氳瘽鍙戦�佹秷鎭� + * + * @param userId 瑕佸彂閫佹秷鎭殑鐢ㄦ埛id + * @param message 瑕佸彂閫佺殑娑堟伅鍐呭 + */ + public static void sendMessage(Long userId, String message) { + MANAGER.sendMessage(userId, message); + } + + /** + * 鏈満鍏ㄧ敤鎴蜂細璇濆彂閫佹秷鎭� + * + * @param message 瑕佸彂閫佺殑娑堟伅鍐呭 + */ + public static void sendMessage(String message) { + MANAGER.sendMessage(message); + } + + /** + * 鍙戝竷SSE璁㈤槄娑堟伅 + * + * @param sseMessageDto 瑕佸彂甯冪殑SSE娑堟伅瀵硅薄 + */ + public static void publishMessage(SseMessageDto sseMessageDto) { + MANAGER.publishMessage(sseMessageDto); + } + + /** + * 鍚戞墍鏈夌殑鐢ㄦ埛鍙戝竷璁㈤槄鐨勬秷鎭�(缇ゅ彂) + * + * @param message 瑕佸彂甯冪殑娑堟伅鍐呭 + */ + public static void publishAll(String message) { + MANAGER.publishAll(message); + } + +} diff --git a/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..b809713 --- /dev/null +++ b/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +org.dromara.common.sse.config.SseAutoConfiguration diff --git a/ruoyi-modules/ruoyi-system/pom.xml b/ruoyi-modules/ruoyi-system/pom.xml index acf33ce..0fc6d55 100644 --- a/ruoyi-modules/ruoyi-system/pom.xml +++ b/ruoyi-modules/ruoyi-system/pom.xml @@ -95,6 +95,11 @@ <artifactId>ruoyi-common-websocket</artifactId> </dependency> + <dependency> + <groupId>org.dromara</groupId> + <artifactId>ruoyi-common-sse</artifactId> + </dependency> + </dependencies> </project> -- Gitblit v1.9.3