From 08e0ed4fc6b53d80feebc5add86d1fd70e75b952 Mon Sep 17 00:00:00 2001
From: 疯狂的狮子li <15040126243@163.com>
Date: 星期二, 28 十二月 2021 11:23:32 +0800
Subject: [PATCH] [重磅更新] 增加 轻量级 分布式队列 支持

---
 ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java               |  215 +++++++++++++++++++++++
 ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java  |   83 +++++++++
 ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java |   85 +++++++++
 ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java  |   16 +
 ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java  |   79 ++++++++
 ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java            |   19 ++
 README.md                                                                             |    1 
 7 files changed, 498 insertions(+), 0 deletions(-)

diff --git a/README.md b/README.md
index cbf2da3..4916257 100644
--- a/README.md
+++ b/README.md
@@ -33,6 +33,7 @@
 | 搴忓垪鍖栨鏋� | Jackson | [Jackson瀹樼綉](https://github.com/FasterXML/jackson) | 缁熶竴浣跨敤 jackson 楂樻晥鍙潬 |
 | Redis瀹㈡埛绔� | Redisson | [Redisson鏂囨。](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 鏀寔鍗曟満銆侀泦缇ら厤缃� |
 | 鍒嗗竷寮忛檺娴� | Redisson | [Redisson鏂囨。](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 鍏ㄥ眬銆佽姹侷P銆侀泦缇D 澶氱闄愭祦 |
+| 鍒嗗竷寮忛槦鍒� | Redisson | [Redisson鏂囨。](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 鏅�氶槦鍒椼�佸欢杩熼槦鍒椼�佷紭鍏堥槦鍒� 绛� |
 | 鍒嗗竷寮忛攣 | Lock4j | [Lock4j瀹樼綉](https://gitee.com/baomidou/lock4j) | 娉ㄨВ閿併�佸伐鍏烽攣 澶氱澶氭牱 |
 | 鍒嗗竷寮忓箓绛� | Redisson | [Lock4j鏂囨。](https://gitee.com/baomidou/lock4j) | 鎷︽埅閲嶅鎻愪氦 |
 | 鍒嗗竷寮忔棩蹇� | TLog | [TLog鏂囨。](https://yomahub.com/tlog/docs) | 鏀寔璺熻釜閾捐矾鏃ュ織璁板綍銆佹�ц兘鍒嗘瀽銆侀摼璺帓鏌� |
diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java b/ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java
new file mode 100644
index 0000000..c70acba
--- /dev/null
+++ b/ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java
@@ -0,0 +1,215 @@
+package com.ruoyi.common.utils.redis;
+
+import com.ruoyi.common.utils.spring.SpringUtils;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.redisson.api.*;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * 鍒嗗竷寮忛槦鍒楀伐鍏�
+ * 杞婚噺绾ч槦鍒� 閲嶉噺绾ф暟鎹噺 璇蜂娇鐢� MQ
+ * 瑕佹眰 redis 5.X 浠ヤ笂
+ *
+ * @author Lion Li
+ * @version 3.6.0 鏂板
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class QueueUtils {
+
+    private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
+
+
+    /**
+     * 鑾峰彇瀹㈡埛绔疄渚�
+     */
+    public static RedissonClient getClient() {
+        return CLIENT;
+    }
+
+    /**
+     * 娣诲姞寤惰繜闃熷垪鏁版嵁 榛樿姣
+     *
+     * @param queueName 闃熷垪鍚�
+     * @param data      鏁版嵁
+     * @param time      寤惰繜鏃堕棿
+     */
+    public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
+        addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * 娣诲姞寤惰繜闃熷垪鏁版嵁
+     *
+     * @param queueName 闃熷垪鍚�
+     * @param data      鏁版嵁
+     * @param time      寤惰繜鏃堕棿
+     * @param timeUnit  鍗曚綅
+     */
+    public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
+        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
+        // 宸插瓨鍦ㄥ垯鏃犺
+        if (delayedQueue.contains(data)) {
+            return;
+        }
+        delayedQueue.offer(data, time, timeUnit);
+    }
+
+    /**
+     * 鍒犻櫎寤惰繜闃熷垪鏁版嵁
+     */
+    public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
+        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
+        return delayedQueue.remove(data);
+    }
+
+    /**
+     * 閿�姣佸欢杩熼槦鍒� 鎵�鏈夐樆濉炵洃鍚� 鎶ラ敊
+     */
+    public static <T> void destroyDelayedQueue(String queueName) {
+        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
+        delayedQueue.destroy();
+    }
+
+    /**
+     * 灏濊瘯璁剧疆 浼樺厛闃熷垪姣旇緝鍣� 鐢ㄤ簬鎺掑簭浼樺厛绾�
+     *
+     * @param queueName  闃熷垪鍚�
+     * @param comparator 姣旇緝鍣�
+     */
+    public static <T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator) {
+        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        return priorityBlockingQueue.trySetComparator(comparator);
+    }
+
+    /**
+     * 灏濊瘯璁剧疆 浼樺厛闃熷垪姣旇緝鍣� 鐢ㄤ簬鎺掑簭浼樺厛绾�
+     *
+     * @param queueName  闃熷垪鍚�
+     * @param comparator 姣旇緝鍣�
+     * @param destroy    宸插瓨鍦ㄦ槸鍚﹂攢姣�
+     */
+    public static <T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator, boolean destroy) {
+        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        if (priorityBlockingQueue.isExists() && destroy) {
+            destroyPriorityQueueObject(queueName);
+        }
+        return priorityBlockingQueue.trySetComparator(comparator);
+    }
+
+    /**
+     * 娣诲姞浼樺厛闃熷垪鏁版嵁
+     *
+     * @param queueName 闃熷垪鍚�
+     * @param data      鏁版嵁
+     */
+    public static <T> boolean addPriorityQueueObject(String queueName, T data) {
+        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        return priorityBlockingQueue.offer(data);
+    }
+
+    /**
+     * 鑾峰彇涓�涓紭鍏堥槦鍒楁暟鎹� 娌℃湁鏁版嵁杩斿洖 null
+     *
+     * @param queueName 闃熷垪鍚�
+     */
+    public static <T> T getPriorityQueueObject(String queueName) {
+        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        return priorityBlockingQueue.poll();
+    }
+
+    /**
+     * 鍒犻櫎浼樺厛闃熷垪鏁版嵁
+     */
+    public static <T> boolean removePriorityQueueObject(String queueName, T data) {
+        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        return priorityBlockingQueue.remove(data);
+    }
+
+    /**
+     * 閿�姣佷紭鍏堥槦鍒�
+     */
+    public static boolean destroyPriorityQueueObject(String queueName) {
+        RPriorityBlockingQueue<?> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+        return priorityBlockingQueue.delete();
+    }
+
+    /**
+     * 灏濊瘯璁剧疆 鏈夌晫闃熷垪 瀹归噺 鐢ㄤ簬闄愬埗鏁伴噺
+     *
+     * @param queueName 闃熷垪鍚�
+     * @param capacity  瀹归噺
+     */
+    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {
+        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        return boundedBlockingQueue.trySetCapacity(capacity);
+    }
+
+    /**
+     * 灏濊瘯璁剧疆 鏈夌晫闃熷垪 瀹归噺 鐢ㄤ簬闄愬埗鏁伴噺
+     *
+     * @param queueName 闃熷垪鍚�
+     * @param capacity  瀹归噺
+     * @param destroy   宸插瓨鍦ㄦ槸鍚﹂攢姣�
+     */
+    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
+        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        if (boundedBlockingQueue.isExists() && destroy) {
+            destroyBoundedQueueObject(queueName);
+        }
+        return boundedBlockingQueue.trySetCapacity(capacity);
+    }
+
+    /**
+     * 娣诲姞鏈夌晫闃熷垪鏁版嵁
+     *
+     * @param queueName 闃熷垪鍚�
+     * @param data      鏁版嵁
+     * @return 娣诲姞鎴愬姛 true 宸茶揪鍒扮晫闄� false
+     */
+    public static <T> boolean addBoundedQueueObject(String queueName, T data) {
+        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        return boundedBlockingQueue.offer(data);
+    }
+
+    /**
+     * 鑾峰彇涓�涓湁鐣岄槦鍒楁暟鎹� 娌℃湁鏁版嵁杩斿洖 null
+     *
+     * @param queueName 闃熷垪鍚�
+     */
+    public static <T> T getBoundedQueueObject(String queueName) {
+        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        return boundedBlockingQueue.poll();
+    }
+
+    /**
+     * 鍒犻櫎鏈夌晫闃熷垪鏁版嵁
+     */
+    public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
+        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        return boundedBlockingQueue.remove(data);
+    }
+
+    /**
+     * 閿�姣佹湁鐣岄槦鍒�
+     */
+    public static boolean destroyBoundedQueueObject(String queueName) {
+        RBoundedBlockingQueue<?> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+        return boundedBlockingQueue.delete();
+    }
+
+    /**
+     * 璁㈤槄闃诲闃熷垪(鍙闃呮墍鏈夊疄鐜扮被 渚嬪: 寤惰繜 浼樺厛 鏈夌晫 绛�)
+     */
+    public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) {
+        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+        queue.subscribeOnElements(consumer);
+    }
+
+}
diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java
new file mode 100644
index 0000000..9be47a1
--- /dev/null
+++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java
@@ -0,0 +1,83 @@
+package com.ruoyi.demo.controller.queue;
+
+import com.ruoyi.common.core.domain.AjaxResult;
+import com.ruoyi.common.utils.redis.QueueUtils;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * 鏈夌晫闃熷垪 婕旂ず妗堜緥
+ * <p>
+ * 杞婚噺绾ч槦鍒� 閲嶉噺绾ф暟鎹噺 璇蜂娇鐢� MQ
+ * <p>
+ * 闆嗙兢娴嬭瘯閫氳繃 鍚屼竴涓暟鎹彧浼氳娑堣垂涓�娆� 鍋氬ソ浜嬪姟琛ュ伩
+ * 闆嗙兢娴嬭瘯娴佺▼ 鍦ㄥ叾涓竴鍙板彂閫佹暟鎹� 涓ょ鍒嗗埆璋冪敤鑾峰彇鎺ュ彛 涓�娆¤幏鍙栦竴鏉�
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Slf4j
+@Api(value = "鏈夌晫闃熷垪 婕旂ず妗堜緥", tags = {"鏈夌晫闃熷垪"})
+@RequiredArgsConstructor(onConstructor_ = @Autowired)
+@RestController
+@RequestMapping("/demo/queue/bounded")
+public class BoundedQueueController {
+
+
+    @ApiOperation("娣诲姞闃熷垪鏁版嵁")
+    @GetMapping("/add")
+    public AjaxResult<Void> add(@ApiParam("闃熷垪鍚�") String queueName,
+                                @ApiParam("瀹归噺") int capacity) {
+        // 鐢ㄥ畬浜嗕竴瀹氳閿�姣� 鍚﹀垯浼氫竴鐩村瓨鍦�
+        boolean b = QueueUtils.destroyBoundedQueueObject(queueName);
+        log.info("閫氶亾: {} , 鍒犻櫎: {}", queueName, b);
+        // 鍒濆鍖栬缃竴娆″嵆鍙�
+        if (QueueUtils.trySetBoundedQueueCapacity(queueName, capacity)) {
+            log.info("閫氶亾: {} , 璁剧疆瀹归噺: {}", queueName, capacity);
+        } else {
+            log.info("閫氶亾: {} , 璁剧疆瀹归噺澶辫触", queueName);
+            return AjaxResult.error("鎿嶄綔澶辫触");
+        }
+        for (int i = 0; i < 11; i++) {
+            String data = "data-" + i;
+            boolean flag = QueueUtils.addBoundedQueueObject(queueName, data);
+            if (flag == false) {
+                log.info("閫氶亾: {} , 鍙戦�佹暟鎹�: {} 澶辫触, 閫氶亾宸叉弧", queueName, data);
+            } else {
+                log.info("閫氶亾: {} , 鍙戦�佹暟鎹�: {}", queueName, data);
+            }
+        }
+        return AjaxResult.success("鎿嶄綔鎴愬姛");
+    }
+
+    @ApiOperation("鍒犻櫎闃熷垪鏁版嵁")
+    @GetMapping("/remove")
+    public AjaxResult<Void> remove(@ApiParam("闃熷垪鍚�") String queueName) {
+        String data = "data-" + 5;
+        if (QueueUtils.removeBoundedQueueObject(queueName, data)) {
+            log.info("閫氶亾: {} , 鍒犻櫎鏁版嵁: {}", queueName, data);
+        } else {
+            return AjaxResult.error("鎿嶄綔澶辫触");
+        }
+        return AjaxResult.success("鎿嶄綔鎴愬姛");
+    }
+
+    @ApiOperation("鑾峰彇闃熷垪鏁版嵁")
+    @GetMapping("/get")
+    public AjaxResult<Void> get(@ApiParam("闃熷垪鍚�") String queueName) {
+        String data;
+        do {
+            data = QueueUtils.getBoundedQueueObject(queueName);
+            log.info("閫氶亾: {} , 鑾峰彇鏁版嵁: {}", queueName, data);
+        } while (data != null);
+        return AjaxResult.success("鎿嶄綔鎴愬姛");
+    }
+
+}
diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java
new file mode 100644
index 0000000..f81765b
--- /dev/null
+++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java
@@ -0,0 +1,79 @@
+package com.ruoyi.demo.controller.queue;
+
+import com.ruoyi.common.core.domain.AjaxResult;
+import com.ruoyi.common.utils.redis.QueueUtils;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 寤惰繜闃熷垪 婕旂ず妗堜緥
+ * <p>
+ * 杞婚噺绾ч槦鍒� 閲嶉噺绾ф暟鎹噺 璇蜂娇鐢� MQ
+ * 渚嬪: 鍒涘缓璁㈠崟30鍒嗛挓鍚庤繃鏈熷鐞�
+ * <p>
+ * 闆嗙兢娴嬭瘯閫氳繃 鍚屼竴涓暟鎹彧浼氳娑堣垂涓�娆� 鍋氬ソ浜嬪姟琛ュ伩
+ * 闆嗙兢娴嬭瘯娴佺▼ 涓ゅ彴闆嗙兢鍒嗗埆寮�鍚闃� 鍦ㄥ叾涓竴鍙板彂閫佹暟鎹� 瑙傚療鎺ユ敹娑堟伅鐨勮寰�
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Slf4j
+@Api(value = "寤惰繜闃熷垪 婕旂ず妗堜緥", tags = {"寤惰繜闃熷垪"})
+@RequiredArgsConstructor(onConstructor_ = @Autowired)
+@RestController
+@RequestMapping("/demo/queue/delayed")
+public class DelayedQueueController {
+
+    @ApiOperation("璁㈤槄闃熷垪")
+    @GetMapping("/subscribe")
+    public AjaxResult<Void> subscribe(@ApiParam("闃熷垪鍚�") String queueName) {
+        log.info("閫氶亾: {} 鐩戝惉涓�......", queueName);
+        // 椤圭洰鍒濆鍖栬缃竴娆″嵆鍙�
+        QueueUtils.subscribeBlockingQueue(queueName, (String orderNum) -> {
+            // 瑙傚療鎺ユ敹鏃堕棿
+            log.info("閫氶亾: {}, 鏀跺埌鏁版嵁: {}", queueName, orderNum);
+        });
+        return AjaxResult.success("鎿嶄綔鎴愬姛");
+    }
+
+    @ApiOperation("娣诲姞闃熷垪鏁版嵁")
+    @GetMapping("/add")
+    public AjaxResult<Void> add(@ApiParam("闃熷垪鍚�") String queueName,
+                                @ApiParam("璁㈠崟鍙�") String orderNum,
+                                @ApiParam("寤惰繜鏃堕棿(绉�)") Long time) {
+        QueueUtils.addDelayedQueueObject(queueName, orderNum, time, TimeUnit.SECONDS);
+        // 瑙傚療鍙戦�佹椂闂�
+        log.info("閫氶亾: {} , 鍙戦�佹暟鎹�: {}", queueName, orderNum);
+        return AjaxResult.success("鎿嶄綔鎴愬姛");
+    }
+
+    @ApiOperation("鍒犻櫎闃熷垪鏁版嵁")
+    @GetMapping("/remove")
+    public AjaxResult<Void> remove(@ApiParam("闃熷垪鍚�") String queueName,
+                                   @ApiParam("璁㈠崟鍙�") String orderNum) {
+        if (QueueUtils.removeDelayedQueueObject(queueName, orderNum)) {
+            log.info("閫氶亾: {} , 鍒犻櫎鏁版嵁: {}", queueName, orderNum);
+        } else {
+            return AjaxResult.error("鎿嶄綔澶辫触");
+        }
+        return AjaxResult.success("鎿嶄綔鎴愬姛");
+    }
+
+    @ApiOperation("閿�姣侀槦鍒�")
+    @GetMapping("/destroy")
+    public AjaxResult<Void> destroy(@ApiParam("闃熷垪鍚�") String queueName) {
+        // 鐢ㄥ畬浜嗕竴瀹氳閿�姣� 鍚﹀垯浼氫竴鐩村瓨鍦�
+        QueueUtils.destroyDelayedQueue(queueName);
+        return AjaxResult.success("鎿嶄綔鎴愬姛");
+    }
+
+}
diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java
new file mode 100644
index 0000000..0857690
--- /dev/null
+++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java
@@ -0,0 +1,19 @@
+package com.ruoyi.demo.controller.queue;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
+
+/**
+ * 瀹炰綋绫� 娉ㄦ剰涓嶅厑璁镐娇鐢ㄥ唴閮ㄧ被 鍚﹀垯浼氭壘涓嶅埌绫�
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Data
+@Accessors(chain = true)
+@NoArgsConstructor
+public class PriorityDemo {
+    private String name;
+    private Integer orderNum;
+}
diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java
new file mode 100644
index 0000000..f72e695
--- /dev/null
+++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java
@@ -0,0 +1,16 @@
+package com.ruoyi.demo.controller.queue;
+
+import java.util.Comparator;
+
+/**
+ * 姣旇緝鍣� 娉ㄦ剰涓嶅厑璁镐娇鐢� 鍐呴儴绫绘垨鍖垮悕绫绘垨lambda琛ㄨ揪寮� 浼氭壘涓嶅埌绫�
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+public class PriorityDemoComparator implements Comparator<PriorityDemo> {
+    @Override
+    public int compare(PriorityDemo pd1, PriorityDemo pd2) {
+        return Integer.compare(pd1.getOrderNum(), pd2.getOrderNum());
+    }
+}
diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java
new file mode 100644
index 0000000..130c1f0
--- /dev/null
+++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java
@@ -0,0 +1,85 @@
+package com.ruoyi.demo.controller.queue;
+
+import cn.hutool.core.util.RandomUtil;
+import com.ruoyi.common.core.domain.AjaxResult;
+import com.ruoyi.common.utils.redis.QueueUtils;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * 浼樺厛闃熷垪 婕旂ず妗堜緥
+ * <p>
+ * 杞婚噺绾ч槦鍒� 閲嶉噺绾ф暟鎹噺 璇蜂娇鐢� MQ
+ * <p>
+ * 闆嗙兢娴嬭瘯閫氳繃 鍚屼竴涓秷鎭彧浼氳娑堣垂涓�娆� 鍋氬ソ浜嬪姟琛ュ伩
+ * 闆嗙兢娴嬭瘯娴佺▼ 鍦ㄥ叾涓竴鍙板彂閫佹暟鎹� 涓ょ鍒嗗埆璋冪敤鑾峰彇鎺ュ彛 涓�娆¤幏鍙栦竴鏉�
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Slf4j
+@Api(value = "浼樺厛闃熷垪 婕旂ず妗堜緥", tags = {"浼樺厛闃熷垪"})
+@RequiredArgsConstructor(onConstructor_ = @Autowired)
+@RestController
+@RequestMapping("/demo/queue/priority")
+public class PriorityQueueController {
+
+    @ApiOperation("娣诲姞闃熷垪鏁版嵁")
+    @GetMapping("/add")
+    public AjaxResult<Void> add(@ApiParam("闃熷垪鍚�") String queueName) {
+        // 鐢ㄥ畬浜嗕竴瀹氳閿�姣� 鍚﹀垯浼氫竴鐩村瓨鍦�
+        boolean b = QueueUtils.destroyPriorityQueueObject(queueName);
+        log.info("閫氶亾: {} , 鍒犻櫎: {}", queueName, b);
+        // 鍒濆鍖栬缃竴娆″嵆鍙� 姝ゅ娉ㄦ剰 涓嶅厑璁哥敤鍐呴儴绫绘垨鍖垮悕绫�
+        boolean flag = QueueUtils.trySetPriorityQueueComparator(queueName, new PriorityDemoComparator());
+        if (flag) {
+            log.info("閫氶亾: {} , 璁剧疆姣旇緝鍣ㄦ垚鍔�", queueName);
+        } else {
+            log.info("閫氶亾: {} , 璁剧疆姣旇緝鍣ㄥけ璐�", queueName);
+            return AjaxResult.error("鎿嶄綔澶辫触");
+        }
+        for (int i = 0; i < 10; i++) {
+            int randomNum = RandomUtil.randomInt(10);
+            PriorityDemo data = new PriorityDemo().setName("data-" + i).setOrderNum(randomNum);
+            if (QueueUtils.addPriorityQueueObject(queueName, data)) {
+                log.info("閫氶亾: {} , 鍙戦�佹暟鎹�: {}", queueName, data);
+            } else {
+                log.info("閫氶亾: {} , 鍙戦�佹暟鎹�: {}, 鍙戦�佸け璐�", queueName, data);
+            }
+        }
+        return AjaxResult.success("鎿嶄綔鎴愬姛");
+    }
+
+    @ApiOperation("鍒犻櫎闃熷垪鏁版嵁")
+    @GetMapping("/remove")
+    public AjaxResult<Void> remove(@ApiParam("闃熷垪鍚�") String queueName,
+                                   @ApiParam("瀵硅薄鍚�") String name,
+                                   @ApiParam("鎺掑簭鍙�") Integer orderNum) {
+        PriorityDemo data = new PriorityDemo().setName(name).setOrderNum(orderNum);
+        if (QueueUtils.removePriorityQueueObject(queueName, data)) {
+            log.info("閫氶亾: {} , 鍒犻櫎鏁版嵁: {}", queueName, data);
+        } else {
+            return AjaxResult.error("鎿嶄綔澶辫触");
+        }
+        return AjaxResult.success("鎿嶄綔鎴愬姛");
+    }
+
+    @ApiOperation("鑾峰彇闃熷垪鏁版嵁")
+    @GetMapping("/get")
+    public AjaxResult<Void> get(@ApiParam("闃熷垪鍚�") String queueName) {
+        PriorityDemo data;
+        do {
+            data = QueueUtils.getPriorityQueueObject(queueName);
+            log.info("閫氶亾: {} , 鑾峰彇鏁版嵁: {}", queueName, data);
+        } while (data != null);
+        return AjaxResult.success("鎿嶄綔鎴愬姛");
+    }
+
+}

--
Gitblit v1.9.3