ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/holder/WebSocketSessionHolder.java
@@ -5,6 +5,7 @@
import org.springframework.web.socket.WebSocketSession;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
@@ -31,6 +32,10 @@
        return USER_SESSION_MAP.get(sessionKey);
    }

    /**
     * Returns the keys of every entry currently held in {@code USER_SESSION_MAP}.
     *
     * NOTE(review): {@code Map.keySet()} returns a view backed by the map rather than a
     * copy, so callers observe later additions/removals and could remove entries through
     * it — confirm that exposing the live view is intended.
     *
     * @return the session keys registered in {@code USER_SESSION_MAP}
     */
    public static Set<Long> getSessionsAll() {
        return USER_SESSION_MAP.keySet();
    }

    /**
     * Checks whether {@code USER_SESSION_MAP} contains an entry for the given key.
     *
     * @param sessionKey key to look up
     * @return the result of {@code USER_SESSION_MAP.containsKey(sessionKey)}, boxed
     */
    public static Boolean existSession(Long sessionKey) {
        return USER_SESSION_MAP.containsKey(sessionKey);
    }
ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/listener/WebSocketTopicListener.java
@@ -19,13 +19,18 @@
    /**
     * Registers the WebSocket topic callback through {@code WebSocketUtils.subscribeMessage}.
     *
     * For each received message the callback logs its session keys and payload, then either
     * sends the payload to every listed key that {@code WebSocketSessionHolder.existSession}
     * reports as present, or — when the message carries no session keys — sends it to every
     * key returned by {@code WebSocketSessionHolder.getSessionsAll()}.
     *
     * @param args application arguments (not used in the visible part of this method)
     * @throws Exception declared by the overridden method
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        WebSocketUtils.subscribeMessage((message) -> {
            // NOTE(review): the next two log.info calls differ only by a trailing "!" and look
            // like the removed/added pair of a diff whose -/+ markers were lost — confirm that
            // only one of them belongs in the final file.
            log.info("WebSocket主题订阅收到消息session keys={} message={}!", message.getSessionKeys(), message.getMessage());
            log.info("WebSocket主题订阅收到消息session keys={} message={}", message.getSessionKeys(), message.getMessage());
            // If session keys are present, send to those keys only; if empty, broadcast to all
            if (CollUtil.isNotEmpty(message.getSessionKeys())) {
                message.getSessionKeys().forEach(key -> {
                    // Skip keys for which the session holder has no entry
                    if (WebSocketSessionHolder.existSession(key)) {
                        WebSocketUtils.sendMessage(key, message.getMessage());
                    }
                });
            } else {
                WebSocketSessionHolder.getSessionsAll().forEach(key -> {
                    WebSocketUtils.sendMessage(key, message.getMessage());
                });
            }
        });
        log.info("初始化WebSocket主题订阅监听器成功");
ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java
@@ -1,13 +1,13 @@ package org.dromara.common.websocket.utils; import cn.hutool.core.collection.CollUtil; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.domain.model.LoginUser; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.websocket.dto.WebSocketMessageDto; import org.dromara.common.websocket.holder.WebSocketSessionHolder; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.PongMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketMessage; @@ -77,6 +77,22 @@ } } /** * 发布订阅的消息(群发) * * @param message 消息内容 */ public static void publishAll(String message) { WebSocketSessionHolder.getSessionsAll().forEach(key -> { WebSocketUtils.sendMessage(key, message); }); WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); broadcastMessage.setMessage(message); RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { log.info(" WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message); }); } public static void sendPongMessage(WebSocketSession session) { sendMessage(session, new PongMessage()); }