疯狂的狮子li
2021-12-23 08e0ed4fc6b53d80feebc5add86d1fd70e75b952
[重磅更新] 增加 轻量级 分布式队列 支持
已修改1个文件
已添加6个文件
498 ■■■■■ 文件已修改
README.md 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java 215 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java 83 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java 85 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
README.md
@@ -33,6 +33,7 @@
| åºåˆ—化框架 | Jackson | [Jackson官网](https://github.com/FasterXML/jackson) | ç»Ÿä¸€ä½¿ç”¨ jackson é«˜æ•ˆå¯é  |
| Redis客户端 | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | æ”¯æŒå•机、集群配置 |
| åˆ†å¸ƒå¼é™æµ | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | å…¨å±€ã€è¯·æ±‚IP、集群ID å¤šç§é™æµ |
| åˆ†å¸ƒå¼é˜Ÿåˆ— | Redisson | [Redisson文档](https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95) | æ™®é€šé˜Ÿåˆ—、延迟队列、优先队列 ç­‰ |
| åˆ†å¸ƒå¼é” | Lock4j | [Lock4j官网](https://gitee.com/baomidou/lock4j) | æ³¨è§£é”ã€å·¥å…·é” å¤šç§å¤šæ · |
| åˆ†å¸ƒå¼å¹‚ç­‰ | Redisson | [Lock4j文档](https://gitee.com/baomidou/lock4j) | æ‹¦æˆªé‡å¤æäº¤ |
| åˆ†å¸ƒå¼æ—¥å¿— | TLog | [TLog文档](https://yomahub.com/tlog/docs) | æ”¯æŒè·Ÿè¸ªé“¾è·¯æ—¥å¿—记录、性能分析、链路排查 |
ruoyi-common/src/main/java/com/ruoyi/common/utils/redis/QueueUtils.java
对比新文件
@@ -0,0 +1,215 @@
package com.ruoyi.common.utils.redis;
import com.ruoyi.common.utils.spring.SpringUtils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.redisson.api.*;
import java.util.Comparator;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
 * åˆ†å¸ƒå¼é˜Ÿåˆ—工具
 * è½»é‡çº§é˜Ÿåˆ— é‡é‡çº§æ•°æ®é‡ è¯·ä½¿ç”¨ MQ
 * è¦æ±‚ redis 5.X ä»¥ä¸Š
 *
 * @author Lion Li
 * @version 3.6.0 æ–°å¢ž
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class QueueUtils {
    private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
    /**
     * èŽ·å–å®¢æˆ·ç«¯å®žä¾‹
     */
    public static RedissonClient getClient() {
        return CLIENT;
    }
    /**
     * æ·»åŠ å»¶è¿Ÿé˜Ÿåˆ—æ•°æ® é»˜è®¤æ¯«ç§’
     *
     * @param queueName é˜Ÿåˆ—名
     * @param data      æ•°æ®
     * @param time      å»¶è¿Ÿæ—¶é—´
     */
    public static <T> void addDelayedQueueObject(String queueName, T data, long time) {
        addDelayedQueueObject(queueName, data, time, TimeUnit.MILLISECONDS);
    }
    /**
     * æ·»åŠ å»¶è¿Ÿé˜Ÿåˆ—æ•°æ®
     *
     * @param queueName é˜Ÿåˆ—名
     * @param data      æ•°æ®
     * @param time      å»¶è¿Ÿæ—¶é—´
     * @param timeUnit  å•位
     */
    public static <T> void addDelayedQueueObject(String queueName, T data, long time, TimeUnit timeUnit) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        // å·²å­˜åœ¨åˆ™æ— è§†
        if (delayedQueue.contains(data)) {
            return;
        }
        delayedQueue.offer(data, time, timeUnit);
    }
    /**
     * åˆ é™¤å»¶è¿Ÿé˜Ÿåˆ—数据
     */
    public static <T> boolean removeDelayedQueueObject(String queueName, T data) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        return delayedQueue.remove(data);
    }
    /**
     * é”€æ¯å»¶è¿Ÿé˜Ÿåˆ— æ‰€æœ‰é˜»å¡žç›‘听 æŠ¥é”™
     */
    public static <T> void destroyDelayedQueue(String queueName) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = CLIENT.getDelayedQueue(queue);
        delayedQueue.destroy();
    }
    /**
     * å°è¯•设置 ä¼˜å…ˆé˜Ÿåˆ—比较器 ç”¨äºŽæŽ’序优先级
     *
     * @param queueName  é˜Ÿåˆ—名
     * @param comparator æ¯”较器
     */
    public static <T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.trySetComparator(comparator);
    }
    /**
     * å°è¯•设置 ä¼˜å…ˆé˜Ÿåˆ—比较器 ç”¨äºŽæŽ’序优先级
     *
     * @param queueName  é˜Ÿåˆ—名
     * @param comparator æ¯”较器
     * @param destroy    å·²å­˜åœ¨æ˜¯å¦é”€æ¯
     */
    public static <T> boolean trySetPriorityQueueComparator(String queueName, Comparator<T> comparator, boolean destroy) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        if (priorityBlockingQueue.isExists() && destroy) {
            destroyPriorityQueueObject(queueName);
        }
        return priorityBlockingQueue.trySetComparator(comparator);
    }
    /**
     * æ·»åŠ ä¼˜å…ˆé˜Ÿåˆ—æ•°æ®
     *
     * @param queueName é˜Ÿåˆ—名
     * @param data      æ•°æ®
     */
    public static <T> boolean addPriorityQueueObject(String queueName, T data) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.offer(data);
    }
    /**
     * èŽ·å–ä¸€ä¸ªä¼˜å…ˆé˜Ÿåˆ—æ•°æ® æ²¡æœ‰æ•°æ®è¿”回 null
     *
     * @param queueName é˜Ÿåˆ—名
     */
    public static <T> T getPriorityQueueObject(String queueName) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.poll();
    }
    /**
     * åˆ é™¤ä¼˜å…ˆé˜Ÿåˆ—数据
     */
    public static <T> boolean removePriorityQueueObject(String queueName, T data) {
        RPriorityBlockingQueue<T> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.remove(data);
    }
    /**
     * é”€æ¯ä¼˜å…ˆé˜Ÿåˆ—
     */
    public static boolean destroyPriorityQueueObject(String queueName) {
        RPriorityBlockingQueue<?> priorityBlockingQueue = CLIENT.getPriorityBlockingQueue(queueName);
        return priorityBlockingQueue.delete();
    }
    /**
     * å°è¯•设置 æœ‰ç•Œé˜Ÿåˆ— å®¹é‡ ç”¨äºŽé™åˆ¶æ•°é‡
     *
     * @param queueName é˜Ÿåˆ—名
     * @param capacity  å®¹é‡
     */
    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.trySetCapacity(capacity);
    }
    /**
     * å°è¯•设置 æœ‰ç•Œé˜Ÿåˆ— å®¹é‡ ç”¨äºŽé™åˆ¶æ•°é‡
     *
     * @param queueName é˜Ÿåˆ—名
     * @param capacity  å®¹é‡
     * @param destroy   å·²å­˜åœ¨æ˜¯å¦é”€æ¯
     */
    public static <T> boolean trySetBoundedQueueCapacity(String queueName, int capacity, boolean destroy) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        if (boundedBlockingQueue.isExists() && destroy) {
            destroyBoundedQueueObject(queueName);
        }
        return boundedBlockingQueue.trySetCapacity(capacity);
    }
    /**
     * æ·»åŠ æœ‰ç•Œé˜Ÿåˆ—æ•°æ®
     *
     * @param queueName é˜Ÿåˆ—名
     * @param data      æ•°æ®
     * @return æ·»åŠ æˆåŠŸ true å·²è¾¾åˆ°ç•Œé™ false
     */
    public static <T> boolean addBoundedQueueObject(String queueName, T data) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.offer(data);
    }
    /**
     * èŽ·å–ä¸€ä¸ªæœ‰ç•Œé˜Ÿåˆ—æ•°æ® æ²¡æœ‰æ•°æ®è¿”回 null
     *
     * @param queueName é˜Ÿåˆ—名
     */
    public static <T> T getBoundedQueueObject(String queueName) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.poll();
    }
    /**
     * åˆ é™¤æœ‰ç•Œé˜Ÿåˆ—数据
     */
    public static <T> boolean removeBoundedQueueObject(String queueName, T data) {
        RBoundedBlockingQueue<T> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.remove(data);
    }
    /**
     * é”€æ¯æœ‰ç•Œé˜Ÿåˆ—
     */
    public static boolean destroyBoundedQueueObject(String queueName) {
        RBoundedBlockingQueue<?> boundedBlockingQueue = CLIENT.getBoundedBlockingQueue(queueName);
        return boundedBlockingQueue.delete();
    }
    /**
     * è®¢é˜…阻塞队列(可订阅所有实现类 ä¾‹å¦‚: å»¶è¿Ÿ ä¼˜å…ˆ æœ‰ç•Œ ç­‰)
     */
    public static <T> void subscribeBlockingQueue(String queueName, Consumer<T> consumer) {
        RBlockingQueue<T> queue = CLIENT.getBlockingQueue(queueName);
        queue.subscribeOnElements(consumer);
    }
}
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/BoundedQueueController.java
对比新文件
@@ -0,0 +1,83 @@
package com.ruoyi.demo.controller.queue;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.utils.redis.QueueUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * æœ‰ç•Œé˜Ÿåˆ— æ¼”示案例
 * <p>
 * è½»é‡çº§é˜Ÿåˆ— é‡é‡çº§æ•°æ®é‡ è¯·ä½¿ç”¨ MQ
 * <p>
 * é›†ç¾¤æµ‹è¯•通过 åŒä¸€ä¸ªæ•°æ®åªä¼šè¢«æ¶ˆè´¹ä¸€æ¬¡ åšå¥½äº‹åŠ¡è¡¥å¿
 * é›†ç¾¤æµ‹è¯•流程 åœ¨å…¶ä¸­ä¸€å°å‘送数据 ä¸¤ç«¯åˆ†åˆ«è°ƒç”¨èŽ·å–æŽ¥å£ ä¸€æ¬¡èŽ·å–ä¸€æ¡
 *
 * @author Lion Li
 * @version 3.6.0
 */
@Slf4j
@Api(value = "有界队列 æ¼”示案例", tags = {"有界队列"})
@RequiredArgsConstructor(onConstructor_ = @Autowired)
@RestController
@RequestMapping("/demo/queue/bounded")
public class BoundedQueueController {
    @ApiOperation("添加队列数据")
    @GetMapping("/add")
    public AjaxResult<Void> add(@ApiParam("队列名") String queueName,
                                @ApiParam("容量") int capacity) {
        // ç”¨å®Œäº†ä¸€å®šè¦é”€æ¯ å¦åˆ™ä¼šä¸€ç›´å­˜åœ¨
        boolean b = QueueUtils.destroyBoundedQueueObject(queueName);
        log.info("通道: {} , åˆ é™¤: {}", queueName, b);
        // åˆå§‹åŒ–设置一次即可
        if (QueueUtils.trySetBoundedQueueCapacity(queueName, capacity)) {
            log.info("通道: {} , è®¾ç½®å®¹é‡: {}", queueName, capacity);
        } else {
            log.info("通道: {} , è®¾ç½®å®¹é‡å¤±è´¥", queueName);
            return AjaxResult.error("操作失败");
        }
        for (int i = 0; i < 11; i++) {
            String data = "data-" + i;
            boolean flag = QueueUtils.addBoundedQueueObject(queueName, data);
            if (flag == false) {
                log.info("通道: {} , å‘送数据: {} å¤±è´¥, é€šé“已满", queueName, data);
            } else {
                log.info("通道: {} , å‘送数据: {}", queueName, data);
            }
        }
        return AjaxResult.success("操作成功");
    }
    @ApiOperation("删除队列数据")
    @GetMapping("/remove")
    public AjaxResult<Void> remove(@ApiParam("队列名") String queueName) {
        String data = "data-" + 5;
        if (QueueUtils.removeBoundedQueueObject(queueName, data)) {
            log.info("通道: {} , åˆ é™¤æ•°æ®: {}", queueName, data);
        } else {
            return AjaxResult.error("操作失败");
        }
        return AjaxResult.success("操作成功");
    }
    @ApiOperation("获取队列数据")
    @GetMapping("/get")
    public AjaxResult<Void> get(@ApiParam("队列名") String queueName) {
        String data;
        do {
            data = QueueUtils.getBoundedQueueObject(queueName);
            log.info("通道: {} , èŽ·å–æ•°æ®: {}", queueName, data);
        } while (data != null);
        return AjaxResult.success("操作成功");
    }
}
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/DelayedQueueController.java
对比新文件
@@ -0,0 +1,79 @@
package com.ruoyi.demo.controller.queue;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.utils.redis.QueueUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
/**
 * å»¶è¿Ÿé˜Ÿåˆ— æ¼”示案例
 * <p>
 * è½»é‡çº§é˜Ÿåˆ— é‡é‡çº§æ•°æ®é‡ è¯·ä½¿ç”¨ MQ
 * ä¾‹å¦‚: åˆ›å»ºè®¢å•30分钟后过期处理
 * <p>
 * é›†ç¾¤æµ‹è¯•通过 åŒä¸€ä¸ªæ•°æ®åªä¼šè¢«æ¶ˆè´¹ä¸€æ¬¡ åšå¥½äº‹åŠ¡è¡¥å¿
 * é›†ç¾¤æµ‹è¯•流程 ä¸¤å°é›†ç¾¤åˆ†åˆ«å¼€å¯è®¢é˜… åœ¨å…¶ä¸­ä¸€å°å‘送数据 è§‚察接收消息的规律
 *
 * @author Lion Li
 * @version 3.6.0
 */
@Slf4j
@Api(value = "延迟队列 æ¼”示案例", tags = {"延迟队列"})
@RequiredArgsConstructor(onConstructor_ = @Autowired)
@RestController
@RequestMapping("/demo/queue/delayed")
public class DelayedQueueController {
    @ApiOperation("订阅队列")
    @GetMapping("/subscribe")
    public AjaxResult<Void> subscribe(@ApiParam("队列名") String queueName) {
        log.info("通道: {} ç›‘听中......", queueName);
        // é¡¹ç›®åˆå§‹åŒ–设置一次即可
        QueueUtils.subscribeBlockingQueue(queueName, (String orderNum) -> {
            // è§‚察接收时间
            log.info("通道: {}, æ”¶åˆ°æ•°æ®: {}", queueName, orderNum);
        });
        return AjaxResult.success("操作成功");
    }
    @ApiOperation("添加队列数据")
    @GetMapping("/add")
    public AjaxResult<Void> add(@ApiParam("队列名") String queueName,
                                @ApiParam("订单号") String orderNum,
                                @ApiParam("延迟时间(秒)") Long time) {
        QueueUtils.addDelayedQueueObject(queueName, orderNum, time, TimeUnit.SECONDS);
        // è§‚察发送时间
        log.info("通道: {} , å‘送数据: {}", queueName, orderNum);
        return AjaxResult.success("操作成功");
    }
    @ApiOperation("删除队列数据")
    @GetMapping("/remove")
    public AjaxResult<Void> remove(@ApiParam("队列名") String queueName,
                                   @ApiParam("订单号") String orderNum) {
        if (QueueUtils.removeDelayedQueueObject(queueName, orderNum)) {
            log.info("通道: {} , åˆ é™¤æ•°æ®: {}", queueName, orderNum);
        } else {
            return AjaxResult.error("操作失败");
        }
        return AjaxResult.success("操作成功");
    }
    @ApiOperation("销毁队列")
    @GetMapping("/destroy")
    public AjaxResult<Void> destroy(@ApiParam("队列名") String queueName) {
        // ç”¨å®Œäº†ä¸€å®šè¦é”€æ¯ å¦åˆ™ä¼šä¸€ç›´å­˜åœ¨
        QueueUtils.destroyDelayedQueue(queueName);
        return AjaxResult.success("操作成功");
    }
}
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemo.java
对比新文件
@@ -0,0 +1,19 @@
package com.ruoyi.demo.controller.queue;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
/**
 * Sample payload stored in the priority queue demo.
 * <p>
 * Note from the original author: this must not be declared as an inner class,
 * otherwise the class cannot be found.
 * Accessors are generated by Lombok; setters are chainable ({@code @Accessors(chain = true)}).
 *
 * @author Lion Li
 * @version 3.6.0
 */
@Data
@Accessors(chain = true)
@NoArgsConstructor
public class PriorityDemo {
    // Object name (the demo controller fills it with "data-" + index).
    private String name;
    // Sort number; PriorityDemoComparator orders items by this value.
    private Integer orderNum;
}
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityDemoComparator.java
对比新文件
@@ -0,0 +1,16 @@
package com.ruoyi.demo.controller.queue;
import java.util.Comparator;
/**
 * æ¯”较器 æ³¨æ„ä¸å…è®¸ä½¿ç”¨ å†…部类或匿名类或lambda表达式 ä¼šæ‰¾ä¸åˆ°ç±»
 *
 * @author Lion Li
 * @version 3.6.0
 */
public class PriorityDemoComparator implements Comparator<PriorityDemo> {
    @Override
    public int compare(PriorityDemo pd1, PriorityDemo pd2) {
        return Integer.compare(pd1.getOrderNum(), pd2.getOrderNum());
    }
}
ruoyi-demo/src/main/java/com/ruoyi/demo/controller/queue/PriorityQueueController.java
对比新文件
@@ -0,0 +1,85 @@
package com.ruoyi.demo.controller.queue;
import cn.hutool.core.util.RandomUtil;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.utils.redis.QueueUtils;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * ä¼˜å…ˆé˜Ÿåˆ— æ¼”示案例
 * <p>
 * è½»é‡çº§é˜Ÿåˆ— é‡é‡çº§æ•°æ®é‡ è¯·ä½¿ç”¨ MQ
 * <p>
 * é›†ç¾¤æµ‹è¯•通过 åŒä¸€ä¸ªæ¶ˆæ¯åªä¼šè¢«æ¶ˆè´¹ä¸€æ¬¡ åšå¥½äº‹åŠ¡è¡¥å¿
 * é›†ç¾¤æµ‹è¯•流程 åœ¨å…¶ä¸­ä¸€å°å‘送数据 ä¸¤ç«¯åˆ†åˆ«è°ƒç”¨èŽ·å–æŽ¥å£ ä¸€æ¬¡èŽ·å–ä¸€æ¡
 *
 * @author Lion Li
 * @version 3.6.0
 */
@Slf4j
@Api(value = "优先队列 æ¼”示案例", tags = {"优先队列"})
@RequiredArgsConstructor(onConstructor_ = @Autowired)
@RestController
@RequestMapping("/demo/queue/priority")
public class PriorityQueueController {
    @ApiOperation("添加队列数据")
    @GetMapping("/add")
    public AjaxResult<Void> add(@ApiParam("队列名") String queueName) {
        // ç”¨å®Œäº†ä¸€å®šè¦é”€æ¯ å¦åˆ™ä¼šä¸€ç›´å­˜åœ¨
        boolean b = QueueUtils.destroyPriorityQueueObject(queueName);
        log.info("通道: {} , åˆ é™¤: {}", queueName, b);
        // åˆå§‹åŒ–设置一次即可 æ­¤å¤„注意 ä¸å…è®¸ç”¨å†…部类或匿名类
        boolean flag = QueueUtils.trySetPriorityQueueComparator(queueName, new PriorityDemoComparator());
        if (flag) {
            log.info("通道: {} , è®¾ç½®æ¯”较器成功", queueName);
        } else {
            log.info("通道: {} , è®¾ç½®æ¯”较器失败", queueName);
            return AjaxResult.error("操作失败");
        }
        for (int i = 0; i < 10; i++) {
            int randomNum = RandomUtil.randomInt(10);
            PriorityDemo data = new PriorityDemo().setName("data-" + i).setOrderNum(randomNum);
            if (QueueUtils.addPriorityQueueObject(queueName, data)) {
                log.info("通道: {} , å‘送数据: {}", queueName, data);
            } else {
                log.info("通道: {} , å‘送数据: {}, å‘送失败", queueName, data);
            }
        }
        return AjaxResult.success("操作成功");
    }
    @ApiOperation("删除队列数据")
    @GetMapping("/remove")
    public AjaxResult<Void> remove(@ApiParam("队列名") String queueName,
                                   @ApiParam("对象名") String name,
                                   @ApiParam("排序号") Integer orderNum) {
        PriorityDemo data = new PriorityDemo().setName(name).setOrderNum(orderNum);
        if (QueueUtils.removePriorityQueueObject(queueName, data)) {
            log.info("通道: {} , åˆ é™¤æ•°æ®: {}", queueName, data);
        } else {
            return AjaxResult.error("操作失败");
        }
        return AjaxResult.success("操作成功");
    }
    @ApiOperation("获取队列数据")
    @GetMapping("/get")
    public AjaxResult<Void> get(@ApiParam("队列名") String queueName) {
        PriorityDemo data;
        do {
            data = QueueUtils.getPriorityQueueObject(queueName);
            log.info("通道: {} , èŽ·å–æ•°æ®: {}", queueName, data);
        } while (data != null);
        return AjaxResult.success("操作成功");
    }
}