README.md | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
README.md
@@ -33,6 +33,7 @@
| 序列化框架 | Jackson | [Jackson官网](https://github.com/FasterXML/jackson) | 统一使用 jackson 高效可靠 |
| Redis客户端 | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 支持单机、集群配置 |
| 分布式限流 | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 全局、请求IP、集群ID 多种限流 |
| 分布式队列 | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | 普通队列、延迟队列、优先队列 等 |
| 分布式锁 | Lock4j | [Lock4j官网](https://gitee.com/baomidou/lock4j) | 注解锁、工具锁 多种多样 |
| 分布式幂等 | Redisson | [Lock4j文档](https://gitee.com/baomidou/lock4j) | 拦截重复提交 |
| 分布式日志 | TLog | [TLog文档](https://yomahub.com/tlog/docs) | 支持跟踪链路日志记录、性能分析、链路排查 |
ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,215 @@ package com.ruoyi.common.utils.redis; import com.ruoyi.common.utils.spring.SpringUtils; import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.redisson.api.*; import java.util.Comparator; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** * åå¸å¼éåå·¥å · * è½»é级éå ééçº§æ°æ®é è¯·ä½¿ç¨ MQ * è¦æ± redis 5.X ä»¥ä¸ * * @author Lion Li * @version 3.6.0 æ°å¢ */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class QueueUtils { private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class); /** * è·å客æ·ç«¯å®ä¾ */ public static RedissonClient getClient() { return CLIENT; } /** * æ·»å å»¶è¿éåæ°æ® é»è®¤æ¯«ç§ * * @param queueName éåå * @param data æ°æ® * @param time å»¶è¿æ¶é´ */ public static <T> void addDelayedQueueObject(String queueName, T data, long time) { addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS); } /** * æ·»å å»¶è¿éåæ°æ® * * @param queueName éåå * @param data æ°æ® * @param time å»¶è¿æ¶é´ * @param timeUnit åä½ */ public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) { RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue); // å·²åå¨åæ è§ if (delayedQueue.contains(data)) { return; } delayedQueue.offer(data, time, timeUnit); } /** * å é¤å»¶è¿éåæ°æ® */ public static <T> boolean removeDelayedQueueObject(String queueName, T data) { RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue); return delayedQueue.remove(data); } /** * 鿝延è¿éå ææé»å¡çå¬ æ¥é */ public static <T> void destroyDelayedQueue(String queueName) { RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue); delayedQueue.destroy(); } /** * å°è¯è®¾ç½® ä¼å é忝è¾å¨ ç¨äºæåºä¼å 级 * * @param queueName éåå * @param comparator æ¯è¾å¨ */ public static 
<T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator) { RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); return priorityBlockingQueue.trySetComparator(comparator); } /** * å°è¯è®¾ç½® ä¼å é忝è¾å¨ ç¨äºæåºä¼å 级 * * @param queueName éåå * @param comparator æ¯è¾å¨ * @param destroy å·²å卿¯å¦éæ¯ */ public static <T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator, boolean destroy) { RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); if (priorityBlockingQueue.isExists() && destroy) { destroyPriorityQueueObject(queueName); } return priorityBlockingQueue.trySetComparator(comparator); } /** * æ·»å ä¼å éåæ°æ® * * @param queueName éåå * @param data æ°æ® */ public static <T> boolean addPriorityQueueObject(String queueName, T data) { RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); return priorityBlockingQueue.offer(data); } /** * è·åä¸ä¸ªä¼å éåæ°æ® æ²¡ææ°æ®è¿å null * * @param queueName éåå */ public static <T> T getPriorityQueueObject(String queueName) { RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); return priorityBlockingQueue.poll(); } /** * å é¤ä¼å éåæ°æ® */ public static <T> boolean removePriorityQueueObject(String queueName, T data) { RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); return priorityBlockingQueue.remove(data); } /** * 鿝ä¼å éå */ public static boolean destroyPriorityQueueObject(String queueName) { RPriorityBlockingQueue<?> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName); return priorityBlockingQueue.delete(); } /** * å°è¯è®¾ç½® æçéå 容é ç¨äºéå¶æ°é * * @param queueName éåå * @param capacity 容é */ public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) { RBoundedBlockingQueue<T> boundedBlockingQueue = 
CLIENT.getBoundedBlockingQueue(queueName); return boundedBlockingQueue.trySetCapacity(capacity); } /** * å°è¯è®¾ç½® æçéå 容é ç¨äºéå¶æ°é * * @param queueName éåå * @param capacity 容é * @param destroy å·²å卿¯å¦éæ¯ */ public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) { RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); if (boundedBlockingQueue.isExists() && destroy) { destroyBoundedQueueObject(queueName); } return boundedBlockingQueue.trySetCapacity(capacity); } /** * æ·»å æçéåæ°æ® * * @param queueName éåå * @param data æ°æ® * @return æ·»å æå true 已达å°çé false */ public static <T> boolean addBoundedQueueObject(String queueName, T data) { RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); return boundedBlockingQueue.offer(data); } /** * è·åä¸ä¸ªæçéåæ°æ® æ²¡ææ°æ®è¿å null * * @param queueName éåå */ public static <T> T getBoundedQueueObject(String queueName) { RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); return boundedBlockingQueue.poll(); } /** * å 餿çéåæ°æ® */ public static <T> boolean removeBoundedQueueObject(String queueName, T data) { RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); return boundedBlockingQueue.remove(data); } /** * 鿝æçéå */ public static boolean destroyBoundedQueueObject(String queueName) { RBoundedBlockingQueue<?> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName); return boundedBlockingQueue.delete(); } /** * 订é é»å¡éå(å¯è®¢é ææå®ç°ç±» ä¾å¦: å»¶è¿ ä¼å æç ç) */ public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) { RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName); queue.subscribeOnElements(consumer); } } ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,83 @@ package com.ruoyi.demo.controller.queue; import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.utils.redis.QueueUtils; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * æçéå æ¼ç¤ºæ¡ä¾ * <p> * è½»é级éå ééçº§æ°æ®é è¯·ä½¿ç¨ MQ * <p> * é群æµè¯éè¿ åä¸ä¸ªæ°æ®åªä¼è¢«æ¶è´¹ä¸æ¬¡ å好äºå¡è¡¥å¿ * é群æµè¯æµç¨ å¨å ¶ä¸ä¸å°åéæ°æ® ä¸¤ç«¯åå«è°ç¨è·åæ¥å£ 䏿¬¡è·å䏿¡ * * @author Lion Li * @version 3.6.0 */ @Slf4j @Api(value = "æçéå æ¼ç¤ºæ¡ä¾", tags = {"æçéå"}) @RequiredArgsConstructor(onConstructor_ = @Autowired) @RestController @RequestMapping("/demo/queue/bounded") public class BoundedQueueController { @ApiOperation("æ·»å éåæ°æ®") @GetMapping("/add") public AjaxResult<Void> add(@ApiParam("éåå") String queueName, @ApiParam("容é") int capacity) { // ç¨å®äºä¸å®è¦éæ¯ å¦åä¼ä¸ç´åå¨ boolean b = QueueUtils.destroyBoundedQueueObject(queueName); log.info("éé: {} , å é¤: {}", queueName, b); // åå§åè®¾ç½®ä¸æ¬¡å³å¯ if (QueueUtils.trySetBoundedQueueCapacity(queueName, capacity)) { log.info("éé: {} , 设置容é: {}", queueName, capacity); } else { log.info("éé: {} , 设置容é失败", queueName); return AjaxResult.error("æä½å¤±è´¥"); } for (int i = 0; i < 11; i++) { String data = "data-" + i; boolean flag = QueueUtils.addBoundedQueueObject(queueName, data); if (flag == false) { log.info("éé: {} , åéæ°æ®: {} 失败, éé已满", queueName, data); } else { log.info("éé: {} , åéæ°æ®: {}", queueName, data); } } return AjaxResult.success("æä½æå"); } @ApiOperation("å é¤éåæ°æ®") @GetMapping("/remove") public AjaxResult<Void> remove(@ApiParam("éåå") String queueName) { String data = "data-" + 5; if 
(QueueUtils.removeBoundedQueueObject(queueName, data)) { log.info("éé: {} , å 餿°æ®: {}", queueName, data); } else { return AjaxResult.error("æä½å¤±è´¥"); } return AjaxResult.success("æä½æå"); } @ApiOperation("è·åéåæ°æ®") @GetMapping("/get") public AjaxResult<Void> get(@ApiParam("éåå") String queueName) { String data; do { data = QueueUtils.getBoundedQueueObject(queueName); log.info("éé: {} , è·åæ°æ®: {}", queueName, data); } while (data != null); return AjaxResult.success("æä½æå"); } } ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,79 @@ package com.ruoyi.demo.controller.queue; import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.utils.redis.QueueUtils; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.TimeUnit; /** * å»¶è¿éå æ¼ç¤ºæ¡ä¾ * <p> * è½»é级éå ééçº§æ°æ®é è¯·ä½¿ç¨ MQ * ä¾å¦: å建订å30åéåè¿æå¤ç * <p> * é群æµè¯éè¿ åä¸ä¸ªæ°æ®åªä¼è¢«æ¶è´¹ä¸æ¬¡ å好äºå¡è¡¥å¿ * é群æµè¯æµç¨ 两å°é群åå«å¼å¯è®¢é å¨å ¶ä¸ä¸å°åéæ°æ® è§å¯æ¥æ¶æ¶æ¯çè§å¾ * * @author Lion Li * @version 3.6.0 */ @Slf4j @Api(value = "å»¶è¿éå æ¼ç¤ºæ¡ä¾", tags = {"å»¶è¿éå"}) @RequiredArgsConstructor(onConstructor_ = @Autowired) @RestController @RequestMapping("/demo/queue/delayed") public class DelayedQueueController { @ApiOperation("订é éå") @GetMapping("/subscribe") public AjaxResult<Void> subscribe(@ApiParam("éåå") String queueName) { log.info("éé: {} çå¬ä¸......", queueName); // 项ç®åå§åè®¾ç½®ä¸æ¬¡å³å¯ QueueUtils.subscribeBlockingQueue(queueName, (String orderNum) -> { // è§å¯æ¥æ¶æ¶é´ log.info("éé: {}, æ¶å°æ°æ®: {}", queueName, orderNum); }); return AjaxResult.success("æä½æå"); } @ApiOperation("æ·»å éåæ°æ®") @GetMapping("/add") public AjaxResult<Void> add(@ApiParam("éåå") String queueName, @ApiParam("订åå·") String orderNum, @ApiParam("å»¶è¿æ¶é´(ç§)") Long time) { QueueUtils.addDelayedQueueObject(queueName, orderNum, time, TimeUnit.SECONDS); // è§å¯åéæ¶é´ log.info("éé: {} , åéæ°æ®: {}", queueName, orderNum); return AjaxResult.success("æä½æå"); } @ApiOperation("å é¤éåæ°æ®") @GetMapping("/remove") public AjaxResult<Void> remove(@ApiParam("éåå") String queueName, @ApiParam("订åå·") String orderNum) { if 
(QueueUtils.removeDelayedQueueObject(queueName, orderNum)) { log.info("éé: {} , å 餿°æ®: {}", queueName, orderNum); } else { return AjaxResult.error("æä½å¤±è´¥"); } return AjaxResult.success("æä½æå"); } @ApiOperation("鿝éå") @GetMapping("/destroy") public AjaxResult<Void> destroy(@ApiParam("éåå") String queueName) { // ç¨å®äºä¸å®è¦éæ¯ å¦åä¼ä¸ç´åå¨ QueueUtils.destroyDelayedQueue(queueName); return AjaxResult.success("æä½æå"); } } ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,19 @@ package com.ruoyi.demo.controller.queue; import lombok.Data; import lombok.NoArgsConstructor; import lombok.experimental.Accessors; /** * å®ä½ç±» 注æä¸å 许使ç¨å é¨ç±» å¦å伿¾ä¸å°ç±» * * @author Lion Li * @version 3.6.0 */ @Data @Accessors(chain = true) @NoArgsConstructor public class PriorityDemo { private String name; private Integer orderNum; } ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,16 @@ package com.ruoyi.demo.controller.queue; import java.util.Comparator; /** * æ¯è¾å¨ 注æä¸å è®¸ä½¿ç¨ å é¨ç±»æå¿åç±»ælambdaè¡¨è¾¾å¼ ä¼æ¾ä¸å°ç±» * * @author Lion Li * @version 3.6.0 */ public class PriorityDemoComparator implements Comparator<PriorityDemo> { @Override public int compare(PriorityDemo pd1, PriorityDemo pd2) { return Integer.compare(pd1.getOrderNum(), pd2.getOrderNum()); } } ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,85 @@ package com.ruoyi.demo.controller.queue; import cn.hutool.core.util.RandomUtil; import com.ruoyi.common.core.domain.AjaxResult; import com.ruoyi.common.utils.redis.QueueUtils; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * ä¼å éå æ¼ç¤ºæ¡ä¾ * <p> * è½»é级éå ééçº§æ°æ®é è¯·ä½¿ç¨ MQ * <p> * é群æµè¯éè¿ åä¸ä¸ªæ¶æ¯åªä¼è¢«æ¶è´¹ä¸æ¬¡ å好äºå¡è¡¥å¿ * é群æµè¯æµç¨ å¨å ¶ä¸ä¸å°åéæ°æ® ä¸¤ç«¯åå«è°ç¨è·åæ¥å£ 䏿¬¡è·å䏿¡ * * @author Lion Li * @version 3.6.0 */ @Slf4j @Api(value = "ä¼å éå æ¼ç¤ºæ¡ä¾", tags = {"ä¼å éå"}) @RequiredArgsConstructor(onConstructor_ = @Autowired) @RestController @RequestMapping("/demo/queue/priority") public class PriorityQueueController { @ApiOperation("æ·»å éåæ°æ®") @GetMapping("/add") public AjaxResult<Void> add(@ApiParam("éåå") String queueName) { // ç¨å®äºä¸å®è¦éæ¯ å¦åä¼ä¸ç´åå¨ boolean b = QueueUtils.destroyPriorityQueueObject(queueName); log.info("éé: {} , å é¤: {}", queueName, b); // åå§åè®¾ç½®ä¸æ¬¡å³å¯ æ¤å¤æ³¨æ ä¸å 许ç¨å é¨ç±»æå¿åç±» boolean flag = QueueUtils.trySetPriorityQueueComparator(queueName, new PriorityDemoComparator()); if (flag) { log.info("éé: {} , 设置æ¯è¾å¨æå", queueName); } else { log.info("éé: {} , 设置æ¯è¾å¨å¤±è´¥", queueName); return AjaxResult.error("æä½å¤±è´¥"); } for (int i = 0; i < 10; i++) { int randomNum = RandomUtil.randomInt(10); PriorityDemo data = new PriorityDemo().setName("data-" + i).setOrderNum(randomNum); if (QueueUtils.addPriorityQueueObject(queueName, data)) { log.info("éé: {} , åéæ°æ®: {}", queueName, data); } else { log.info("éé: {} , åéæ°æ®: {}, åé失败", queueName, data); } } return AjaxResult.success("æä½æå"); 
} @ApiOperation("å é¤éåæ°æ®") @GetMapping("/remove") public AjaxResult<Void> remove(@ApiParam("éåå") String queueName, @ApiParam("对象å") String name, @ApiParam("æåºå·") Integer orderNum) { PriorityDemo data = new PriorityDemo().setName(name).setOrderNum(orderNum); if (QueueUtils.removePriorityQueueObject(queueName, data)) { log.info("éé: {} , å 餿°æ®: {}", queueName, data); } else { return AjaxResult.error("æä½å¤±è´¥"); } return AjaxResult.success("æä½æå"); } @ApiOperation("è·åéåæ°æ®") @GetMapping("/get") public AjaxResult<Void> get(@ApiParam("éåå") String queueName) { PriorityDemo data; do { data = QueueUtils.getPriorityQueueObject(queueName); log.info("éé: {} , è·åæ°æ®: {}", queueName, data); } while (data != null); return AjaxResult.success("æä½æå"); } }