From 08e0ed4fc6b53d80feebc5add86d1fd70e75b952 Mon Sep 17 00:00:00 2001
From: 疯狂的狮子li <15040126243@163.com>
Date: 星期二, 28 十二月 2021 11:23:32 +0800
Subject: [PATCH] [重磅更新] 增加 轻量级 分布式队列 支持
---
ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java | 215 +++++++++++++++++++++++
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java | 83 +++++++++
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java | 85 +++++++++
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java | 16 +
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java | 79 ++++++++
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java | 19 ++
README.md | 1
7 files changed, 498 insertions(+), 0 deletions(-)
diff --git a/README.md b/README.md
index cbf2da3..4916257 100644
--- a/README.md
+++ b/README.md
@@ -33,6 +33,7 @@
| 搴忓垪鍖栨鏋� | Jackson | [Jackson瀹樼綉](https://github.com/FasterXML/jackson) | 缁熶竴浣跨敤 jackson 楂樻晥鍙潬 |
| Redis瀹㈡埛绔� | Redisson | [Redisson鏂囨。](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 鏀寔鍗曟満銆侀泦缇ら厤缃� |
| 鍒嗗竷寮忛檺娴� | Redisson | [Redisson鏂囨。](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 鍏ㄥ眬銆佽姹侷P銆侀泦缇D 澶氱闄愭祦 |
+| 鍒嗗竷寮忛槦鍒� | Redisson | [Redisson鏂囨。](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 鏅�氶槦鍒椼�佸欢杩熼槦鍒椼�佷紭鍏堥槦鍒� 绛� |
| 鍒嗗竷寮忛攣 | Lock4j | [Lock4j瀹樼綉](https://gitee.com/baomidou/lock4j) | 娉ㄨВ閿併�佸伐鍏烽攣 澶氱澶氭牱 |
| 鍒嗗竷寮忓箓绛� | Redisson | [Lock4j鏂囨。](https://gitee.com/baomidou/lock4j) | 鎷︽埅閲嶅鎻愪氦 |
| 鍒嗗竷寮忔棩蹇� | TLog | [TLog鏂囨。](https://yomahub.com/tlog/docs) | 鏀寔璺熻釜閾捐矾鏃ュ織璁板綍銆佹�ц兘鍒嗘瀽銆侀摼璺帓鏌� |
diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java b/ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java
new file mode 100644
index 0000000..c70acba
--- /dev/null
+++ b/ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java
@@ -0,0 +1,215 @@
+package com.ruoyi.common.utils.redis;
+
+import com.ruoyi.common.utils.spring.SpringUtils;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.redisson.api.*;
+
+import java.util.Comparator;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * 鍒嗗竷寮忛槦鍒楀伐鍏�
+ * 杞婚噺绾ч槦鍒� 閲嶉噺绾ф暟鎹噺 璇蜂娇鐢� MQ
+ * 瑕佹眰 redis 5.X 浠ヤ笂
+ *
+ * @author Lion Li
+ * @version 3.6.0 鏂板
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class QueueUtils {
+
+ private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
+
+
+ /**
+ * 鑾峰彇瀹㈡埛绔疄渚�
+ */
+ public static RedissonClient getClient() {
+ return CLIENT;
+ }
+
+ /**
+ * 娣诲姞寤惰繜闃熷垪鏁版嵁 榛樿姣
+ *
+ * @param queueName 闃熷垪鍚�
+ * @param data 鏁版嵁
+ * @param time 寤惰繜鏃堕棿
+ */
+ public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
+ addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * 娣诲姞寤惰繜闃熷垪鏁版嵁
+ *
+ * @param queueName 闃熷垪鍚�
+ * @param data 鏁版嵁
+ * @param time 寤惰繜鏃堕棿
+ * @param timeUnit 鍗曚綅
+ */
+ public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
+ RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+ RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
+ // 宸插瓨鍦ㄥ垯鏃犺
+ if (delayedQueue.contains(data)) {
+ return;
+ }
+ delayedQueue.offer(data, time, timeUnit);
+ }
+
+ /**
+ * 鍒犻櫎寤惰繜闃熷垪鏁版嵁
+ */
+ public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
+ RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+ RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
+ return delayedQueue.remove(data);
+ }
+
+ /**
+ * 閿�姣佸欢杩熼槦鍒� 鎵�鏈夐樆濉炵洃鍚� 鎶ラ敊
+ */
+ public static <T> void destroyDelayedQueue(String queueName) {
+ RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+ RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
+ delayedQueue.destroy();
+ }
+
+ /**
+ * 灏濊瘯璁剧疆 浼樺厛闃熷垪姣旇緝鍣� 鐢ㄤ簬鎺掑簭浼樺厛绾�
+ *
+ * @param queueName 闃熷垪鍚�
+ * @param comparator 姣旇緝鍣�
+ */
+ public static <T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator) {
+ RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+ return priorityBlockingQueue.trySetComparator(comparator);
+ }
+
+ /**
+ * 灏濊瘯璁剧疆 浼樺厛闃熷垪姣旇緝鍣� 鐢ㄤ簬鎺掑簭浼樺厛绾�
+ *
+ * @param queueName 闃熷垪鍚�
+ * @param comparator 姣旇緝鍣�
+ * @param destroy 宸插瓨鍦ㄦ槸鍚﹂攢姣�
+ */
+ public static <T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator, boolean destroy) {
+ RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+ if (priorityBlockingQueue.isExists() && destroy) {
+ destroyPriorityQueueObject(queueName);
+ }
+ return priorityBlockingQueue.trySetComparator(comparator);
+ }
+
+ /**
+ * 娣诲姞浼樺厛闃熷垪鏁版嵁
+ *
+ * @param queueName 闃熷垪鍚�
+ * @param data 鏁版嵁
+ */
+ public static <T> boolean addPriorityQueueObject(String queueName, T data) {
+ RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+ return priorityBlockingQueue.offer(data);
+ }
+
+ /**
+ * 鑾峰彇涓�涓紭鍏堥槦鍒楁暟鎹� 娌℃湁鏁版嵁杩斿洖 null
+ *
+ * @param queueName 闃熷垪鍚�
+ */
+ public static <T> T getPriorityQueueObject(String queueName) {
+ RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+ return priorityBlockingQueue.poll();
+ }
+
+ /**
+ * 鍒犻櫎浼樺厛闃熷垪鏁版嵁
+ */
+ public static <T> boolean removePriorityQueueObject(String queueName, T data) {
+ RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+ return priorityBlockingQueue.remove(data);
+ }
+
+ /**
+ * 閿�姣佷紭鍏堥槦鍒�
+ */
+ public static boolean destroyPriorityQueueObject(String queueName) {
+ RPriorityBlockingQueue<?> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
+ return priorityBlockingQueue.delete();
+ }
+
+ /**
+ * 灏濊瘯璁剧疆 鏈夌晫闃熷垪 瀹归噺 鐢ㄤ簬闄愬埗鏁伴噺
+ *
+ * @param queueName 闃熷垪鍚�
+ * @param capacity 瀹归噺
+ */
+ public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {
+ RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+ return boundedBlockingQueue.trySetCapacity(capacity);
+ }
+
+ /**
+ * 灏濊瘯璁剧疆 鏈夌晫闃熷垪 瀹归噺 鐢ㄤ簬闄愬埗鏁伴噺
+ *
+ * @param queueName 闃熷垪鍚�
+ * @param capacity 瀹归噺
+ * @param destroy 宸插瓨鍦ㄦ槸鍚﹂攢姣�
+ */
+ public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
+ RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+ if (boundedBlockingQueue.isExists() && destroy) {
+ destroyBoundedQueueObject(queueName);
+ }
+ return boundedBlockingQueue.trySetCapacity(capacity);
+ }
+
+ /**
+ * 娣诲姞鏈夌晫闃熷垪鏁版嵁
+ *
+ * @param queueName 闃熷垪鍚�
+ * @param data 鏁版嵁
+ * @return 娣诲姞鎴愬姛 true 宸茶揪鍒扮晫闄� false
+ */
+ public static <T> boolean addBoundedQueueObject(String queueName, T data) {
+ RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+ return boundedBlockingQueue.offer(data);
+ }
+
+ /**
+ * 鑾峰彇涓�涓湁鐣岄槦鍒楁暟鎹� 娌℃湁鏁版嵁杩斿洖 null
+ *
+ * @param queueName 闃熷垪鍚�
+ */
+ public static <T> T getBoundedQueueObject(String queueName) {
+ RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+ return boundedBlockingQueue.poll();
+ }
+
+ /**
+ * 鍒犻櫎鏈夌晫闃熷垪鏁版嵁
+ */
+ public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
+ RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+ return boundedBlockingQueue.remove(data);
+ }
+
+ /**
+ * 閿�姣佹湁鐣岄槦鍒�
+ */
+ public static boolean destroyBoundedQueueObject(String queueName) {
+ RBoundedBlockingQueue<?> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
+ return boundedBlockingQueue.delete();
+ }
+
+ /**
+ * 璁㈤槄闃诲闃熷垪(鍙闃呮墍鏈夊疄鐜扮被 渚嬪: 寤惰繜 浼樺厛 鏈夌晫 绛�)
+ */
+ public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) {
+ RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
+ queue.subscribeOnElements(consumer);
+ }
+
+}
diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java
new file mode 100644
index 0000000..9be47a1
--- /dev/null
+++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java
@@ -0,0 +1,83 @@
+package com.ruoyi.demo.controller.queue;
+
+import com.ruoyi.common.core.domain.AjaxResult;
+import com.ruoyi.common.utils.redis.QueueUtils;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * 鏈夌晫闃熷垪 婕旂ず妗堜緥
+ * <p>
+ * 杞婚噺绾ч槦鍒� 閲嶉噺绾ф暟鎹噺 璇蜂娇鐢� MQ
+ * <p>
+ * 闆嗙兢娴嬭瘯閫氳繃 鍚屼竴涓暟鎹彧浼氳娑堣垂涓�娆� 鍋氬ソ浜嬪姟琛ュ伩
+ * 闆嗙兢娴嬭瘯娴佺▼ 鍦ㄥ叾涓竴鍙板彂閫佹暟鎹� 涓ょ鍒嗗埆璋冪敤鑾峰彇鎺ュ彛 涓�娆¤幏鍙栦竴鏉�
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Slf4j
+@Api(value = "鏈夌晫闃熷垪 婕旂ず妗堜緥", tags = {"鏈夌晫闃熷垪"})
+@RequiredArgsConstructor(onConstructor_ = @Autowired)
+@RestController
+@RequestMapping("/demo/queue/bounded")
+public class BoundedQueueController {
+
+
+ @ApiOperation("娣诲姞闃熷垪鏁版嵁")
+ @GetMapping("/add")
+ public AjaxResult<Void> add(@ApiParam("闃熷垪鍚�") String queueName,
+ @ApiParam("瀹归噺") int capacity) {
+ // 鐢ㄥ畬浜嗕竴瀹氳閿�姣� 鍚﹀垯浼氫竴鐩村瓨鍦�
+ boolean b = QueueUtils.destroyBoundedQueueObject(queueName);
+ log.info("閫氶亾: {} , 鍒犻櫎: {}", queueName, b);
+ // 鍒濆鍖栬缃竴娆″嵆鍙�
+ if (QueueUtils.trySetBoundedQueueCapacity(queueName, capacity)) {
+ log.info("閫氶亾: {} , 璁剧疆瀹归噺: {}", queueName, capacity);
+ } else {
+ log.info("閫氶亾: {} , 璁剧疆瀹归噺澶辫触", queueName);
+ return AjaxResult.error("鎿嶄綔澶辫触");
+ }
+ for (int i = 0; i < 11; i++) {
+ String data = "data-" + i;
+ boolean flag = QueueUtils.addBoundedQueueObject(queueName, data);
+ if (flag == false) {
+ log.info("閫氶亾: {} , 鍙戦�佹暟鎹�: {} 澶辫触, 閫氶亾宸叉弧", queueName, data);
+ } else {
+ log.info("閫氶亾: {} , 鍙戦�佹暟鎹�: {}", queueName, data);
+ }
+ }
+ return AjaxResult.success("鎿嶄綔鎴愬姛");
+ }
+
+ @ApiOperation("鍒犻櫎闃熷垪鏁版嵁")
+ @GetMapping("/remove")
+ public AjaxResult<Void> remove(@ApiParam("闃熷垪鍚�") String queueName) {
+ String data = "data-" + 5;
+ if (QueueUtils.removeBoundedQueueObject(queueName, data)) {
+ log.info("閫氶亾: {} , 鍒犻櫎鏁版嵁: {}", queueName, data);
+ } else {
+ return AjaxResult.error("鎿嶄綔澶辫触");
+ }
+ return AjaxResult.success("鎿嶄綔鎴愬姛");
+ }
+
+ @ApiOperation("鑾峰彇闃熷垪鏁版嵁")
+ @GetMapping("/get")
+ public AjaxResult<Void> get(@ApiParam("闃熷垪鍚�") String queueName) {
+ String data;
+ do {
+ data = QueueUtils.getBoundedQueueObject(queueName);
+ log.info("閫氶亾: {} , 鑾峰彇鏁版嵁: {}", queueName, data);
+ } while (data != null);
+ return AjaxResult.success("鎿嶄綔鎴愬姛");
+ }
+
+}
diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java
new file mode 100644
index 0000000..f81765b
--- /dev/null
+++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java
@@ -0,0 +1,79 @@
+package com.ruoyi.demo.controller.queue;
+
+import com.ruoyi.common.core.domain.AjaxResult;
+import com.ruoyi.common.utils.redis.QueueUtils;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 寤惰繜闃熷垪 婕旂ず妗堜緥
+ * <p>
+ * 杞婚噺绾ч槦鍒� 閲嶉噺绾ф暟鎹噺 璇蜂娇鐢� MQ
+ * 渚嬪: 鍒涘缓璁㈠崟30鍒嗛挓鍚庤繃鏈熷鐞�
+ * <p>
+ * 闆嗙兢娴嬭瘯閫氳繃 鍚屼竴涓暟鎹彧浼氳娑堣垂涓�娆� 鍋氬ソ浜嬪姟琛ュ伩
+ * 闆嗙兢娴嬭瘯娴佺▼ 涓ゅ彴闆嗙兢鍒嗗埆寮�鍚闃� 鍦ㄥ叾涓竴鍙板彂閫佹暟鎹� 瑙傚療鎺ユ敹娑堟伅鐨勮寰�
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Slf4j
+@Api(value = "寤惰繜闃熷垪 婕旂ず妗堜緥", tags = {"寤惰繜闃熷垪"})
+@RequiredArgsConstructor(onConstructor_ = @Autowired)
+@RestController
+@RequestMapping("/demo/queue/delayed")
+public class DelayedQueueController {
+
+ @ApiOperation("璁㈤槄闃熷垪")
+ @GetMapping("/subscribe")
+ public AjaxResult<Void> subscribe(@ApiParam("闃熷垪鍚�") String queueName) {
+ log.info("閫氶亾: {} 鐩戝惉涓�......", queueName);
+ // 椤圭洰鍒濆鍖栬缃竴娆″嵆鍙�
+ QueueUtils.subscribeBlockingQueue(queueName, (String orderNum) -> {
+ // 瑙傚療鎺ユ敹鏃堕棿
+ log.info("閫氶亾: {}, 鏀跺埌鏁版嵁: {}", queueName, orderNum);
+ });
+ return AjaxResult.success("鎿嶄綔鎴愬姛");
+ }
+
+ @ApiOperation("娣诲姞闃熷垪鏁版嵁")
+ @GetMapping("/add")
+ public AjaxResult<Void> add(@ApiParam("闃熷垪鍚�") String queueName,
+ @ApiParam("璁㈠崟鍙�") String orderNum,
+ @ApiParam("寤惰繜鏃堕棿(绉�)") Long time) {
+ QueueUtils.addDelayedQueueObject(queueName, orderNum, time, TimeUnit.SECONDS);
+ // 瑙傚療鍙戦�佹椂闂�
+ log.info("閫氶亾: {} , 鍙戦�佹暟鎹�: {}", queueName, orderNum);
+ return AjaxResult.success("鎿嶄綔鎴愬姛");
+ }
+
+ @ApiOperation("鍒犻櫎闃熷垪鏁版嵁")
+ @GetMapping("/remove")
+ public AjaxResult<Void> remove(@ApiParam("闃熷垪鍚�") String queueName,
+ @ApiParam("璁㈠崟鍙�") String orderNum) {
+ if (QueueUtils.removeDelayedQueueObject(queueName, orderNum)) {
+ log.info("閫氶亾: {} , 鍒犻櫎鏁版嵁: {}", queueName, orderNum);
+ } else {
+ return AjaxResult.error("鎿嶄綔澶辫触");
+ }
+ return AjaxResult.success("鎿嶄綔鎴愬姛");
+ }
+
+ @ApiOperation("閿�姣侀槦鍒�")
+ @GetMapping("/destroy")
+ public AjaxResult<Void> destroy(@ApiParam("闃熷垪鍚�") String queueName) {
+ // 鐢ㄥ畬浜嗕竴瀹氳閿�姣� 鍚﹀垯浼氫竴鐩村瓨鍦�
+ QueueUtils.destroyDelayedQueue(queueName);
+ return AjaxResult.success("鎿嶄綔鎴愬姛");
+ }
+
+}
diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java
new file mode 100644
index 0000000..0857690
--- /dev/null
+++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java
@@ -0,0 +1,19 @@
+package com.ruoyi.demo.controller.queue;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
+
+/**
+ * 瀹炰綋绫� 娉ㄦ剰涓嶅厑璁镐娇鐢ㄥ唴閮ㄧ被 鍚﹀垯浼氭壘涓嶅埌绫�
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Data
+@Accessors(chain = true)
+@NoArgsConstructor
+public class PriorityDemo {
+ private String name;
+ private Integer orderNum;
+}
diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java
new file mode 100644
index 0000000..f72e695
--- /dev/null
+++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java
@@ -0,0 +1,16 @@
+package com.ruoyi.demo.controller.queue;
+
+import java.util.Comparator;
+
+/**
+ * 姣旇緝鍣� 娉ㄦ剰涓嶅厑璁镐娇鐢� 鍐呴儴绫绘垨鍖垮悕绫绘垨lambda琛ㄨ揪寮� 浼氭壘涓嶅埌绫�
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+public class PriorityDemoComparator implements Comparator<PriorityDemo> {
+ @Override
+ public int compare(PriorityDemo pd1, PriorityDemo pd2) {
+ return Integer.compare(pd1.getOrderNum(), pd2.getOrderNum());
+ }
+}
diff --git a/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java
new file mode 100644
index 0000000..130c1f0
--- /dev/null
+++ b/ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java
@@ -0,0 +1,85 @@
+package com.ruoyi.demo.controller.queue;
+
+import cn.hutool.core.util.RandomUtil;
+import com.ruoyi.common.core.domain.AjaxResult;
+import com.ruoyi.common.utils.redis.QueueUtils;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * 浼樺厛闃熷垪 婕旂ず妗堜緥
+ * <p>
+ * 杞婚噺绾ч槦鍒� 閲嶉噺绾ф暟鎹噺 璇蜂娇鐢� MQ
+ * <p>
+ * 闆嗙兢娴嬭瘯閫氳繃 鍚屼竴涓秷鎭彧浼氳娑堣垂涓�娆� 鍋氬ソ浜嬪姟琛ュ伩
+ * 闆嗙兢娴嬭瘯娴佺▼ 鍦ㄥ叾涓竴鍙板彂閫佹暟鎹� 涓ょ鍒嗗埆璋冪敤鑾峰彇鎺ュ彛 涓�娆¤幏鍙栦竴鏉�
+ *
+ * @author Lion Li
+ * @version 3.6.0
+ */
+@Slf4j
+@Api(value = "浼樺厛闃熷垪 婕旂ず妗堜緥", tags = {"浼樺厛闃熷垪"})
+@RequiredArgsConstructor(onConstructor_ = @Autowired)
+@RestController
+@RequestMapping("/demo/queue/priority")
+public class PriorityQueueController {
+
+ @ApiOperation("娣诲姞闃熷垪鏁版嵁")
+ @GetMapping("/add")
+ public AjaxResult<Void> add(@ApiParam("闃熷垪鍚�") String queueName) {
+ // 鐢ㄥ畬浜嗕竴瀹氳閿�姣� 鍚﹀垯浼氫竴鐩村瓨鍦�
+ boolean b = QueueUtils.destroyPriorityQueueObject(queueName);
+ log.info("閫氶亾: {} , 鍒犻櫎: {}", queueName, b);
+ // 鍒濆鍖栬缃竴娆″嵆鍙� 姝ゅ娉ㄦ剰 涓嶅厑璁哥敤鍐呴儴绫绘垨鍖垮悕绫�
+ boolean flag = QueueUtils.trySetPriorityQueueComparator(queueName, new PriorityDemoComparator());
+ if (flag) {
+ log.info("閫氶亾: {} , 璁剧疆姣旇緝鍣ㄦ垚鍔�", queueName);
+ } else {
+ log.info("閫氶亾: {} , 璁剧疆姣旇緝鍣ㄥけ璐�", queueName);
+ return AjaxResult.error("鎿嶄綔澶辫触");
+ }
+ for (int i = 0; i < 10; i++) {
+ int randomNum = RandomUtil.randomInt(10);
+ PriorityDemo data = new PriorityDemo().setName("data-" + i).setOrderNum(randomNum);
+ if (QueueUtils.addPriorityQueueObject(queueName, data)) {
+ log.info("閫氶亾: {} , 鍙戦�佹暟鎹�: {}", queueName, data);
+ } else {
+ log.info("閫氶亾: {} , 鍙戦�佹暟鎹�: {}, 鍙戦�佸け璐�", queueName, data);
+ }
+ }
+ return AjaxResult.success("鎿嶄綔鎴愬姛");
+ }
+
+ @ApiOperation("鍒犻櫎闃熷垪鏁版嵁")
+ @GetMapping("/remove")
+ public AjaxResult<Void> remove(@ApiParam("闃熷垪鍚�") String queueName,
+ @ApiParam("瀵硅薄鍚�") String name,
+ @ApiParam("鎺掑簭鍙�") Integer orderNum) {
+ PriorityDemo data = new PriorityDemo().setName(name).setOrderNum(orderNum);
+ if (QueueUtils.removePriorityQueueObject(queueName, data)) {
+ log.info("閫氶亾: {} , 鍒犻櫎鏁版嵁: {}", queueName, data);
+ } else {
+ return AjaxResult.error("鎿嶄綔澶辫触");
+ }
+ return AjaxResult.success("鎿嶄綔鎴愬姛");
+ }
+
+ @ApiOperation("鑾峰彇闃熷垪鏁版嵁")
+ @GetMapping("/get")
+ public AjaxResult<Void> get(@ApiParam("闃熷垪鍚�") String queueName) {
+ PriorityDemo data;
+ do {
+ data = QueueUtils.getPriorityQueueObject(queueName);
+ log.info("閫氶亾: {} , 鑾峰彇鏁版嵁: {}", queueName, data);
+ } while (data != null);
+ return AjaxResult.success("鎿嶄綔鎴愬姛");
+ }
+
+}
--
Gitblit v1.9.3