疯狂的狮子Li
2024-07-26 ee3525cfb26be97b126e07c1322c41fcc8b5014c
add 增加 ruoyi-common-sse 模块 支持SSE推送 比ws更轻量更稳定的推送
已修改3个文件
已添加9个文件
420 ■■■■■ 文件已修改
ruoyi-common/pom.xml 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/ruoyi-common-bom/pom.xml 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/ruoyi-common-sse/pom.xml 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java 134 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-modules/ruoyi-system/pom.xml 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/pom.xml
@@ -33,6 +33,7 @@
        <module>ruoyi-common-encrypt</module>
        <module>ruoyi-common-tenant</module>
        <module>ruoyi-common-websocket</module>
        <module>ruoyi-common-sse</module>
    </modules>
    <artifactId>ruoyi-common</artifactId>
ruoyi-common/ruoyi-common-bom/pom.xml
@@ -172,6 +172,13 @@
                <version>${revision}</version>
            </dependency>
            <!-- SSE模块 -->
            <dependency>
                <groupId>org.dromara</groupId>
                <artifactId>ruoyi-common-sse</artifactId>
                <version>${revision}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
ruoyi-common/ruoyi-common-sse/pom.xml
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.dromara</groupId>
        <artifactId>ruoyi-common</artifactId>
        <version>${revision}</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>ruoyi-common-sse</artifactId>
    <description>
        ruoyi-common-sse æ¨¡å—
    </description>
    <dependencies>
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>ruoyi-common-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>ruoyi-common-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>ruoyi-common-satoken</artifactId>
        </dependency>
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>ruoyi-common-json</artifactId>
        </dependency>
    </dependencies>
</project>
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseAutoConfiguration.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,28 @@
package org.dromara.common.sse.config;
import org.dromara.common.sse.core.SseEmitterManager;
import org.dromara.common.sse.listener.SseTopicListener;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
/**
 * @author Lion Li
 */
@AutoConfiguration
@ConditionalOnProperty(value = "sse.enabled", havingValue = "true")
@EnableConfigurationProperties(SseProperties.class)
public class SseAutoConfiguration {
    @Bean
    public SseEmitterManager sseEmitterManager() {
        return new SseEmitterManager();
    }
    @Bean
    public SseTopicListener sseTopicListener() {
        return new SseTopicListener();
    }
}
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/config/SseProperties.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,21 @@
package org.dromara.common.sse.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * SSE é…ç½®é¡¹
 *
 * @author Lion Li
 */
@Data
@ConfigurationProperties("sse")
public class SseProperties {
    private Boolean enabled;
    /**
     * è·¯å¾„
     */
    private String path;
}
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/controller/SseController.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,52 @@
package org.dromara.common.sse.controller;
import cn.dev33.satoken.stp.StpUtil;
import lombok.RequiredArgsConstructor;
import org.dromara.common.core.domain.R;
import org.dromara.common.satoken.utils.LoginHelper;
import org.dromara.common.sse.core.SseEmitterManager;
import org.dromara.common.sse.dto.SseMessageDto;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
@RestController
@RequiredArgsConstructor
public class SseController {
    private final SseEmitterManager sseEmitterManager;
    @GetMapping(value = "${sse.path}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter connect() {
        String tokenValue = StpUtil.getTokenValue();
        Long userId = LoginHelper.getUserId();
        return sseEmitterManager.connect(userId, tokenValue);
    }
    @GetMapping(value = "${sse.path}/close")
    public R<Void> close() {
        String tokenValue = StpUtil.getTokenValue();
        Long userId = LoginHelper.getUserId();
        sseEmitterManager.disconnect(userId, tokenValue);
        return R.ok();
    }
    @GetMapping(value = "${sse.path}/send")
    public R<Void> send(Long userId, String msg) {
        SseMessageDto dto = new SseMessageDto();
        dto.setUserIds(List.of(userId));
        dto.setMessage(msg);
        sseEmitterManager.publishMessage(dto);
        return R.ok();
    }
    @GetMapping(value = "${sse.path}/sendAll")
    public R<Void> send(String msg) {
        sseEmitterManager.publishAll(msg);
        return R.ok();
    }
}
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,134 @@
package org.dromara.common.sse.core;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.sse.dto.SseMessageDto;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@Slf4j
public class SseEmitterManager {
    /**
     * è®¢é˜…的频道
     */
    private final static String SSE_TOPIC = "global:sse";
    private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
    public SseEmitter connect(Long userId, String token) {
        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
        SseEmitter emitter = new SseEmitter(0L);
        emitters.put(token, emitter);
        emitter.onCompletion(() -> emitters.remove(token));
        emitter.onTimeout(() -> emitters.remove(token));
        try {
            emitter.send(SseEmitter.event().comment("connected"));
        } catch (IOException e) {
            emitters.remove(token);
        }
        return emitter;
    }
    public void disconnect(Long userId, String token) {
        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
        if (emitters != null) {
            try {
                emitters.get(token).send(SseEmitter.event().comment("disconnected"));
            } catch (IOException ignore) {
            }
            emitters.remove(token);
        }
    }
    /**
     * è®¢é˜…SSE消息主题,并提供一个消费者函数来处理接收到的消息
     *
     * @param consumer å¤„理SSE消息的消费者函数
     */
    public void subscribeMessage(Consumer<SseMessageDto> consumer) {
        RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer);
    }
    /**
     * å‘指定的用户会话发送消息
     *
     * @param userId  è¦å‘送消息的用户id
     * @param message è¦å‘送的消息内容
     */
    public void sendMessage(Long userId, String message) {
        Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId);
        if (emitters != null) {
            for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) {
                try {
                    entry.getValue().send(SseEmitter.event()
                        .name("message")
                        .reconnectTime(10000L)
                        .data(message));
                } catch (Exception e) {
                    emitters.remove(entry.getKey());
                }
            }
        }
    }
    /**
     * æœ¬æœºå…¨ç”¨æˆ·ä¼šè¯å‘送消息
     *
     * @param message è¦å‘送的消息内容
     */
    public void sendMessage(String message) {
        for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
            sendMessage(userId, message);
        }
    }
    /**
     * å‘布SSE订阅消息
     *
     * @param sseMessageDto è¦å‘布的SSE消息对象
     */
    public void publishMessage(SseMessageDto sseMessageDto) {
        List<Long> unsentUserIds = new ArrayList<>();
        // å½“前服务内用户,直接发送消息
        for (Long userId : sseMessageDto.getUserIds()) {
            if (USER_TOKEN_EMITTERS.containsKey(userId)) {
                sendMessage(userId, sseMessageDto.getMessage());
                continue;
            }
            unsentUserIds.add(userId);
        }
        // ä¸åœ¨å½“前服务内用户,发布订阅消息
        if (CollUtil.isNotEmpty(unsentUserIds)) {
            SseMessageDto broadcastMessage = new SseMessageDto();
            broadcastMessage.setMessage(sseMessageDto.getMessage());
            broadcastMessage.setUserIds(unsentUserIds);
            RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
                log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}",
                    SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage());
            });
        }
    }
    /**
     * å‘所有的用户发布订阅的消息(群发)
     *
     * @param message è¦å‘布的消息内容
     */
    public void publishAll(String message) {
        SseMessageDto broadcastMessage = new SseMessageDto();
        broadcastMessage.setMessage(message);
        RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> {
            log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message);
        });
    }
}
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/dto/SseMessageDto.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,29 @@
package org.dromara.common.sse.dto;
import lombok.Data;
import java.io.Serial;
import java.io.Serializable;
import java.util.List;
/**
 * æ¶ˆæ¯çš„dto
 *
 * @author zendwang
 */
@Data
public class SseMessageDto implements Serializable {
    @Serial
    private static final long serialVersionUID = 1L;
    /**
     * éœ€è¦æŽ¨é€åˆ°çš„session key åˆ—表
     */
    private List<Long> userIds;
    /**
     * éœ€è¦å‘送的消息
     */
    private String message;
}
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/listener/SseTopicListener.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,48 @@
package org.dromara.common.sse.listener;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.sse.core.SseEmitterManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
/**
 * SSE ä¸»é¢˜è®¢é˜…监听器
 *
 * @author Lion Li
 */
@Slf4j
public class SseTopicListener implements ApplicationRunner, Ordered {
    @Autowired
    private SseEmitterManager sseEmitterManager;
    /**
     * åœ¨Spring Boot应用程序启动时初始化SSE主题订阅监听器
     *
     * @param args åº”用程序参数
     * @throws Exception åˆå§‹åŒ–过程中可能抛出的异常
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        sseEmitterManager.subscribeMessage((message) -> {
            log.info("SSE主题订阅收到消息session keys={} message={}", message.getUserIds(), message.getMessage());
            // å¦‚æžœkey不为空就按照key发消息 å¦‚果为空就群发
            if (CollUtil.isNotEmpty(message.getUserIds())) {
                message.getUserIds().forEach(key -> {
                    sseEmitterManager.sendMessage(key, message.getMessage());
                });
            } else {
                sseEmitterManager.sendMessage(message.getMessage());
            }
        });
        log.info("初始化SSE主题订阅监听器成功");
    }
    @Override
    public int getOrder() {
        return -1;
    }
}
ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/utils/SseMessageUtils.java
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1,58 @@
package org.dromara.common.sse.utils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.sse.core.SseEmitterManager;
import org.dromara.common.sse.dto.SseMessageDto;
/**
 * å·¥å…·ç±»
 *
 * @author Lion Li
 */
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class SseMessageUtils {
    private final static SseEmitterManager MANAGER = SpringUtils.getBean(SseEmitterManager.class);
    /**
     * å‘指定的WebSocket会话发送消息
     *
     * @param userId  è¦å‘送消息的用户id
     * @param message è¦å‘送的消息内容
     */
    public static void sendMessage(Long userId, String message) {
        MANAGER.sendMessage(userId, message);
    }
    /**
     * æœ¬æœºå…¨ç”¨æˆ·ä¼šè¯å‘送消息
     *
     * @param message è¦å‘送的消息内容
     */
    public static void sendMessage(String message) {
        MANAGER.sendMessage(message);
    }
    /**
     * å‘布SSE订阅消息
     *
     * @param sseMessageDto è¦å‘布的SSE消息对象
     */
    public static void publishMessage(SseMessageDto sseMessageDto) {
        MANAGER.publishMessage(sseMessageDto);
    }
    /**
     * å‘所有的用户发布订阅的消息(群发)
     *
     * @param message è¦å‘布的消息内容
     */
    public static void publishAll(String message) {
        MANAGER.publishAll(message);
    }
}
ruoyi-common/ruoyi-common-sse/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
¶Ô±ÈÐÂÎļþ
@@ -0,0 +1 @@
org.dromara.common.sse.config.SseAutoConfiguration
ruoyi-modules/ruoyi-system/pom.xml
@@ -95,6 +95,11 @@
            <artifactId>ruoyi-common-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>ruoyi-common-sse</artifactId>
        </dependency>
    </dependencies>
</project>