| | |
| | | package org.dromara.common.redis.utils; |
| | | |
| | | import org.dromara.common.core.utils.SpringUtils; |
| | | import lombok.AccessLevel; |
| | | import lombok.NoArgsConstructor; |
| | | import org.dromara.common.core.utils.SpringUtils; |
| | | import org.redisson.api.*; |
| | | |
| | | import java.util.concurrent.CompletionStage; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.function.Consumer; |
| | | import java.util.function.Function; |
| | | |
| | | /** |
| | | * 分布式队列工具 |
| | |
| | | * |
| | | * @param queueName 队列名 |
| | | * @param capacity 容量 |
| | | * @param destroy 已存在是否销毁 |
| | | * @param destroy 是否销毁 |
| | | */ |
| | | public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) { |
| | | RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); |
| | | if (boundedBlockingQueue.isExists() && destroy) { |
| | | destroyQueue(queueName); |
| | | if (destroy) { |
| | | boundedBlockingQueue.delete(); |
| | | } |
| | | return boundedBlockingQueue.trySetCapacity(capacity); |
| | | } |
| | |
| | | /** |
| | | * 订阅阻塞队列(可订阅所有实现类 例如: 延迟 优先 有界 等) |
| | | */ |
| | | public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) { |
| | | public static <T> void subscribeBlockingQueue(String queueName, Function<T, CompletionStage<Void>> consumer, boolean isDelayed) { |
| | | RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); |
| | | if (isDelayed) { |
| | | // 订阅延迟队列 |
| | | CLIENT.getDelayedQueue(queue); |
| | | } |
| | | queue.subscribeOnElements(consumer); |
| | | } |
| | | |