ruoyi-common/pom.xml
@@ -33,6 +33,7 @@ <module>ruoyi-common-encrypt</module> <module>ruoyi-common-tenant</module> <module>ruoyi-common-websocket</module> <module>ruoyi-common-sse</module> </modules> <artifactId>ruoyi-common</artifactId> ruoyi-common/ruoyi-common-bom/pom.xml
@@ -172,6 +172,13 @@ <version>${revision}</version> </dependency> <!-- SSE模å --> <dependency> <groupId>org.dromara</groupId> <artifactId>ruoyi-common-sse</artifactId> <version>${revision}</version> </dependency> </dependencies> </dependencyManagement> ruoyi-common/ruoyi-common-sse/pom.xml
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,36 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.dromara</groupId> <artifactId>ruoyi-common</artifactId> <version>${revision}</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>ruoyi-common-sse</artifactId> <description> ruoyi-common-sse 模å </description> <dependencies> <dependency> <groupId>org.dromara</groupId> <artifactId>ruoyi-common-core</artifactId> </dependency> <dependency> <groupId>org.dromara</groupId> <artifactId>ruoyi-common-redis</artifactId> </dependency> <dependency> <groupId>org.dromara</groupId> <artifactId>ruoyi-common-satoken</artifactId> </dependency> <dependency> <groupId>org.dromara</groupId> <artifactId>ruoyi-common-json</artifactId> </dependency> </dependencies> </project> ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,28 @@ package org.dromara.common.sse.config; import org.dromara.common.sse.core.SseEmitterManager; import org.dromara.common.sse.listener.SseTopicListener; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; /** * @author Lion Li */ @AutoConfiguration @ConditionalOnProperty(value = "sse.enabled", havingValue = "true") @EnableConfigurationProperties(SseProperties.class) public class SseAutoConfiguration { @Bean public SseEmitterManager sseEmitterManager() { return new SseEmitterManager(); } @Bean public SseTopicListener sseTopicListener() { return new SseTopicListener(); } } ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,21 @@ package org.dromara.common.sse.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; /** * SSE é 置项 * * @author Lion Li */ @Data @ConfigurationProperties("sse") public class SseProperties { private Boolean enabled; /** * è·¯å¾ */ private String path; } ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,52 @@ package org.dromara.common.sse.controller; import cn.dev33.satoken.stp.StpUtil; import lombok.RequiredArgsConstructor; import org.dromara.common.core.domain.R; import org.dromara.common.satoken.utils.LoginHelper; import org.dromara.common.sse.core.SseEmitterManager; import org.dromara.common.sse.dto.SseMessageDto; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.List; @RestController @RequiredArgsConstructor public class SseController { private final SseEmitterManager sseEmitterManager; @GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter connect() { String tokenValue = StpUtil.getTokenValue(); Long userId = LoginHelper.getUserId(); return sseEmitterManager.connect(userId, tokenValue); } @GetMapping(value = "${sse.path}/close") public R<Void> close() { String tokenValue = StpUtil.getTokenValue(); Long userId = LoginHelper.getUserId(); sseEmitterManager.disconnect(userId, tokenValue); return R.ok(); } @GetMapping(value = "${sse.path}/send") public R<Void> send(Long userId, String msg) { SseMessageDto dto = new SseMessageDto(); dto.setUserIds(List.of(userId)); dto.setMessage(msg); sseEmitterManager.publishMessage(dto); return R.ok(); } @GetMapping(value = "${sse.path}/sendAll") public R<Void> send(String msg) { sseEmitterManager.publishAll(msg); return R.ok(); } } ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,134 @@ package org.dromara.common.sse.core; import cn.hutool.core.collection.CollUtil; import lombok.extern.slf4j.Slf4j; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.sse.dto.SseMessageDto; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; @Slf4j public class SseEmitterManager { /** * 订é çé¢é */ private final static String SSE_TOPIC = "global:sse"; private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); public SseEmitter connect(Long userId, String token) { Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); SseEmitter emitter = new SseEmitter(0L); emitters.put(token, emitter); emitter.onCompletion(() -> emitters.remove(token)); emitter.onTimeout(() -> emitters.remove(token)); try { emitter.send(SseEmitter.event().comment("connected")); } catch (IOException e) { emitters.remove(token); } return emitter; } public void disconnect(Long userId, String token) { Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); if (emitters != null) { try { emitters.get(token).send(SseEmitter.event().comment("disconnected")); } catch (IOException ignore) { } emitters.remove(token); } } /** * 订é SSEæ¶æ¯ä¸»é¢ï¼å¹¶æä¾ä¸ä¸ªæ¶è´¹è 彿°æ¥å¤çæ¥æ¶å°çæ¶æ¯ * * @param consumer å¤çSSEæ¶æ¯çæ¶è´¹è 彿° */ public void subscribeMessage(Consumer<SseMessageDto> consumer) { RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer); } /** * åæå®çç¨æ·ä¼è¯åéæ¶æ¯ * * @param userId è¦åéæ¶æ¯çç¨æ·id * @param message è¦åéçæ¶æ¯å 容 */ public void sendMessage(Long userId, String message) { Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); if (emitters != null) { for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) { try { entry.getValue().send(SseEmitter.event() .name("message") .reconnectTime(10000L) .data(message)); } catch (Exception e) { emitters.remove(entry.getKey()); } } } } /** * æ¬æºå ¨ç¨æ·ä¼è¯åéæ¶æ¯ * * @param message è¦åéçæ¶æ¯å 容 */ public void sendMessage(String message) { for (Long userId : USER_TOKEN_EMITTERS.keySet()) { sendMessage(userId, message); } } /** * åå¸SSEè®¢é æ¶æ¯ * * @param sseMessageDto è¦åå¸çSSEæ¶æ¯å¯¹è±¡ */ public void publishMessage(SseMessageDto sseMessageDto) { List<Long> unsentUserIds = new ArrayList<>(); // å½åæå¡å ç¨æ·,ç´æ¥åéæ¶æ¯ for (Long userId : sseMessageDto.getUserIds()) { if (USER_TOKEN_EMITTERS.containsKey(userId)) { sendMessage(userId, sseMessageDto.getMessage()); continue; } unsentUserIds.add(userId); } // ä¸å¨å½åæå¡å ç¨æ·,åå¸è®¢é æ¶æ¯ if (CollUtil.isNotEmpty(unsentUserIds)) { SseMessageDto broadcastMessage = new SseMessageDto(); broadcastMessage.setMessage(sseMessageDto.getMessage()); broadcastMessage.setUserIds(unsentUserIds); RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { log.info("SSEåé主é¢è®¢é æ¶æ¯topic:{} session keys:{} message:{}", SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage()); }); } } /** * åææçç¨æ·åå¸è®¢é çæ¶æ¯(群å) * * @param message è¦åå¸çæ¶æ¯å 容 */ public void publishAll(String message) { SseMessageDto broadcastMessage = new SseMessageDto(); broadcastMessage.setMessage(message); RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { log.info("SSEåé主é¢è®¢é æ¶æ¯topic:{} message:{}", SSE_TOPIC, message); }); } } ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,29 @@ package org.dromara.common.sse.dto; import lombok.Data; import java.io.Serial; import java.io.Serializable; import java.util.List; /** * æ¶æ¯çdto * * @author zendwang */ @Data public class SseMessageDto implements Serializable { @Serial private static final long serialVersionUID = 1L; /** * éè¦æ¨éå°çsession key å表 */ private List<Long> userIds; /** * éè¦åéçæ¶æ¯ */ private String message; } ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,48 @@ package org.dromara.common.sse.listener; import cn.hutool.core.collection.CollUtil; import lombok.extern.slf4j.Slf4j; import org.dromara.common.sse.core.SseEmitterManager; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; /** * SSE 主é¢è®¢é çå¬å¨ * * @author Lion Li */ @Slf4j public class SseTopicListener implements ApplicationRunner, Ordered { @Autowired private SseEmitterManager sseEmitterManager; /** * å¨Spring Bootåºç¨ç¨åºå¯å¨æ¶åå§åSSE主é¢è®¢é çå¬å¨ * * @param args åºç¨ç¨åºåæ° * @throws Exception åå§åè¿ç¨ä¸å¯è½æåºçå¼å¸¸ */ @Override public void run(ApplicationArguments args) throws Exception { sseEmitterManager.subscribeMessage((message) -> { log.info("SSE主é¢è®¢é æ¶å°æ¶æ¯session keys={} message={}", message.getUserIds(), message.getMessage()); // 妿keyä¸ä¸ºç©ºå°±æç §keyåæ¶æ¯ å¦æä¸ºç©ºå°±ç¾¤å if (CollUtil.isNotEmpty(message.getUserIds())) { message.getUserIds().forEach(key -> { sseEmitterManager.sendMessage(key, message.getMessage()); }); } else { sseEmitterManager.sendMessage(message.getMessage()); } }); log.info("åå§åSSE主é¢è®¢é çå¬å¨æå"); } @Override public int getOrder() { return -1; } } ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,58 @@ package org.dromara.common.sse.utils; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.sse.core.SseEmitterManager; import org.dromara.common.sse.dto.SseMessageDto; /** * å·¥å ·ç±» * * @author Lion Li */ @Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) public class SseMessageUtils { private final static SseEmitterManager MANAGER = SpringUtils.getBean(SseEmitterManager.class); /** * åæå®çWebSocketä¼è¯åéæ¶æ¯ * * @param userId è¦åéæ¶æ¯çç¨æ·id * @param message è¦åéçæ¶æ¯å 容 */ public static void sendMessage(Long userId, String message) { MANAGER.sendMessage(userId, message); } /** * æ¬æºå ¨ç¨æ·ä¼è¯åéæ¶æ¯ * * @param message è¦åéçæ¶æ¯å 容 */ public static void sendMessage(String message) { MANAGER.sendMessage(message); } /** * åå¸SSEè®¢é æ¶æ¯ * * @param sseMessageDto è¦åå¸çSSEæ¶æ¯å¯¹è±¡ */ public static void publishMessage(SseMessageDto sseMessageDto) { MANAGER.publishMessage(sseMessageDto); } /** * åææçç¨æ·åå¸è®¢é çæ¶æ¯(群å) * * @param message è¦åå¸çæ¶æ¯å 容 */ public static void publishAll(String message) { MANAGER.publishAll(message); } } ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
¶Ô±ÈÐÂÎļþ @@ -0,0 +1 @@ org.dromara.common.sse.config.SseAutoConfiguration ruoyi-modules/ruoyi-system/pom.xml
@@ -95,6 +95,11 @@ <artifactId>ruoyi-common-websocket</artifactId> </dependency> <dependency> <groupId>org.dromara</groupId> <artifactId>ruoyi-common-sse</artifactId> </dependency> </dependencies> </project>