fix 修复 延迟队列在投递消息未到达时间的时候 服务死机导致重启收不到消息
| | |
| | | /** |
| | | * 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等) |
| | | */ |
| | | public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) { |
| | | public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer, boolean isDelayed) { |
| | | RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | if (isDelayed) { |
| | | // 订阅延迟队列 |
| | | CLIENT.getDelayedQueue(queue); |
| | | } |
| | | queue.subscribeOnElements(consumer); |
| | | } |
| | | |
| | |
| | | package org.dromara.demo.controller.queue; |
| | | |
| | | import cn.dev33.satoken.annotation.SaIgnore; |
| | | import org.dromara.common.core.domain.R; |
| | | import org.dromara.common.redis.utils.QueueUtils; |
| | | import lombok.RequiredArgsConstructor; |
| | |
| | | * @author Lion Li |
| | | * @version 3.6.0 |
| | | */ |
| | | @SaIgnore |
| | | @Slf4j |
| | | @RequiredArgsConstructor |
| | | @RestController |
| | |
| | | QueueUtils.subscribeBlockingQueue(queueName, (String orderNum) -> { |
| | | // 观察接收时间 |
| | | log.info("通道: {}, 收到数据: {}", queueName, orderNum); |
| | | }); |
| | | }, true); |
| | | return R.ok("操作成功"); |
| | | } |
| | | |