From ee3525cfb26be97b126e07c1322c41fcc8b5014c Mon Sep 17 00:00:00 2001
From: 疯狂的狮子Li <15040126243@163.com>
Date: 星期五, 26 七月 2024 16:05:35 +0800
Subject: [PATCH] add 增加 ruoyi-common-sse 模块 支持SSE推送 比ws更轻量更稳定的推送
---
ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports | 1
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java | 21 +++
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java | 134 +++++++++++++++++++
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java | 48 ++++++
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java | 58 ++++++++
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java | 28 ++++
ruoyi-common/ruoyi-common-bom/pom.xml | 7 +
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java | 52 +++++++
ruoyi-modules/ruoyi-system/pom.xml | 5
ruoyi-common/ruoyi-common-sse/pom.xml | 36 +++++
ruoyi-common/pom.xml | 1
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java | 29 ++++
12 files changed, 420 insertions(+), 0 deletions(-)
diff --git a/ruoyi-common/pom.xml b/ruoyi-common/pom.xml
index 45493d3..2930fd0 100644
--- a/ruoyi-common/pom.xml
+++ b/ruoyi-common/pom.xml
@@ -33,6 +33,7 @@
<module>ruoyi-common-encrypt</module>
<module>ruoyi-common-tenant</module>
<module>ruoyi-common-websocket</module>
+ <module>ruoyi-common-sse</module>
</modules>
<artifactId>ruoyi-common</artifactId>
diff --git a/ruoyi-common/ruoyi-common-bom/pom.xml b/ruoyi-common/ruoyi-common-bom/pom.xml
index 5388d8c..19ca420 100644
--- a/ruoyi-common/ruoyi-common-bom/pom.xml
+++ b/ruoyi-common/ruoyi-common-bom/pom.xml
@@ -172,6 +172,13 @@
<version>${revision}</version>
</dependency>
+ <!-- SSE妯″潡 -->
+ <dependency>
+ <groupId>org.dromara</groupId>
+ <artifactId>ruoyi-common-sse</artifactId>
+ <version>${revision}</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>
diff --git a/ruoyi-common/ruoyi-common-sse/pom.xml b/ruoyi-common/ruoyi-common-sse/pom.xml
new file mode 100644
index 0000000..ae44c98
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.dromara</groupId>
+ <artifactId>ruoyi-common</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>ruoyi-common-sse</artifactId>
+
+ <description>
+ ruoyi-common-sse 妯″潡
+ </description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.dromara</groupId>
+ <artifactId>ruoyi-common-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.dromara</groupId>
+ <artifactId>ruoyi-common-redis</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.dromara</groupId>
+ <artifactId>ruoyi-common-satoken</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.dromara</groupId>
+ <artifactId>ruoyi-common-json</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java
new file mode 100644
index 0000000..de5afa9
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java
@@ -0,0 +1,28 @@
+package org.dromara.common.sse.config;
+
+import org.dromara.common.sse.core.SseEmitterManager;
+import org.dromara.common.sse.listener.SseTopicListener;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+
+/**
+ * @author Lion Li
+ */
+@AutoConfiguration
+@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
+@EnableConfigurationProperties(SseProperties.class)
+public class SseAutoConfiguration {
+
+ @Bean
+ public SseEmitterManager sseEmitterManager() {
+ return new SseEmitterManager();
+ }
+
+ @Bean
+ public SseTopicListener sseTopicListener() {
+ return new SseTopicListener();
+ }
+
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
new file mode 100644
index 0000000..ce4e173
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
@@ -0,0 +1,21 @@
+package org.dromara.common.sse.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * SSE 閰嶇疆椤�
+ *
+ * @author Lion Li
+ */
+@Data
+@ConfigurationProperties("sse")
+public class SseProperties {
+
+ private Boolean enabled;
+
+ /**
+ * 璺緞
+ */
+ private String path;
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java
new file mode 100644
index 0000000..57c7c1e
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java
@@ -0,0 +1,52 @@
+package org.dromara.common.sse.controller;
+
+import cn.dev33.satoken.stp.StpUtil;
+import lombok.RequiredArgsConstructor;
+import org.dromara.common.core.domain.R;
+import org.dromara.common.satoken.utils.LoginHelper;
+import org.dromara.common.sse.core.SseEmitterManager;
+import org.dromara.common.sse.dto.SseMessageDto;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.util.List;
+
+@RestController
+@RequiredArgsConstructor
+public class SseController {
+
+ private final SseEmitterManager sseEmitterManager;
+
+ @GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public SseEmitter connect() {
+ String tokenValue = StpUtil.getTokenValue();
+ Long userId = LoginHelper.getUserId();
+ return sseEmitterManager.connect(userId, tokenValue);
+ }
+
+ @GetMapping(value = "${sse.path}/close")
+ public R<Void> close() {
+ String tokenValue = StpUtil.getTokenValue();
+ Long userId = LoginHelper.getUserId();
+ sseEmitterManager.disconnect(userId, tokenValue);
+ return R.ok();
+ }
+
+ @GetMapping(value = "${sse.path}/send")
+ public R<Void> send(Long userId, String msg) {
+ SseMessageDto dto = new SseMessageDto();
+ dto.setUserIds(List.of(userId));
+ dto.setMessage(msg);
+ sseEmitterManager.publishMessage(dto);
+ return R.ok();
+ }
+
+ @GetMapping(value = "${sse.path}/sendAll")
+ public R<Void> send(String msg) {
+ sseEmitterManager.publishAll(msg);
+ return R.ok();
+ }
+
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
new file mode 100644
index 0000000..4b56b69
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
@@ -0,0 +1,134 @@
+package org.dromara.common.sse.core;
+
+import cn.hutool.core.collection.CollUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.redis.utils.RedisUtils;
+import org.dromara.common.sse.dto.SseMessageDto;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+@Slf4j
+public class SseEmitterManager {
+ /**
+ * 璁㈤槄鐨勯閬�
+ */
+ private final static String SSE_TOPIC = "global:sse";
+
+ private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
+
+ public SseEmitter connect(Long userId, String token) {
+ Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
+ SseEmitter emitter = new SseEmitter(0L);
+
+ emitters.put(token, emitter);
+
+ emitter.onCompletion(() -> emitters.remove(token));
+ emitter.onTimeout(() -> emitters.remove(token));
+
+ try {
+ emitter.send(SseEmitter.event().comment("connected"));
+ } catch (IOException e) {
+ emitters.remove(token);
+ }
+ return emitter;
+ }
+
+ public void disconnect(Long userId, String token) {
+ Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
+ if (emitters != null) {
+ try {
+ emitters.get(token).send(SseEmitter.event().comment("disconnected"));
+ } catch (IOException ignore) {
+ }
+ emitters.remove(token);
+ }
+ }
+
+ /**
+ * 璁㈤槄SSE娑堟伅涓婚锛屽苟鎻愪緵涓�涓秷璐硅�呭嚱鏁版潵澶勭悊鎺ユ敹鍒扮殑娑堟伅
+ *
+ * @param consumer 澶勭悊SSE娑堟伅鐨勬秷璐硅�呭嚱鏁�
+ */
+ public void subscribeMessage(Consumer<SseMessageDto> consumer) {
+ RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer);
+ }
+
+ /**
+ * 鍚戞寚瀹氱殑鐢ㄦ埛浼氳瘽鍙戦�佹秷鎭�
+ *
+ * @param userId 瑕佸彂閫佹秷鎭殑鐢ㄦ埛id
+ * @param message 瑕佸彂閫佺殑娑堟伅鍐呭
+ */
+ public void sendMessage(Long userId, String message) {
+ Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
+ if (emitters != null) {
+ for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
+ try {
+ entry.getValue().send(SseEmitter.event()
+ .name("message")
+ .reconnectTime(10000L)
+ .data(message));
+ } catch (Exception e) {
+ emitters.remove(entry.getKey());
+ }
+ }
+ }
+ }
+
+ /**
+ * 鏈満鍏ㄧ敤鎴蜂細璇濆彂閫佹秷鎭�
+ *
+ * @param message 瑕佸彂閫佺殑娑堟伅鍐呭
+ */
+ public void sendMessage(String message) {
+ for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
+ sendMessage(userId, message);
+ }
+ }
+
+ /**
+ * 鍙戝竷SSE璁㈤槄娑堟伅
+ *
+ * @param sseMessageDto 瑕佸彂甯冪殑SSE娑堟伅瀵硅薄
+ */
+ public void publishMessage(SseMessageDto sseMessageDto) {
+ List<Long> unsentUserIds = new ArrayList<>();
+ // 褰撳墠鏈嶅姟鍐呯敤鎴�,鐩存帴鍙戦�佹秷鎭�
+ for (Long userId : sseMessageDto.getUserIds()) {
+ if (USER_TOKEN_EMITTERS.containsKey(userId)) {
+ sendMessage(userId, sseMessageDto.getMessage());
+ continue;
+ }
+ unsentUserIds.add(userId);
+ }
+ // 涓嶅湪褰撳墠鏈嶅姟鍐呯敤鎴�,鍙戝竷璁㈤槄娑堟伅
+ if (CollUtil.isNotEmpty(unsentUserIds)) {
+ SseMessageDto broadcastMessage = new SseMessageDto();
+ broadcastMessage.setMessage(sseMessageDto.getMessage());
+ broadcastMessage.setUserIds(unsentUserIds);
+ RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
+ log.info("SSE鍙戦�佷富棰樿闃呮秷鎭痶opic:{} session keys:{} message:{}",
+ SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage());
+ });
+ }
+ }
+
+ /**
+ * 鍚戞墍鏈夌殑鐢ㄦ埛鍙戝竷璁㈤槄鐨勬秷鎭�(缇ゅ彂)
+ *
+ * @param message 瑕佸彂甯冪殑娑堟伅鍐呭
+ */
+ public void publishAll(String message) {
+ SseMessageDto broadcastMessage = new SseMessageDto();
+ broadcastMessage.setMessage(message);
+ RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
+ log.info("SSE鍙戦�佷富棰樿闃呮秷鎭痶opic:{} message:{}", SSE_TOPIC, message);
+ });
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java
new file mode 100644
index 0000000..a2e1210
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java
@@ -0,0 +1,29 @@
+package org.dromara.common.sse.dto;
+
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * 娑堟伅鐨刣to
+ *
+ * @author zendwang
+ */
+@Data
+public class SseMessageDto implements Serializable {
+
+ @Serial
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * 闇�瑕佹帹閫佸埌鐨剆ession key 鍒楄〃
+ */
+ private List<Long> userIds;
+
+ /**
+ * 闇�瑕佸彂閫佺殑娑堟伅
+ */
+ private String message;
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java
new file mode 100644
index 0000000..7a4dff1
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java
@@ -0,0 +1,48 @@
+package org.dromara.common.sse.listener;
+
+import cn.hutool.core.collection.CollUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.sse.core.SseEmitterManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.Ordered;
+
+/**
+ * SSE 涓婚璁㈤槄鐩戝惉鍣�
+ *
+ * @author Lion Li
+ */
+@Slf4j
+public class SseTopicListener implements ApplicationRunner, Ordered {
+
+ @Autowired
+ private SseEmitterManager sseEmitterManager;
+
+ /**
+ * 鍦⊿pring Boot搴旂敤绋嬪簭鍚姩鏃跺垵濮嬪寲SSE涓婚璁㈤槄鐩戝惉鍣�
+ *
+ * @param args 搴旂敤绋嬪簭鍙傛暟
+ * @throws Exception 鍒濆鍖栬繃绋嬩腑鍙兘鎶涘嚭鐨勫紓甯�
+ */
+ @Override
+ public void run(ApplicationArguments args) throws Exception {
+ sseEmitterManager.subscribeMessage((message) -> {
+ log.info("SSE涓婚璁㈤槄鏀跺埌娑堟伅session keys={} message={}", message.getUserIds(), message.getMessage());
+ // 濡傛灉key涓嶄负绌哄氨鎸夌収key鍙戞秷鎭� 濡傛灉涓虹┖灏辩兢鍙�
+ if (CollUtil.isNotEmpty(message.getUserIds())) {
+ message.getUserIds().forEach(key -> {
+ sseEmitterManager.sendMessage(key, message.getMessage());
+ });
+ } else {
+ sseEmitterManager.sendMessage(message.getMessage());
+ }
+ });
+ log.info("鍒濆鍖朣SE涓婚璁㈤槄鐩戝惉鍣ㄦ垚鍔�");
+ }
+
+ @Override
+ public int getOrder() {
+ return -1;
+ }
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java
new file mode 100644
index 0000000..4334e98
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java
@@ -0,0 +1,58 @@
+package org.dromara.common.sse.utils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.core.utils.SpringUtils;
+import org.dromara.common.sse.core.SseEmitterManager;
+import org.dromara.common.sse.dto.SseMessageDto;
+
+/**
+ * 宸ュ叿绫�
+ *
+ * @author Lion Li
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class SseMessageUtils {
+
+ private final static SseEmitterManager MANAGER = SpringUtils.getBean(SseEmitterManager.class);
+
+ /**
+ * 鍚戞寚瀹氱殑WebSocket浼氳瘽鍙戦�佹秷鎭�
+ *
+ * @param userId 瑕佸彂閫佹秷鎭殑鐢ㄦ埛id
+ * @param message 瑕佸彂閫佺殑娑堟伅鍐呭
+ */
+ public static void sendMessage(Long userId, String message) {
+ MANAGER.sendMessage(userId, message);
+ }
+
+ /**
+ * 鏈満鍏ㄧ敤鎴蜂細璇濆彂閫佹秷鎭�
+ *
+ * @param message 瑕佸彂閫佺殑娑堟伅鍐呭
+ */
+ public static void sendMessage(String message) {
+ MANAGER.sendMessage(message);
+ }
+
+ /**
+ * 鍙戝竷SSE璁㈤槄娑堟伅
+ *
+ * @param sseMessageDto 瑕佸彂甯冪殑SSE娑堟伅瀵硅薄
+ */
+ public static void publishMessage(SseMessageDto sseMessageDto) {
+ MANAGER.publishMessage(sseMessageDto);
+ }
+
+ /**
+ * 鍚戞墍鏈夌殑鐢ㄦ埛鍙戝竷璁㈤槄鐨勬秷鎭�(缇ゅ彂)
+ *
+ * @param message 瑕佸彂甯冪殑娑堟伅鍐呭
+ */
+ public static void publishAll(String message) {
+ MANAGER.publishAll(message);
+ }
+
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 0000000..b809713
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.common.sse.config.SseAutoConfiguration
diff --git a/ruoyi-modules/ruoyi-system/pom.xml b/ruoyi-modules/ruoyi-system/pom.xml
index acf33ce..0fc6d55 100644
--- a/ruoyi-modules/ruoyi-system/pom.xml
+++ b/ruoyi-modules/ruoyi-system/pom.xml
@@ -95,6 +95,11 @@
<artifactId>ruoyi-common-websocket</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.dromara</groupId>
+ <artifactId>ruoyi-common-sse</artifactId>
+ </dependency>
+
</dependencies>
</project>
--
Gitblit v1.9.3