疯狂的狮子li
2022-08-31 bd338dd9341a3473a2a504d55c00796dd7b98f5a
ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java
@@ -31,6 +31,43 @@
    }
    /**
     * 添加普通队列数据
     *
     * @param queueName 队列名
     * @param data      数据
     */
    public static <T> boolean addQueueObject(String queueName, T data) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.offer(data);
    }
    /**
     * 通用获取一个队列数据 没有数据返回 null(不支持延迟队列)
     *
     * @param queueName 队列名
     */
    public static <T> T getQueueObject(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.poll();
    }
    /**
     * 通用删除队列数据(不支持延迟队列)
     */
    public static <T> boolean removeQueueObject(String queueName, T data) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.remove(data);
    }
    /**
     * 通用销毁队列 所有阻塞监听 报错(不支持延迟队列)
     */
    public static <T> boolean destroyQueue(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        return queue.delete();
    }
    /**
     * 添加延迟队列数据 默认毫秒
     *
     * @param queueName 队列名
@@ -52,11 +89,18 @@
    public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        // 已存在则无视
        if (delayedQueue.contains(data)) {
            return;
        }
        delayedQueue.offer(data, time, timeUnit);
    }
    /**
     * 获取一个延迟队列数据 没有数据返回 null
     *
     * @param queueName 队列名
     */
    public static <T> T getDelayedQueueObject(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        return delayedQueue.poll();
    }
    /**
@@ -82,23 +126,12 @@
     *
     * @param queueName  队列名
     * @param comparator 比较器
     */
    public static <T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.trySetComparator(comparator);
    }
    /**
     * 尝试设置 优先队列比较器 用于排序优先级
     *
     * @param queueName  队列名
     * @param comparator 比较器
     * @param destroy    已存在是否销毁
     */
    public static <T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator, boolean destroy) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        if (priorityBlockingQueue.isExists() && destroy) {
            destroyPriorityQueueObject(queueName);
            destroyQueue(queueName);
        }
        return priorityBlockingQueue.trySetComparator(comparator);
    }
@@ -112,32 +145,6 @@
    public static <T> boolean addPriorityQueueObject(String queueName, T data) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.offer(data);
    }
    /**
     * 获取一个优先队列数据 没有数据返回 null
     *
     * @param queueName 队列名
     */
    public static <T> T getPriorityQueueObject(String queueName) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.poll();
    }
    /**
     * 删除优先队列数据
     */
    public static <T> boolean removePriorityQueueObject(String queueName, T data) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.remove(data);
    }
    /**
     * 销毁优先队列
     */
    public static boolean destroyPriorityQueueObject(String queueName) {
        RPriorityBlockingQueue<?> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.delete();
    }
    /**
@@ -161,7 +168,7 @@
    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        if (boundedBlockingQueue.isExists() && destroy) {
            destroyBoundedQueueObject(queueName);
            destroyQueue(queueName);
        }
        return boundedBlockingQueue.trySetCapacity(capacity);
    }
@@ -176,32 +183,6 @@
    public static <T> boolean addBoundedQueueObject(String queueName, T data) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.offer(data);
    }
    /**
     * 获取一个有界队列数据 没有数据返回 null
     *
     * @param queueName 队列名
     */
    public static <T> T getBoundedQueueObject(String queueName) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.poll();
    }
    /**
     * 删除有界队列数据
     */
    public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.remove(data);
    }
    /**
     * 销毁有界队列
     */
    public static boolean destroyBoundedQueueObject(String queueName) {
        RBoundedBlockingQueue<?> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.delete();
    }
    /**