| | |
| | | package org.dromara.common.sse.core; |
| | | |
| | | import cn.hutool.core.collection.CollUtil; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.dromara.common.redis.utils.RedisUtils; |
| | | import org.dromara.common.sse.dto.SseMessageDto; |
| | | import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.function.Consumer; |
| | |
| | | * @param sseMessageDto 要发布的SSE消息对象 |
| | | */ |
| | | public void publishMessage(SseMessageDto sseMessageDto) { |
| | | List<Long> unsentUserIds = new ArrayList<>(); |
| | | // 当前服务内用户,直接发送消息 |
| | | for (Long userId : sseMessageDto.getUserIds()) { |
| | | if (USER_TOKEN_EMITTERS.containsKey(userId)) { |
| | | sendMessage(userId, sseMessageDto.getMessage()); |
| | | continue; |
| | | } |
| | | unsentUserIds.add(userId); |
| | | } |
| | | // 不在当前服务内用户,发布订阅消息 |
| | | if (CollUtil.isNotEmpty(unsentUserIds)) { |
| | | SseMessageDto broadcastMessage = new SseMessageDto(); |
| | | broadcastMessage.setMessage(sseMessageDto.getMessage()); |
| | | broadcastMessage.setUserIds(unsentUserIds); |
| | | RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { |
| | | log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", |
| | | SSE_TOPIC, unsentUserIds, sseMessageDto.getMessage()); |
| | | }); |
| | | } |
| | | SseMessageDto broadcastMessage = new SseMessageDto(); |
| | | broadcastMessage.setMessage(sseMessageDto.getMessage()); |
| | | broadcastMessage.setUserIds(sseMessageDto.getUserIds()); |
| | | RedisUtils.publish(SSE_TOPIC, broadcastMessage, consumer -> { |
| | | log.info("SSE发送主题订阅消息topic:{} session keys:{} message:{}", |
| | | SSE_TOPIC, sseMessageDto.getUserIds(), sseMessageDto.getMessage()); |
| | | }); |
| | | } |
| | | |
| | | /** |