ruoyi-admin/src/main/resources/application.yml
@@ -280,3 +280,11 @@ show-details: ALWAYS logfile: external-file: ./logs/sys-console.log --- # websocket websocket: enabled: true # è·¯å¾ path: /websocket # è®¾ç½®è®¿é®æºå°å allowedOrigins: '*' ruoyi-common/ruoyi-common-bom/pom.xml
@@ -159,6 +159,12 @@ <version>${revision}</version> </dependency> <!-- WebSocket模å --> <dependency> <groupId>com.ruoyi</groupId> <artifactId>ruoyi-common-websocket</artifactId> <version>${revision}</version> </dependency> </dependencies> </dependencyManagement> ruoyi-common/ruoyi-common-websocket/pom.xml
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,41 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>com.ruoyi</groupId> <artifactId>ruoyi-common</artifactId> <version>${revision}</version> <relativePath>../pom.xml</relativePath> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>ruoyi-common-websocket</artifactId> <description> ruoyi-common-websocket 模å </description> <dependencies> <dependency> <groupId>com.ruoyi</groupId> <artifactId>ruoyi-common-core</artifactId> </dependency> <dependency> <groupId>com.ruoyi</groupId> <artifactId>ruoyi-common-redis</artifactId> </dependency> <dependency> <groupId>com.ruoyi</groupId> <artifactId>ruoyi-common-satoken</artifactId> </dependency> <dependency> <groupId>com.ruoyi</groupId> <artifactId>ruoyi-common-json</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> </dependencies> </project> ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/config/WebSocketConfig.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,62 @@ package com.ruoyi.common.websocket.config; import cn.hutool.core.util.StrUtil; import com.ruoyi.common.websocket.config.properties.WebSocketProperties; import com.ruoyi.common.websocket.constant.WebSocketConstants; import com.ruoyi.common.websocket.handler.PlusWebSocketHandler; import com.ruoyi.common.websocket.interceptor.PlusWebSocketInterceptor; import com.ruoyi.common.websocket.listener.WebSocketTopicListener; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.server.HandshakeInterceptor; /** * WebSocket é ç½® * * @author zendwang */ @AutoConfiguration @ConditionalOnProperty(value = "websocket.enabled", havingValue = "true") @EnableConfigurationProperties(WebSocketProperties.class) @EnableWebSocket public class WebSocketConfig { @Bean public WebSocketConfigurer webSocketConfigurer(HandshakeInterceptor handshakeInterceptor, WebSocketHandler webSocketHandler, WebSocketProperties webSocketProperties) { if (StrUtil.isBlank(webSocketProperties.getPath())) { webSocketProperties.setPath("/websocket"); } if (StrUtil.isBlank(webSocketProperties.getAllowedOrigins())) { webSocketProperties.setAllowedOrigins("*"); } return registry -> registry .addHandler(webSocketHandler, webSocketProperties.getPath()) .addInterceptors(handshakeInterceptor) .setAllowedOrigins(webSocketProperties.getAllowedOrigins()); } @Bean public HandshakeInterceptor handshakeInterceptor() { return new PlusWebSocketInterceptor(); } @Bean public WebSocketHandler webSocketHandler() { return new PlusWebSocketHandler(); } @Bean public WebSocketTopicListener topicListener() { return new WebSocketTopicListener(); } } ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/config/properties/WebSocketProperties.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,27 @@ package com.ruoyi.common.websocket.config.properties; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; /** * WebSocket é 置项 * * @author zendwang */ @ConfigurationProperties("websocket") @Data public class WebSocketProperties { private Boolean enable; /** * è·¯å¾ */ private String path; /** * è®¾ç½®è®¿é®æºå°å */ private String allowedOrigins; } ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/constant/WebSocketConstants.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,28 @@ package com.ruoyi.common.websocket.constant; /** * websocketç常éé ç½® * * @author zendwang */ public interface WebSocketConstants { /** * websocketSessionä¸çåæ°çkey */ String LOGIN_USER_KEY = "loginUser"; /** * 订é çé¢é */ String WEB_SOCKET_TOPIC = "global:websocket"; /** * å端å¿è·³æ£æ¥çå½ä»¤ */ String PING = "ping"; /** * æå¡ç«¯å¿è·³æ¢å¤çå符串 */ String PONG = "pong"; } ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/dto/WebSocketMessageDto.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,29 @@ package com.ruoyi.common.websocket.dto; import lombok.Builder; import lombok.Data; import java.io.Serializable; import java.util.List; /** * æ¶æ¯çdto * * @author zendwang */ @Builder @Data public class WebSocketMessageDto implements Serializable { private static final long serialVersionUID = 1L; /** * éè¦æ¨éå°çsession key å表 */ private List<Long> sessionKeys; /** * éè¦åéçæ¶æ¯ */ private String message; } ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/handler/PlusWebSocketHandler.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,104 @@ package com.ruoyi.common.websocket.handler; import com.ruoyi.common.core.domain.model.LoginUser; import com.ruoyi.common.websocket.dto.WebSocketMessageDto; import com.ruoyi.common.websocket.holder.WebSocketSessionHolder; import com.ruoyi.common.websocket.utils.WebSocketUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.*; import org.springframework.web.socket.handler.AbstractWebSocketHandler; import java.util.List; import static com.ruoyi.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; /** * WebSocketHandler å®ç°ç±» * * @author zendwang */ @Slf4j public class PlusWebSocketHandler extends AbstractWebSocketHandler { /** * è¿æ¥æåå * * @param session */ @Override public void afterConnectionEstablished(WebSocketSession session) { LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); WebSocketSessionHolder.addSession(loginUser.getUserId(), session); log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); } /** * å¤çåéæ¥çææ¬æ¶æ¯ * * @param session * @param message * @throws Exception */ @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); log.info("PlusWebSocketHandler, è¿æ¥ï¼" + session.getId() + "ï¼å·²æ¶å°æ¶æ¯:" + message.getPayload()); List<Long> userIds = List.of(loginUser.getUserId()); WebSocketMessageDto webSocketMessageDto = WebSocketMessageDto.builder() .sessionKeys(userIds).message(message.getPayload()).build(); WebSocketUtils.publishMessage(webSocketMessageDto); } @Override protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { super.handleBinaryMessage(session, message); } /** * å¿è·³çæµçåå¤ * * @param session * @param message * @throws Exception */ @Override protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { WebSocketUtils.sendPongMessage(session); } /** * è¿æ¥åºéæ¶ * * @param session * @param exception * @throws Exception */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { log.error("[transport error] sessionId: {} , exception:{}", session.getId(), exception.getMessage()); } /** * è¿æ¥å ³éå * * @param session * @param status */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); WebSocketSessionHolder.removeSession(loginUser.getUserId()); log.info("[disconnect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType()); } /** * æ¯å¦æ¯æåçæ¶æ¯ * * @return */ @Override public boolean supportsPartialMessages() { return false; } } ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/holder/WebSocketSessionHolder.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,37 @@ package com.ruoyi.common.websocket.holder; import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.springframework.web.socket.WebSocketSession; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * WebSocketSession ç¨äºä¿åå½åææå¨çº¿çä¼è¯ä¿¡æ¯ * * @author zendwang */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class WebSocketSessionHolder { private static final Map<Long, WebSocketSession> USER_SESSION_MAP = new ConcurrentHashMap<>(); public static void addSession(Long sessionKey, WebSocketSession session) { USER_SESSION_MAP.put(sessionKey, session); } public static void removeSession(Long sessionKey) { if (USER_SESSION_MAP.containsKey(sessionKey)) { USER_SESSION_MAP.remove(sessionKey); } } public static WebSocketSession getSessions(Long sessionKey) { return USER_SESSION_MAP.get(sessionKey); } public static Boolean existSession(Long sessionKey) { return USER_SESSION_MAP.containsKey(sessionKey); } } ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/interceptor/PlusWebSocketInterceptor.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,51 @@ package com.ruoyi.common.websocket.interceptor; import com.ruoyi.common.core.domain.model.LoginUser; import com.ruoyi.common.satoken.utils.LoginHelper; import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.Map; import static com.ruoyi.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; /** * WebSocketæ¡æè¯·æ±çæ¦æªå¨ * * @author zendwang */ @Slf4j public class PlusWebSocketInterceptor implements HandshakeInterceptor { /** * æ¡æå * * @param request request * @param response response * @param wsHandler wsHandler * @param attributes attributes * @return æ¯å¦æ¡ææå */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) { LoginUser loginUser = LoginHelper.getLoginUser(); attributes.put(LOGIN_USER_KEY, loginUser); return true; } /** * æ¡æå * * @param request request * @param response response * @param wsHandler wsHandler * @param exception å¼å¸¸ */ @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } } ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/listener/WebSocketTopicListener.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,38 @@ package com.ruoyi.common.websocket.listener; import cn.hutool.core.collection.CollUtil; import com.ruoyi.common.websocket.holder.WebSocketSessionHolder; import com.ruoyi.common.websocket.utils.WebSocketUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; /** * WebSocket 主é¢è®¢é çå¬å¨ * * @author zendwang */ @Slf4j public class WebSocketTopicListener implements ApplicationRunner, Ordered { @Override public void run(ApplicationArguments args) throws Exception { WebSocketUtils.subscribeMessage((message) -> { log.info("WebSocket主é¢è®¢é æ¶å°æ¶æ¯session keys={} message={}ï¼", message.getSessionKeys(), message.getMessage()); if (CollUtil.isNotEmpty(message.getSessionKeys())) { message.getSessionKeys().forEach(key -> { if (WebSocketSessionHolder.existSession(key)) { WebSocketUtils.sendMessage(key, message.getMessage()); } }); } }); log.info("åå§åWebSocket主é¢è®¢é çå¬å¨æå"); } @Override public int getOrder() { return -1; } } ruoyi-common/ruoyi-common-websocket/src/main/java/com/ruoyi/common/websocket/utils/WebSocketUtils.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,102 @@ package com.ruoyi.common.websocket.utils; import cn.hutool.core.collection.CollUtil; import com.ruoyi.common.core.domain.model.LoginUser; import com.ruoyi.common.json.utils.JsonUtils; import com.ruoyi.common.redis.utils.RedisUtils; import com.ruoyi.common.satoken.utils.LoginHelper; import com.ruoyi.common.websocket.dto.WebSocketMessageDto; import com.ruoyi.common.websocket.holder.WebSocketSessionHolder; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.PongMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketMessage; import org.springframework.web.socket.WebSocketSession; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; import static com.ruoyi.common.websocket.constant.WebSocketConstants.LOGIN_USER_KEY; import static com.ruoyi.common.websocket.constant.WebSocketConstants.WEB_SOCKET_TOPIC; /** * å·¥å ·ç±» * * @author zendwang */ @Slf4j @NoArgsConstructor(access = AccessLevel.PRIVATE) public class WebSocketUtils { /** * åéæ¶æ¯ * @param sessionKey * @param message */ public static void sendMessage(Long sessionKey, String message) { WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey); sendMessage(session, message); } /** * è®¢é æ¶æ¯ * * @param consumer */ public static void subscribeMessage(Consumer<WebSocketMessageDto> consumer) { RedisUtils.subscribe(WEB_SOCKET_TOPIC, WebSocketMessageDto.class, consumer); } /** * åå¸è®¢é çæ¶æ¯ * * @param webSocketMessage */ public static void publishMessage(WebSocketMessageDto webSocketMessage) { List<Long> unsentSessionKeys = new ArrayList<>(); // å½åæå¡å session,ç´æ¥åéæ¶æ¯ for (Long sessionKey: webSocketMessage.getSessionKeys()) { if (WebSocketSessionHolder.existSession(sessionKey)) { WebSocketUtils.sendMessage(sessionKey, webSocketMessage.getMessage()); continue; } unsentSessionKeys.add(sessionKey); } // ä¸å¨å½åæå¡å session,åå¸è®¢é æ¶æ¯ if (CollUtil.isNotEmpty(unsentSessionKeys)) { WebSocketMessageDto broadcastMessage = WebSocketMessageDto.builder() .message(webSocketMessage.getMessage()).sessionKeys(unsentSessionKeys).build(); RedisUtils.publish(WEB_SOCKET_TOPIC, broadcastMessage, consumer -> { log.info(" WebSocketåé主é¢è®¢é æ¶æ¯topic:{} session keys:{} message:{}", WEB_SOCKET_TOPIC, unsentSessionKeys, webSocketMessage.getMessage()); }); } } public static void sendPongMessage(WebSocketSession session) { sendMessage(session, new PongMessage()); } public static void sendMessage(WebSocketSession session, String message) { sendMessage(session, new TextMessage(message)); } private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) { if (session == null || !session.isOpen()) { log.error("[send] sessionä¼è¯å·²ç»å ³é"); } else { try { // è·åå½åä¼è¯ä¸çç¨æ· LoginUser loginUser = (LoginUser) session.getAttributes().get(LOGIN_USER_KEY); session.sendMessage(message); log.info("[send] sessionId: {},userId:{},userType:{},message:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType(), message); } catch (IOException e) { log.error("[send] session({}) åéæ¶æ¯({}) å¼å¸¸", session, message, e); } } } } ruoyi-common/ruoyi-common-websocket/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
¶Ô±ÈÐÂÎļþ @@ -0,0 +1 @@ com.ruoyi.common.websocket.config.WebSocketConfig ruoyi-modules/ruoyi-demo/pom.xml
@@ -94,6 +94,10 @@ <artifactId>ruoyi-common-tenant</artifactId> </dependency> <dependency> <groupId>com.ruoyi</groupId> <artifactId>ruoyi-common-websocket</artifactId> </dependency> <!-- çä¿¡ ç¨åªä¸ªå¯¼å ¥åªä¸ªä¾èµ --> <!-- <dependency>--> <!-- <groupId>com.aliyun</groupId>--> ruoyi-modules/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/WeSocketController.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,33 @@ package com.ruoyi.demo.controller; import com.ruoyi.common.core.domain.R; import com.ruoyi.common.websocket.dto.WebSocketMessageDto; import com.ruoyi.common.websocket.utils.WebSocketUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * WebSocket æ¼ç¤ºæ¡ä¾ * * @author zendwang */ @RequiredArgsConstructor @RestController @RequestMapping("/demo/websocket") @Slf4j public class WeSocketController { /** * å叿¶æ¯ * * @param dto åéå 容 */ @GetMapping("/send") public R<Void> send(WebSocketMessageDto dto) throws InterruptedException { WebSocketUtils.publishMessage(dto); return R.ok("æä½æå"); } }