From ee3525cfb26be97b126e07c1322c41fcc8b5014c Mon Sep 17 00:00:00 2001
From: 疯狂的狮子Li <15040126243@163.com>
Date: 星期五, 26 七月 2024 16:05:35 +0800
Subject: [PATCH] add 增加 ruoyi-common-sse 模块 支持SSE推送 比ws更轻量更稳定的推送

---
 ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports |    1 
 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java                                      |   21 +++
 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java                                    |  134 +++++++++++++++++++
 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java                                 |   48 ++++++
 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java                                     |   58 ++++++++
 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java                               |   28 ++++
 ruoyi-common/ruoyi-common-bom/pom.xml                                                                                             |    7 +
 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java                                  |   52 +++++++
 ruoyi-modules/ruoyi-system/pom.xml                                                                                                |    5 
 ruoyi-common/ruoyi-common-sse/pom.xml                                                                                             |   36 +++++
 ruoyi-common/pom.xml                                                                                                              |    1 
 ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java                                         |   29 ++++
 12 files changed, 420 insertions(+), 0 deletions(-)

diff --git a/ruoyi-common/pom.xml b/ruoyi-common/pom.xml
index 45493d3..2930fd0 100644
--- a/ruoyi-common/pom.xml
+++ b/ruoyi-common/pom.xml
@@ -33,6 +33,7 @@
         <module>ruoyi-common-encrypt</module>
         <module>ruoyi-common-tenant</module>
         <module>ruoyi-common-websocket</module>
+        <module>ruoyi-common-sse</module>
     </modules>
 
     <artifactId>ruoyi-common</artifactId>
diff --git a/ruoyi-common/ruoyi-common-bom/pom.xml b/ruoyi-common/ruoyi-common-bom/pom.xml
index 5388d8c..19ca420 100644
--- a/ruoyi-common/ruoyi-common-bom/pom.xml
+++ b/ruoyi-common/ruoyi-common-bom/pom.xml
@@ -172,6 +172,13 @@
                 <version>${revision}</version>
             </dependency>
 
+            <!-- SSE妯″潡 -->
+            <dependency>
+                <groupId>org.dromara</groupId>
+                <artifactId>ruoyi-common-sse</artifactId>
+                <version>${revision}</version>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 
diff --git a/ruoyi-common/ruoyi-common-sse/pom.xml b/ruoyi-common/ruoyi-common-sse/pom.xml
new file mode 100644
index 0000000..ae44c98
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.dromara</groupId>
+        <artifactId>ruoyi-common</artifactId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>ruoyi-common-sse</artifactId>
+
+    <description>
+        ruoyi-common-sse 妯″潡
+    </description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.dromara</groupId>
+            <artifactId>ruoyi-common-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.dromara</groupId>
+            <artifactId>ruoyi-common-redis</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.dromara</groupId>
+            <artifactId>ruoyi-common-satoken</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.dromara</groupId>
+            <artifactId>ruoyi-common-json</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java
new file mode 100644
index 0000000..de5afa9
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java
@@ -0,0 +1,28 @@
+package org.dromara.common.sse.config;
+
+import org.dromara.common.sse.core.SseEmitterManager;
+import org.dromara.common.sse.listener.SseTopicListener;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+
+/**
+ * @author Lion Li
+ */
+@AutoConfiguration
+@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
+@EnableConfigurationProperties(SseProperties.class)
+public class SseAutoConfiguration {
+
+    @Bean
+    public SseEmitterManager sseEmitterManager() {
+        return new SseEmitterManager();
+    }
+
+    @Bean
+    public SseTopicListener sseTopicListener() {
+        return new SseTopicListener();
+    }
+
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
new file mode 100644
index 0000000..ce4e173
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
@@ -0,0 +1,21 @@
+package org.dromara.common.sse.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * SSE 閰嶇疆椤�
+ *
+ * @author Lion Li
+ */
+@Data
+@ConfigurationProperties("sse")
+public class SseProperties {
+
+    private Boolean enabled;
+
+    /**
+     * 璺緞
+     */
+    private String path;
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java
new file mode 100644
index 0000000..57c7c1e
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java
@@ -0,0 +1,52 @@
+package org.dromara.common.sse.controller;
+
+import cn.dev33.satoken.stp.StpUtil;
+import lombok.RequiredArgsConstructor;
+import org.dromara.common.core.domain.R;
+import org.dromara.common.satoken.utils.LoginHelper;
+import org.dromara.common.sse.core.SseEmitterManager;
+import org.dromara.common.sse.dto.SseMessageDto;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.util.List;
+
+@RestController
+@RequiredArgsConstructor
+public class SseController {
+
+    private final SseEmitterManager sseEmitterManager;
+
+    @GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+    public SseEmitter connect() {
+        String tokenValue = StpUtil.getTokenValue();
+        Long userId = LoginHelper.getUserId();
+        return sseEmitterManager.connect(userId, tokenValue);
+    }
+
+    @GetMapping(value = "${sse.path}/close")
+    public R<Void> close() {
+        String tokenValue = StpUtil.getTokenValue();
+        Long userId = LoginHelper.getUserId();
+        sseEmitterManager.disconnect(userId, tokenValue);
+        return R.ok();
+    }
+
+    @GetMapping(value = "${sse.path}/send")
+    public R<Void> send(Long userId, String msg) {
+        SseMessageDto dto = new SseMessageDto();
+        dto.setUserIds(List.of(userId));
+        dto.setMessage(msg);
+        sseEmitterManager.publishMessage(dto);
+        return R.ok();
+    }
+
+    @GetMapping(value = "${sse.path}/sendAll")
+    public R<Void> send(String msg) {
+        sseEmitterManager.publishAll(msg);
+        return R.ok();
+    }
+
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
new file mode 100644
index 0000000..4b56b69
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
@@ -0,0 +1,134 @@
+package org.dromara.common.sse.core;
+
+import cn.hutool.core.collection.CollUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.redis.utils.RedisUtils;
+import org.dromara.common.sse.dto.SseMessageDto;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+@Slf4j
+public class SseEmitterManager {
+    /**
+     * 璁㈤槄鐨勯閬�
+     */
+    private final static String SSE_TOPIC = "global:sse";
+
+    private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
+
+    public SseEmitter connect(Long userId, String token) {
+        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
+        SseEmitter emitter = new SseEmitter(0L);
+
+        emitters.put(token, emitter);
+
+        emitter.onCompletion(() -> emitters.remove(token));
+        emitter.onTimeout(() -> emitters.remove(token));
+
+        try {
+            emitter.send(SseEmitter.event().comment("connected"));
+        } catch (IOException e) {
+            emitters.remove(token);
+        }
+        return emitter;
+    }
+
+    public void disconnect(Long userId, String token) {
+        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
+        if (emitters != null) {
+            try {
+                emitters.get(token).send(SseEmitter.event().comment("disconnected"));
+            } catch (IOException ignore) {
+            }
+            emitters.remove(token);
+        }
+    }
+
+    /**
+     * 璁㈤槄SSE娑堟伅涓婚锛屽苟鎻愪緵涓�涓秷璐硅�呭嚱鏁版潵澶勭悊鎺ユ敹鍒扮殑娑堟伅
+     *
+     * @param consumer 澶勭悊SSE娑堟伅鐨勬秷璐硅�呭嚱鏁�
+     */
+    public void subscribeMessage(Consumer<SseMessageDto> consumer) {
+        RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer);
+    }
+
+    /**
+     * 鍚戞寚瀹氱殑鐢ㄦ埛浼氳瘽鍙戦�佹秷鎭�
+     *
+     * @param userId  瑕佸彂閫佹秷鎭殑鐢ㄦ埛id
+     * @param message 瑕佸彂閫佺殑娑堟伅鍐呭
+     */
+    public void sendMessage(Long userId, String message) {
+        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
+        if (emitters != null) {
+            for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
+                try {
+                    entry.getValue().send(SseEmitter.event()
+                        .name("message")
+                        .reconnectTime(10000L)
+                        .data(message));
+                } catch (Exception e) {
+                    emitters.remove(entry.getKey());
+                }
+            }
+        }
+    }
+
+    /**
+     * 鏈満鍏ㄧ敤鎴蜂細璇濆彂閫佹秷鎭�
+     *
+     * @param message 瑕佸彂閫佺殑娑堟伅鍐呭
+     */
+    public void sendMessage(String message) {
+        for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
+            sendMessage(userId, message);
+        }
+    }
+
+    /**
+     * 鍙戝竷SSE璁㈤槄娑堟伅
+     *
+     * @param sseMessageDto 瑕佸彂甯冪殑SSE娑堟伅瀵硅薄
+     */
+    public void publishMessage(SseMessageDto sseMessageDto) {
+        List<Long> unsentUserIds = new ArrayList<>();
+        // 褰撳墠鏈嶅姟鍐呯敤鎴�,鐩存帴鍙戦�佹秷鎭�
+        for (Long userId : sseMessageDto.getUserIds()) {
+            if (USER_TOKEN_EMITTERS.containsKey(userId)) {
+                sendMessage(userId, sseMessageDto.getMessage());
+                continue;
+            }
+            unsentUserIds.add(userId);
+        }
+        // 涓嶅湪褰撳墠鏈嶅姟鍐呯敤鎴�,鍙戝竷璁㈤槄娑堟伅
+        if (CollUtil.isNotEmpty(unsentUserIds)) {
+            SseMessageDto broadcastMessage = new SseMessageDto();
+            broadcastMessage.setMessage(sseMessageDto.getMessage());
+            broadcastMessage.setUserIds(unsentUserIds);
+            RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
+                log.info("SSE鍙戦�佷富棰樿闃呮秷鎭痶opic:{} session keys:{} message:{}",
+                    SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage());
+            });
+        }
+    }
+
+    /**
+     * 鍚戞墍鏈夌殑鐢ㄦ埛鍙戝竷璁㈤槄鐨勬秷鎭�(缇ゅ彂)
+     *
+     * @param message 瑕佸彂甯冪殑娑堟伅鍐呭
+     */
+    public void publishAll(String message) {
+        SseMessageDto broadcastMessage = new SseMessageDto();
+        broadcastMessage.setMessage(message);
+        RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
+            log.info("SSE鍙戦�佷富棰樿闃呮秷鎭痶opic:{} message:{}", SSE_TOPIC, message);
+        });
+    }
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java
new file mode 100644
index 0000000..a2e1210
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java
@@ -0,0 +1,29 @@
+package org.dromara.common.sse.dto;
+
+import lombok.Data;
+
+import java.io.Serial;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * 娑堟伅鐨刣to
+ *
+ * @author zendwang
+ */
+@Data
+public class SseMessageDto implements Serializable {
+
+    @Serial
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 闇�瑕佹帹閫佸埌鐨剆ession key 鍒楄〃
+     */
+    private List<Long> userIds;
+
+    /**
+     * 闇�瑕佸彂閫佺殑娑堟伅
+     */
+    private String message;
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java
new file mode 100644
index 0000000..7a4dff1
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java
@@ -0,0 +1,48 @@
+package org.dromara.common.sse.listener;
+
+import cn.hutool.core.collection.CollUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.sse.core.SseEmitterManager;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.Ordered;
+
+/**
+ * SSE 涓婚璁㈤槄鐩戝惉鍣�
+ *
+ * @author Lion Li
+ */
+@Slf4j
+public class SseTopicListener implements ApplicationRunner, Ordered {
+
+    @Autowired
+    private SseEmitterManager sseEmitterManager;
+
+    /**
+     * 鍦⊿pring Boot搴旂敤绋嬪簭鍚姩鏃跺垵濮嬪寲SSE涓婚璁㈤槄鐩戝惉鍣�
+     *
+     * @param args 搴旂敤绋嬪簭鍙傛暟
+     * @throws Exception 鍒濆鍖栬繃绋嬩腑鍙兘鎶涘嚭鐨勫紓甯�
+     */
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+        sseEmitterManager.subscribeMessage((message) -> {
+            log.info("SSE涓婚璁㈤槄鏀跺埌娑堟伅session keys={} message={}", message.getUserIds(), message.getMessage());
+            // 濡傛灉key涓嶄负绌哄氨鎸夌収key鍙戞秷鎭� 濡傛灉涓虹┖灏辩兢鍙�
+            if (CollUtil.isNotEmpty(message.getUserIds())) {
+                message.getUserIds().forEach(key -> {
+                    sseEmitterManager.sendMessage(key, message.getMessage());
+                });
+            } else {
+                sseEmitterManager.sendMessage(message.getMessage());
+            }
+        });
+        log.info("鍒濆鍖朣SE涓婚璁㈤槄鐩戝惉鍣ㄦ垚鍔�");
+    }
+
+    @Override
+    public int getOrder() {
+        return -1;
+    }
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java
new file mode 100644
index 0000000..4334e98
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java
@@ -0,0 +1,58 @@
+package org.dromara.common.sse.utils;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dromara.common.core.utils.SpringUtils;
+import org.dromara.common.sse.core.SseEmitterManager;
+import org.dromara.common.sse.dto.SseMessageDto;
+
+/**
+ * 宸ュ叿绫�
+ *
+ * @author Lion Li
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class SseMessageUtils {
+
+    private final static SseEmitterManager MANAGER = SpringUtils.getBean(SseEmitterManager.class);
+
+    /**
+     * 鍚戞寚瀹氱殑WebSocket浼氳瘽鍙戦�佹秷鎭�
+     *
+     * @param userId  瑕佸彂閫佹秷鎭殑鐢ㄦ埛id
+     * @param message 瑕佸彂閫佺殑娑堟伅鍐呭
+     */
+    public static void sendMessage(Long userId, String message) {
+        MANAGER.sendMessage(userId, message);
+    }
+
+    /**
+     * 鏈満鍏ㄧ敤鎴蜂細璇濆彂閫佹秷鎭�
+     *
+     * @param message 瑕佸彂閫佺殑娑堟伅鍐呭
+     */
+    public static void sendMessage(String message) {
+        MANAGER.sendMessage(message);
+    }
+
+    /**
+     * 鍙戝竷SSE璁㈤槄娑堟伅
+     *
+     * @param sseMessageDto 瑕佸彂甯冪殑SSE娑堟伅瀵硅薄
+     */
+    public static void publishMessage(SseMessageDto sseMessageDto) {
+        MANAGER.publishMessage(sseMessageDto);
+    }
+
+    /**
+     * 鍚戞墍鏈夌殑鐢ㄦ埛鍙戝竷璁㈤槄鐨勬秷鎭�(缇ゅ彂)
+     *
+     * @param message 瑕佸彂甯冪殑娑堟伅鍐呭
+     */
+    public static void publishAll(String message) {
+        MANAGER.publishAll(message);
+    }
+
+}
diff --git a/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 0000000..b809713
--- /dev/null
+++ b/ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+org.dromara.common.sse.config.SseAutoConfiguration
diff --git a/ruoyi-modules/ruoyi-system/pom.xml b/ruoyi-modules/ruoyi-system/pom.xml
index acf33ce..0fc6d55 100644
--- a/ruoyi-modules/ruoyi-system/pom.xml
+++ b/ruoyi-modules/ruoyi-system/pom.xml
@@ -95,6 +95,11 @@
             <artifactId>ruoyi-common-websocket</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.dromara</groupId>
+            <artifactId>ruoyi-common-sse</artifactId>
+        </dependency>
+
     </dependencies>
 
 </project>

--
Gitblit v1.9.3