疯狂的狮子Li
2024-08-22 457e59e61c9d7dbf4ef299c34aadd2c00fc0f1c9
ruoyi-common/ruoyi-common-websocket/src/main/java/org/dromara/common/websocket/utils/WebSocketUtils.java
@@ -4,7 +4,6 @@
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.domain.model.LoginUser;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.websocket.dto.WebSocketMessageDto;
import org.dromara.common.websocket.holder.WebSocketSessionHolder;
@@ -18,7 +17,6 @@
import java.util.List;
import java.util.function.Consumer;
import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY;
import static org.dromara.common.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC;
/**
@@ -31,10 +29,10 @@
public class WebSocketUtils {
    /**
     * 发送消息
     * 向指定的WebSocket会话发送消息
     *
     * @param sessionKey session主键 一般为用户id
     * @param message    消息文本
     * @param sessionKey 要发送消息的用户id
     * @param message    要发送的消息内容
     */
    public static void sendMessage(Long sessionKey, String message) {
        WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
@@ -42,18 +40,18 @@
    }
    /**
     * 订阅消息
     * 订阅WebSocket消息主题,并提供一个消费者函数来处理接收到的消息
     *
     * @param consumer 自定义处理
     * @param consumer 处理WebSocket消息的消费者函数
     */
    public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) {
        RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer);
    }
    /**
     * 发布订阅的消息
     * 发布WebSocket订阅消息
     *
     * @param webSocketMessage 消息对象
     * @param webSocketMessage 要发布的WebSocket消息对象
     */
    public static void publishMessage(WebSocketMessageDto webSocketMessage) {
        List<Long> unsentSessionKeys = new ArrayList<>();
@@ -78,38 +76,49 @@
    }
    /**
     * 发布订阅的消息(群发)
     * 向所有的WebSocket会话发布订阅的消息(群发)
     *
     * @param message 消息内容
     * @param message 要发布的消息内容
     */
    public static void publishAll(String message) {
        WebSocketSessionHolder.getSessionsAll().forEach(key -> {
            WebSocketUtils.sendMessage(key, message);
        });
        WebSocketMessageDto broadcastMessage = new WebSocketMessageDto();
        broadcastMessage.setMessage(message);
        RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> {
            log.info(" WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
            log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message);
        });
    }
    /**
     * 向指定的WebSocket会话发送Pong消息
     *
     * @param session 要发送Pong消息的WebSocket会话
     */
    public static void sendPongMessage(WebSocketSession session) {
        sendMessage(session, new PongMessage());
    }
    /**
     * 向指定的WebSocket会话发送文本消息
     *
     * @param session WebSocket会话
     * @param message 要发送的文本消息内容
     */
    public static void sendMessage(WebSocketSession session, String message) {
        sendMessage(session, new TextMessage(message));
    }
    private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
    /**
     * 向指定的WebSocket会话发送WebSocket消息对象
     *
     * @param session WebSocket会话
     * @param message 要发送的WebSocket消息对象
     */
    private synchronized static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
        if (session == null || !session.isOpen()) {
            log.error("[send] session会话已经关闭");
            log.warn("[send] session会话已经关闭");
        } else {
            try {
                // 获取当前会话中的用户
                LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY);
                session.sendMessage(message);
                log.info("[send] sessionId: {},userId:{},userType:{},message:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType(), message);
            } catch (IOException e) {
                log.error("[send] session({}) 发送消息({}) 异常", session, message, e);
            }