¶Ô±ÈÐÂÎļþ |
| | |
| | | package org.dromara.common.sse.core; |
| | | |
| | | import cn.hutool.core.collection.CollUtil; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.dromara.common.redis.utils.RedisUtils; |
| | | import org.dromara.common.sse.dto.SseMessageDto; |
| | | import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.function.Consumer; |
| | | |
| | | /** |
| | | * 管ç Server-Sent Events (SSE) è¿æ¥ |
| | | * |
| | | * @author Lion Li |
| | | */ |
| | | @Slf4j |
| | | public class SseEmitterManager { |
| | | |
| | | /** |
| | | * 订é
çé¢é |
| | | */ |
| | | private final static String SSE_TOPIC = "global:sse"; |
| | | |
| | | private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); |
| | | |
| | | /** |
| | | * 建ç«ä¸æå®ç¨æ·ç SSE è¿æ¥ |
| | | * |
| | | * @param userId ç¨æ·çå¯ä¸æ è¯ç¬¦ï¼ç¨äºåºåä¸åç¨æ·çè¿æ¥ |
| | | * @param token ç¨æ·çå¯ä¸ä»¤çï¼ç¨äºè¯å«å
·ä½çè¿æ¥ |
| | | * @return è¿åä¸ä¸ª SseEmitter å®ä¾ï¼å®¢æ·ç«¯å¯ä»¥éè¿è¯¥å®ä¾æ¥æ¶ SSE äºä»¶ |
| | | */ |
| | | public SseEmitter connect(Long userId, String token) { |
| | | // ä» USER_TOKEN_EMITTERS ä¸è·åæå建å½åç¨æ·ç SseEmitter æ å°è¡¨ï¼ConcurrentHashMapï¼ |
| | | // æ¯ä¸ªç¨æ·å¯ä»¥æå¤ä¸ª SSE è¿æ¥ï¼éè¿ token è¿è¡åºå |
| | | Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); |
| | | |
| | | // å建ä¸ä¸ªæ°ç SseEmitter å®ä¾ï¼è¶
æ¶æ¶é´è®¾ç½®ä¸º 0 表示æ éå¶ |
| | | SseEmitter emitter = new SseEmitter(0L); |
| | | |
| | | emitters.put(token, emitter); |
| | | |
| | | // å½ emitter 宿ãè¶
æ¶æåçé误æ¶ï¼ä»æ å°è¡¨ä¸ç§»é¤å¯¹åºç token |
| | | emitter.onCompletion(() -> emitters.remove(token)); |
| | | emitter.onTimeout(() -> emitters.remove(token)); |
| | | emitter.onError((e) -> emitters.remove(token)); |
| | | |
| | | try { |
| | | // å客æ·ç«¯åé䏿¡è¿æ¥æåçäºä»¶ |
| | | emitter.send(SseEmitter.event().comment("connected")); |
| | | } catch (IOException e) { |
| | | // 妿åéæ¶æ¯å¤±è´¥ï¼å仿 å°è¡¨ä¸ç§»é¤ emitter |
| | | emitters.remove(token); |
| | | } |
| | | return emitter; |
| | | } |
| | | |
| | | /** |
| | | * æå¼æå®ç¨æ·ç SSE è¿æ¥ |
| | | * |
| | | * @param userId ç¨æ·çå¯ä¸æ è¯ç¬¦ï¼ç¨äºåºåä¸åç¨æ·çè¿æ¥ |
| | | * @param token ç¨æ·çå¯ä¸ä»¤çï¼ç¨äºè¯å«å
·ä½çè¿æ¥ |
| | | */ |
| | | public void disconnect(Long userId, String token) { |
| | | Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); |
| | | if (emitters != null) { |
| | | try { |
| | | emitters.get(token).send(SseEmitter.event().comment("disconnected")); |
| | | } catch (Exception ignore) { |
| | | } |
| | | emitters.remove(token); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 订é
SSEæ¶æ¯ä¸»é¢ï¼å¹¶æä¾ä¸ä¸ªæ¶è´¹è
彿°æ¥å¤çæ¥æ¶å°çæ¶æ¯ |
| | | * |
| | | * @param consumer å¤çSSEæ¶æ¯çæ¶è´¹è
彿° |
| | | */ |
| | | public void subscribeMessage(Consumer<SseMessageDto> consumer) { |
| | | RedisUtils.subscribe(SSE_TOPIC, SseMessageDto.class, consumer); |
| | | } |
| | | |
| | | /** |
| | | * åæå®çç¨æ·ä¼è¯åéæ¶æ¯ |
| | | * |
| | | * @param userId è¦åéæ¶æ¯çç¨æ·id |
| | | * @param message è¦åéçæ¶æ¯å
容 |
| | | */ |
| | | public void sendMessage(Long userId, String message) { |
| | | Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); |
| | | if (emitters != null) { |
| | | for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) { |
| | | try { |
| | | entry.getValue().send(SseEmitter.event() |
| | | .name("message") |
| | | .data(message)); |
| | | } catch (Exception e) { |
| | | emitters.remove(entry.getKey()); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * æ¬æºå
¨ç¨æ·ä¼è¯åéæ¶æ¯ |
| | | * |
| | | * @param message è¦åéçæ¶æ¯å
容 |
| | | */ |
| | | public void sendMessage(String message) { |
| | | for (Long userId : USER_TOKEN_EMITTERS.keySet()) { |
| | | sendMessage(userId, message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * åå¸SSE订é
æ¶æ¯ |
| | | * |
| | | * @param sseMessageDto è¦åå¸çSSEæ¶æ¯å¯¹è±¡ |
| | | */ |
| | | public void publishMessage(SseMessageDto sseMessageDto) { |
| | | List<Long> unsentUserIds = new ArrayList<>(); |
| | | // å½åæå¡å
ç¨æ·,ç´æ¥åéæ¶æ¯ |
| | | for (Long userId : sseMessageDto.getUserIds()) { |
| | | if (USER_TOKEN_EMITTERS.containsKey(userId)) { |
| | | sendMessage(userId, sseMessageDto.getMessage()); |
| | | continue; |
| | | } |
| | | unsentUserIds.add(userId); |
| | | } |
| | | // ä¸å¨å½åæå¡å
ç¨æ·,åå¸è®¢é
æ¶æ¯ |
| | | if (CollUtil.isNotEmpty(unsentUserIds)) { |
| | | SseMessageDto broadcastMessage = new SseMessageDto(); |
| | | broadcastMessage.setMessage(sseMessageDto.getMessage()); |
| | | broadcastMessage.setUserIds(unsentUserIds); |
| | | RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { |
| | | log.info("SSEåé主é¢è®¢é
æ¶æ¯topic:{} session keys:{} message:{}", |
| | | SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage()); |
| | | }); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * åææçç¨æ·åå¸è®¢é
çæ¶æ¯(群å) |
| | | * |
| | | * @param message è¦åå¸çæ¶æ¯å
容 |
| | | */ |
| | | public void publishAll(String message) { |
| | | SseMessageDto broadcastMessage = new SseMessageDto(); |
| | | broadcastMessage.setMessage(message); |
| | | RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { |
| | | log.info("SSEåé主é¢è®¢é
æ¶æ¯topic:{} message:{}", SSE_TOPIC, message); |
| | | }); |
| | | } |
| | | } |