From f1208474f771a1c233d7425c8ed13fbaa0d521ac Mon Sep 17 00:00:00 2001
From: baoshiwei <baoshiwei@shlanbao.cn>
Date: 星期三, 12 三月 2025 09:35:13 +0800
Subject: [PATCH] Merge remote-tracking branch 'origin/5.X' into 5.X

---
 ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/utils/QueueUtils.java |   69 +++++++++++++++++++++++++++++++---
 1 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/utils/QueueUtils.java b/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/utils/QueueUtils.java
index 45b5496..7c09e31 100644
--- a/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/utils/QueueUtils.java
+++ b/ruoyi-common/ruoyi-common-redis/src/main/java/org/dromara/common/redis/utils/QueueUtils.java
@@ -1,12 +1,13 @@
 package org.dromara.common.redis.utils;
 
-import org.dromara.common.core.utils.SpringUtils;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import org.dromara.common.core.utils.SpringUtils;
 import org.redisson.api.*;
 
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * 鍒嗗竷寮忛槦鍒楀伐鍏�
@@ -132,6 +133,32 @@
     }
 
     /**
+     * 浼樺厛闃熷垪鑾峰彇涓�涓槦鍒楁暟鎹� 娌℃湁鏁版嵁杩斿洖 null(涓嶆敮鎸佸欢杩熼槦鍒�)
+     *
+     * @param queueName 闃熷垪鍚�
+     */
+    public static <T> T getPriorityQueueObject(String queueName) {
+        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
+        return queue.poll();
+    }
+
+    /**
+     * 浼樺厛闃熷垪鍒犻櫎闃熷垪鏁版嵁(涓嶆敮鎸佸欢杩熼槦鍒�)
+     */
+    public static <T> boolean removePriorityQueueObject(String queueName, T data) {
+        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
+        return queue.remove(data);
+    }
+
+    /**
+     * 浼樺厛闃熷垪閿�姣侀槦鍒� 鎵�鏈夐樆濉炵洃鍚� 鎶ラ敊(涓嶆敮鎸佸欢杩熼槦鍒�)
+     */
+    public static <T> boolean destroyPriorityQueue(String queueName) {
+        RPriorityBlockingQueue<T> queue = CLIENT.getPriorityBlockingQueue(queueName);
+        return queue.delete();
+    }
+
+    /**
      * 灏濊瘯璁剧疆 鏈夌晫闃熷垪 瀹归噺 鐢ㄤ簬闄愬埗鏁伴噺
      *
      * @param queueName 闃熷垪鍚�
@@ -147,12 +174,12 @@
      *
      * @param queueName 闃熷垪鍚�
      * @param capacity  瀹归噺
-     * @param destroy   宸插瓨鍦ㄦ槸鍚﹂攢姣�
+     * @param destroy   鏄惁閿�姣�
      */
     public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
         RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
-        if (boundedBlockingQueue.isExists() && destroy) {
-            destroyQueue(queueName);
+        if (destroy) {
+            boundedBlockingQueue.delete();
         }
         return boundedBlockingQueue.trySetCapacity(capacity);
     }
@@ -170,10 +197,40 @@
     }
 
     /**
+     * 鏈夌晫闃熷垪鑾峰彇涓�涓槦鍒楁暟鎹� 娌℃湁鏁版嵁杩斿洖 null(涓嶆敮鎸佸欢杩熼槦鍒�)
+     *
+     * @param queueName 闃熷垪鍚�
+     */
+    public static <T> T getBoundedQueueObject(String queueName) {
+        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
+        return queue.poll();
+    }
+
+    /**
+     * 鏈夌晫闃熷垪鍒犻櫎闃熷垪鏁版嵁(涓嶆敮鎸佸欢杩熼槦鍒�)
+     */
+    public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
+        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
+        return queue.remove(data);
+    }
+
+    /**
+     * 鏈夌晫闃熷垪閿�姣侀槦鍒� 鎵�鏈夐樆濉炵洃鍚� 鎶ラ敊(涓嶆敮鎸佸欢杩熼槦鍒�)
+     */
+    public static <T> boolean destroyBoundedQueue(String queueName) {
+        RBoundedBlockingQueue<T> queue = CLIENT.getBoundedBlockingQueue(queueName);
+        return queue.delete();
+    }
+
+    /**
      * 璁㈤槄闃诲闃熷垪(鍙闃呮墍鏈夊疄鐜扮被 渚嬪: 寤惰繜 浼樺厛 鏈夌晫 绛�)
      */
-    public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) {
+    public static <T> void subscribeBlockingQueue(String queueName, Function<T, CompletionStage<Void>> consumer, boolean isDelayed) {
         RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+        if (isDelayed) {
+            // 璁㈤槄寤惰繜闃熷垪
+            CLIENT.getDelayedQueue(queue);
+        }
         queue.subscribeOnElements(consumer);
     }
 

--
Gitblit v1.9.3