| | |
| | | package org.dromara.common.sse.core; |
| | | |
| | | import cn.hutool.core.map.MapUtil; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.dromara.common.redis.utils.RedisUtils; |
| | | import org.dromara.common.sse.dto.SseMessageDto; |
| | |
| | | * @param token 用户的唯一令牌,用于识别具体的连接 |
| | | */ |
| | | public void disconnect(Long userId, String token) { |
| | | if (userId == null || token == null) { |
| | | return; |
| | | } |
| | | Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); |
| | | if (emitters != null) { |
| | | if (MapUtil.isNotEmpty(emitters)) { |
| | | try { |
| | | SseEmitter sseEmitter = emitters.get(token); |
| | | sseEmitter.send(SseEmitter.event().comment("disconnected")); |
| | |
| | | } catch (Exception ignore) { |
| | | } |
| | | emitters.remove(token); |
| | | } else { |
| | | USER_TOKEN_EMITTERS.remove(userId); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public void sendMessage(Long userId, String message) { |
| | | Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.get(userId); |
| | | if (emitters != null) { |
| | | if (MapUtil.isNotEmpty(emitters)) { |
| | | for (Map.Entry<String, SseEmitter> entry : emitters.entrySet()) { |
| | | try { |
| | | entry.getValue().send(SseEmitter.event() |
| | |
| | | emitters.remove(entry.getKey()); |
| | | } |
| | | } |
| | | } else { |
| | | USER_TOKEN_EMITTERS.remove(userId); |
| | | } |
| | | } |
| | | |
| | |
| | | SseMessageDto broadcastMessage = new SseMessageDto(); |
| | | broadcastMessage.setMessage(sseMessageDto.getMessage()); |
| | | broadcastMessage.setUserIds(sseMessageDto.getUserIds()); |
| | | RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> |
| | | RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { |
| | | log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", |
| | | SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage()) |
| | | ); |
| | | SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage()); |
| | | }); |
| | | } |
| | | |
| | | /** |
| | |
| | | public void publishAll(String message) { |
| | | SseMessageDto broadcastMessage = new SseMessageDto(); |
| | | broadcastMessage.setMessage(message); |
| | | RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> |
| | | log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message) |
| | | ); |
| | | RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { |
| | | log.info("SSE发送主题订阅消息topic:{} message:{}", SSE_TOPIC, message); |
| | | }); |
| | | } |
| | | } |