ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/config/WebSocketConfig.java
@@ -27,17 +27,20 @@ @Bean public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) { WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) { // 如果WebSocket的路径为空,则设置默认路径为 "/websocket" if (StrUtil.isBlank(webSocketProperties.getPath())) { webSocketProperties.setPath("/websocket"); } // 如果允许跨域访问的地址为空,则设置为 "*",表示允许所有来源的跨域请求 if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) { webSocketProperties.setAllowedOrigins("*"); } // 返回一个WebSocketConfigurer对象,用于配置WebSocket return registry -> registry // 添加WebSocket处理程序和拦截器到指定路径,设置允许的跨域来源 .addHandler(webSocketHandler, webSocketProperties.getPath()) .addInterceptors(handshakeInterceptor) .setAllowedOrigins(webSocketProperties.getAllowedOrigins()); ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/constant/WebSocketConstants.java
@@ -6,6 +6,7 @@ * @author zendwang */ public interface WebSocketConstants { /** * websocketSession中的参数的key */ ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/handler/PlusWebSocketHandler.java
@@ -31,33 +31,42 @@ } /** * 处理发送来的文本消息 * 处理接收到的文本消息 * * @param session * @param message * @throws Exception * @param session WebSocket会话 * @param message 接收到的文本消息 * @throws Exception 处理消息过程中可能抛出的异常 */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { // 从WebSocket会话中获取登录用户信息 LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); List<Long> userIds = List.of(loginUser.getUserId()); // 创建WebSocket消息DTO对象 WebSocketMessageDto webSocketMessageDto = new WebSocketMessageDto(); webSocketMessageDto.setSessionKeys(userIds); webSocketMessageDto.setSessionKeys(List.of(loginUser.getUserId())); webSocketMessageDto.setMessage(message.getPayload()); WebSocketUtils.publishMessage(webSocketMessageDto); } /** * 处理接收到的二进制消息 * * @param session WebSocket会话 * @param message 接收到的二进制消息 * @throws Exception 处理消息过程中可能抛出的异常 */ @Override protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { super.handleBinaryMessage(session, message); } /** * 心跳监测的回复 * 处理接收到的Pong消息(心跳监测) * * @param session * @param message * @throws Exception * @param session WebSocket会话 * @param message 接收到的Pong消息 * @throws Exception 处理消息过程中可能抛出的异常 */ @Override protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { @@ -65,11 +74,11 @@ } /** * 连接出错时 * 处理WebSocket传输错误 * * @param session * @param exception * @throws Exception * @param session WebSocket会话 * @param exception 发生的异常 * @throws Exception 处理过程中可能抛出的异常 */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { @@ -77,10 +86,10 @@ } /** * 连接关闭后 * 在WebSocket连接关闭后执行清理操作 * * @param session * @param status * @param session WebSocket会话 * @param status 关闭状态信息 */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { @@ -90,9 +99,9 @@ } /** * 是否支持分片消息 * 指示处理程序是否支持接收部分消息 * * @return * @return 如果支持接收部分消息,则返回true;否则返回false */ @Override public boolean supportsPartialMessages() { ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java
@@ -18,24 +18,52 @@ private static final Map<Long, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>(); /** * 将WebSocket会话添加到用户会话Map中 * * @param sessionKey 会话键,用于检索会话 * @param session 要添加的WebSocket会话 */ public static void addSession(Long sessionKey, WebSocketSession session) { USER_SESSION_MAP.put(sessionKey, session); } /** * 从用户会话Map中移除指定会话键对应的WebSocket会话 * * @param sessionKey 要移除的会话键 */ public static void removeSession(Long sessionKey) { if (USER_SESSION_MAP.containsKey(sessionKey)) { USER_SESSION_MAP.remove(sessionKey); } } /** * 根据会话键从用户会话Map中获取WebSocket会话 * * @param sessionKey 要获取的会话键 * @return 与给定会话键对应的WebSocket会话,如果不存在则返回null */ public static WebSocketSession getSessions(Long sessionKey) { return USER_SESSION_MAP.get(sessionKey); } /** * 获取存储在用户会话Map中所有WebSocket会话的会话键集合 * * @return 所有WebSocket会话的会话键集合 */ public static Set<Long> getSessionsAll() { return USER_SESSION_MAP.keySet(); } /** * 检查给定的会话键是否存在于用户会话Map中 * * @param sessionKey 要检查的会话键 * @return 如果存在对应的会话键,则返回true;否则返回false */ public static Boolean existSession(Long sessionKey) { return USER_SESSION_MAP.containsKey(sessionKey); } ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/interceptor/PlusWebSocketInterceptor.java
@@ -1,8 +1,8 @@ package org.dromara.common.websocket.interceptor; import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.domain.model.LoginUser; import org.dromara.common.satoken.utils.LoginHelper; import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; @@ -21,13 +21,13 @@ public class PlusWebSocketInterceptor implements HandshakeInterceptor { /** * 握手前 * WebSocket握手之前执行的前置处理方法 * * @param request request * @param response response * @param wsHandler wsHandler * @param attributes attributes * @return 是否握手成功 * @param request WebSocket握手请求 * @param response WebSocket握手响应 * @param wsHandler WebSocket处理程序 * @param attributes 与WebSocket会话关联的属性 * @return 如果允许握手继续进行,则返回true;否则返回false */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) { @@ -37,15 +37,16 @@ } /** * 握手后 * WebSocket握手成功后执行的后置处理方法 * * @param request request * @param response response * @param wsHandler wsHandler * @param exception 异常 * @param request WebSocket握手请求 * @param response WebSocket握手响应 * @param wsHandler WebSocket处理程序 * @param exception 握手过程中可能出现的异常 */ @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { // 在这个方法中可以执行一些握手成功后的后续处理逻辑,比如记录日志或者其他操作 } } ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java
@@ -1,9 +1,9 @@ package org.dromara.common.websocket.listener; import cn.hutool.core.collection.CollUtil; import lombok.extern.slf4j.Slf4j; import org.dromara.common.websocket.holder.WebSocketSessionHolder; import org.dromara.common.websocket.utils.WebSocketUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; @@ -16,8 +16,15 @@ @Slf4j public class WebSocketTopicListener implements ApplicationRunner, Ordered { /** * 在Spring Boot应用程序启动时初始化WebSocket主题订阅监听器 * * @param args 应用程序参数 * @throws Exception 初始化过程中可能抛出的异常 */ @Override public void run(ApplicationArguments args) throws Exception { // 订阅WebSocket消息 WebSocketUtils.subscribeMessage((message) -> { log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage()); // 如果key不为空就按照key发消息 如果为空就群发 ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java
@@ -29,10 +29,10 @@ public class WebSocketUtils { /** * 发送消息 * 向指定的WebSocket会话发送消息 * * @param sessionKey session主键 一般为用户id * @param message 消息文本 * @param sessionKey 要发送消息的用户id * @param message 要发送的消息内容 */ public static void sendMessage(Long sessionKey, String message) { WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey); @@ -40,18 +40,18 @@ } /** * 订阅消息 * 订阅WebSocket消息主题,并提供一个消费者函数来处理接收到的消息 * * @param consumer 自定义处理 * @param consumer 处理WebSocket消息的消费者函数 */ public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) { RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer); } /** * 发布订阅的消息 * 发布WebSocket订阅消息 * * @param webSocketMessage 消息对象 * @param webSocketMessage 要发布的WebSocket消息对象 */ public static void publishMessage(WebSocketMessageDto webSocketMessage) { List<Long> unsentSessionKeys = new ArrayList<>(); @@ -76,9 +76,9 @@ } /** * 发布订阅的消息(群发) * 向所有的WebSocket会话发布订阅的消息(群发) * * @param message 消息内容 * @param message 要发布的消息内容 */ public static void publishAll(String message) { WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); @@ -88,14 +88,31 @@ }); } /** * 向指定的WebSocket会话发送Pong消息 * * @param session 要发送Pong消息的WebSocket会话 */ public static void sendPongMessage(WebSocketSession session) { sendMessage(session, new PongMessage()); } /** * 向指定的WebSocket会话发送文本消息 * * @param session WebSocket会话 * @param message 要发送的文本消息内容 */ public static void sendMessage(WebSocketSession session, String message) { sendMessage(session, new TextMessage(message)); } /** * 向指定的WebSocket会话发送WebSocket消息对象 * * @param session WebSocket会话 * @param message 要发送的WebSocket消息对象 */ private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) { if (session == null || !session.isOpen()) { log.warn("[send] session会话已经关闭");