| | |
| | | package org.dromara.common.websocket.utils; |
| | | |
| | | import cn.hutool.core.collection.CollUtil; |
| | | import org.dromara.common.core.domain.model.LoginUser; |
| | | import org.dromara.common.redis.utils.RedisUtils; |
| | | import org.dromara.common.websocket.dto.WebSocketMessageDto; |
| | | import org.dromara.common.websocket.holder.WebSocketSessionHolder; |
| | | import lombok.AccessLevel; |
| | | import lombok.NoArgsConstructor; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.dromara.common.redis.utils.RedisUtils; |
| | | import org.dromara.common.websocket.dto.WebSocketMessageDto; |
| | | import org.dromara.common.websocket.holder.WebSocketSessionHolder; |
| | | import org.springframework.web.socket.PongMessage; |
| | | import org.springframework.web.socket.TextMessage; |
| | | import org.springframework.web.socket.WebSocketMessage; |
| | |
| | | import java.util.List; |
| | | import java.util.function.Consumer; |
| | | |
| | | import static org.dromara.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; |
| | | import static org.dromara.common.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC; |
| | | |
| | | /** |
| | |
| | | public class WebSocketUtils { |
| | | |
| | | /** |
| | | * 发送消息 |
| | | * 向指定的WebSocket会话发送消息 |
| | | * |
| | | * @param sessionKey session主键 一般为用户id |
| | | * @param message 消息文本 |
| | | * @param sessionKey 要发送消息的用户id |
| | | * @param message 要发送的消息内容 |
| | | */ |
| | | public static void sendMessage(Long sessionKey, String message) { |
| | | WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey); |
| | |
| | | } |
| | | |
| | | /** |
| | | * 订阅消息 |
| | | * 订阅WebSocket消息主题,并提供一个消费者函数来处理接收到的消息 |
| | | * |
| | | * @param consumer 自定义处理 |
| | | * @param consumer 处理WebSocket消息的消费者函数 |
| | | */ |
| | | public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) { |
| | | RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer); |
| | | } |
| | | |
| | | /** |
| | | * 发布订阅的消息 |
| | | * 发布WebSocket订阅消息 |
| | | * |
| | | * @param webSocketMessage 消息对象 |
| | | * @param webSocketMessage 要发布的WebSocket消息对象 |
| | | */ |
| | | public static void publishMessage(WebSocketMessageDto webSocketMessage) { |
| | | List<Long> unsentSessionKeys = new ArrayList<>(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 向所有的WebSocket会话发布订阅的消息(群发) |
| | | * |
| | | * @param message 要发布的消息内容 |
| | | */ |
| | | public static void publishAll(String message) { |
| | | WebSocketMessageDto broadcastMessage = new WebSocketMessageDto(); |
| | | broadcastMessage.setMessage(message); |
| | | RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { |
| | | log.info("WebSocket发送主题订阅消息topic:{} message:{}", WEB_SOCKET_TOPIC, message); |
| | | }); |
| | | } |
| | | |
| | | /** |
| | | * 向指定的WebSocket会话发送Pong消息 |
| | | * |
| | | * @param session 要发送Pong消息的WebSocket会话 |
| | | */ |
| | | public static void sendPongMessage(WebSocketSession session) { |
| | | sendMessage(session, new PongMessage()); |
| | | } |
| | | |
| | | /** |
| | | * 向指定的WebSocket会话发送文本消息 |
| | | * |
| | | * @param session WebSocket会话 |
| | | * @param message 要发送的文本消息内容 |
| | | */ |
| | | public static void sendMessage(WebSocketSession session, String message) { |
| | | sendMessage(session, new TextMessage(message)); |
| | | } |
| | | |
| | | private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) { |
| | | /** |
| | | * 向指定的WebSocket会话发送WebSocket消息对象 |
| | | * |
| | | * @param session WebSocket会话 |
| | | * @param message 要发送的WebSocket消息对象 |
| | | */ |
| | | private synchronized static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) { |
| | | if (session == null || !session.isOpen()) { |
| | | log.error("[send] session会话已经关闭"); |
| | | log.warn("[send] session会话已经关闭"); |
| | | } else { |
| | | try { |
| | | // 获取当前会话中的用户 |
| | | LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); |
| | | session.sendMessage(message); |
| | | log.info("[send] sessionId: {},userId:{},userType:{},message:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType(), message); |
| | | } catch (IOException e) { |
| | | log.error("[send] session({}) 发送消息({}) 异常", session, message, e); |
| | | } |