| | |
| | | package org.dromara.common.sse.core; |
| | | |
| | | import cn.hutool.core.collection.CollUtil; |
| | | import cn.hutool.core.map.MapUtil; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.dromara.common.redis.utils.RedisUtils; |
| | | import org.dromara.common.sse.dto.SseMessageDto; |
| | | import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.function.Consumer; |
| | | |
| | | /** |
| | | * 管理 Server-Sent Events (SSE) 连接 |
| | | * |
| | | * @author Lion Li |
| | | */ |
| | | @Slf4j |
| | | public class SseEmitterManager { |
| | | |
| | | /** |
| | | * 订阅的频道 |
| | | */ |
| | |
| | | |
| | | private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); |
| | | |
| | | /** |
| | | * 建立与指定用户的 SSE 连接 |
| | | * |
| | | * @param userId 用户的唯一标识符,用于区分不同用户的连接 |
| | | * @param token 用户的唯一令牌,用于识别具体的连接 |
| | | * @return 返回一个 SseEmitter 实例,客户端可以通过该实例接收 SSE 事件 |
| | | */ |
| | | public SseEmitter connect(Long userId, String token) { |
| | | // 从 USER_TOKEN_EMITTERS 中获取或创建当前用户的 SseEmitter 映射表(ConcurrentHashMap) |
| | | // 每个用户可以有多个 SSE 连接,通过 token 进行区分 |
| | | Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); |
| | | |
| | | // 创建一个新的 SseEmitter 实例,超时时间设置为 0 表示无限制 |
| | | SseEmitter emitter = new SseEmitter(0L); |
| | | |
| | | emitters.put(token, emitter); |
| | | |
| | | // 当 emitter 完成、超时或发生错误时,从映射表中移除对应的 token |
| | | emitter.onCompletion(() -> emitters.remove(token)); |
| | | emitter.onTimeout(() -> emitters.remove(token)); |
| | | emitter.onError((e) -> emitters.remove(token)); |
| | | |
| | | try { |
| | | // 向客户端发送一条连接成功的事件 |
| | | emitter.send(SseEmitter.event().comment("connected")); |
| | | } catch (IOException e) { |
| | | // 如果发送消息失败,则从映射表中移除 emitter |
| | | emitters.remove(token); |
| | | } |
| | | return emitter; |
| | | } |
| | | |
| | | /** |
| | | * 断开指定用户的 SSE 连接 |
| | | * |
| | | * @param userId 用户的唯一标识符,用于区分不同用户的连接 |
| | | * @param token 用户的唯一令牌,用于识别具体的连接 |
| | | */ |
| | | public void disconnect(Long userId, String token) { |
| | | if (userId == null || token == null) { |
| | | return; |
| | | } |
| | | Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); |
| | | if (emitters != null) { |
| | | if (MapUtil.isNotEmpty(emitters)) { |
| | | try { |
| | | emitters.get(token).send(SseEmitter.event().comment("disconnected")); |
| | | SseEmitter sseEmitter = emitters.get(token); |
| | | sseEmitter.send(SseEmitter.event().comment("disconnected")); |
| | | sseEmitter.complete(); |
| | | } catch (Exception ignore) { |
| | | } |
| | | emitters.remove(token); |
| | | } else { |
| | | USER_TOKEN_EMITTERS.remove(userId); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
// Sends a message to every SSE connection registered for the given user.
//
// Flow visible in this block: look up the user's token -> emitter table in
// USER_TOKEN_EMITTERS; when it is non-empty, call send(...) on each emitter; when it is
// present but empty, remove the user's entry from USER_TOKEN_EMITTERS.
//
// NOTE(review): this method is not intact in this view. The line that should finish the
// SseEmitter.event() builder call and open the catch clause is blank, and the
// "if (emitters != null)" block is never closed. Restore the original text before compiling.
| | | public void sendMessage(Long userId, String message) { |
| | | Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); |
| | | if (emitters != null) { |
| | | if (MapUtil.isNotEmpty(emitters)) { |
| | | for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) { |
| | | try { |
| | | entry.getValue().send(SseEmitter.event() |
| | |
// Unregisters this token's emitter; presumably this is the failure path of the send above,
// but the catch line is missing here, so confirm against the original file.
| | | emitters.remove(entry.getKey()); |
| | | } |
| | | } |
| | | } else { |
// The table exists but holds no connections: drop the user's entry.
| | | USER_TOKEN_EMITTERS.remove(userId); |
| | | } |
| | | } |
| | | |
| | |
| | | * @param sseMessageDto 要发布的SSE消息对象 |
| | | */ |
| | | public void publishMessage(SseMessageDto sseMessageDto) { |
| | | List<Long> unsentUserIds = new ArrayList<>(); |
| | | // 当前服务内用户,直接发送消息 |
| | | for (Long userId : sseMessageDto.getUserIds()) { |
| | | if (USER_TOKEN_EMITTERS.containsKey(userId)) { |
| | | sendMessage(userId, sseMessageDto.getMessage()); |
| | | continue; |
| | | } |
| | | unsentUserIds.add(userId); |
| | | } |
| | | // 不在当前服务内用户,发布订阅消息 |
| | | if (CollUtil.isNotEmpty(unsentUserIds)) { |
| | | SseMessageDto broadcastMessage = new SseMessageDto(); |
| | | broadcastMessage.setMessage(sseMessageDto.getMessage()); |
| | | broadcastMessage.setUserIds(unsentUserIds); |
| | | RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { |
| | | log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", |
| | | SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage()); |
| | | }); |
| | | } |
| | | SseMessageDto broadcastMessage = new SseMessageDto(); |
| | | broadcastMessage.setMessage(sseMessageDto.getMessage()); |
| | | broadcastMessage.setUserIds(sseMessageDto.getUserIds()); |
| | | RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { |
| | | log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", |
| | | SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage()); |
| | | }); |
| | | } |
| | | |
| | | /** |