// Source revision: 442928123f63ee497d766f9a7a14f0a6ee067e25 (zhuguifei, 2025-04-28)
package org.jeecg.boot.starter.rabbitmq.client;
 
 
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.boot.starter.rabbitmq.event.EventObj;
import org.jeecg.boot.starter.rabbitmq.event.JeecgRemoteApplicationEvent;
import org.jeecg.boot.starter.rabbitmq.exchange.DelayExchangeBuilder;
import org.jeecg.common.annotation.RabbitComponent;
import org.jeecg.common.base.BaseMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.bus.BusProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
 
/**
 * 消息队列客户端
 */
@Slf4j
@Configuration
public class RabbitMqClient {
 
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqClient.class);
 
    private final RabbitAdmin rabbitAdmin;
 
    private final RabbitTemplate rabbitTemplate;
 
 
    @Resource
    private SimpleMessageListenerContainer messageListenerContainer;
 
    @Resource
    BusProperties busProperties;
    @Resource
    private ApplicationEventPublisher publisher;
 
 
    @Resource
    private ApplicationContext applicationContext;
 
 
    @Bean
    public void initQueue() {
        Map<String, Object> beansWithRqbbitComponentMap = this.applicationContext.getBeansWithAnnotation(RabbitComponent.class);
        Class<? extends Object> clazz = null;
        for (Map.Entry<String, Object> entry : beansWithRqbbitComponentMap.entrySet()) {
            log.info("初始化队列............");
            //获取到实例对象的class信息
            clazz = entry.getValue().getClass();
            Method[] methods = clazz.getMethods();
            RabbitListener rabbitListener = clazz.getAnnotation(RabbitListener.class);
            if (ObjectUtil.isNotEmpty(rabbitListener)) {
                createQueue(rabbitListener);
            }
            for (Method method : methods) {
                RabbitListener methodRabbitListener = method.getAnnotation(RabbitListener.class);
                if (ObjectUtil.isNotEmpty(methodRabbitListener)) {
                    createQueue(methodRabbitListener);
                }
            }
 
        }
    }
 
    /**
     * 初始化队列
     *
     * @param rabbitListener
     */
    private void createQueue(RabbitListener rabbitListener) {
        String[] queues = rabbitListener.queues();
        DirectExchange directExchange = createExchange(DelayExchangeBuilder.DELAY_EXCHANGE);
        //创建交换机
        rabbitAdmin.declareExchange(directExchange);
        if (ObjectUtil.isNotEmpty(queues)) {
            for (String queueName : queues) {
                Properties result = rabbitAdmin.getQueueProperties(queueName);
                if (ObjectUtil.isEmpty(result)) {
                    Queue queue = new Queue(queueName);
                    addQueue(queue);
                    Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);
                    rabbitAdmin.declareBinding(binding);
                    log.info("创建队列:" + queueName);
                }else{
                    log.info("已有队列:" + queueName);
                }
            }
        }
    }
 
 
    private Map sentObj = new HashMap<>();
 
 
    @Autowired
    public RabbitMqClient(RabbitAdmin rabbitAdmin, RabbitTemplate rabbitTemplate) {
        this.rabbitAdmin = rabbitAdmin;
        this.rabbitTemplate = rabbitTemplate;
    }
 
 
    /**
     * 发送远程事件
     *
     * @param handlerName
     * @param baseMap
     */
    public void publishEvent(String handlerName, BaseMap baseMap) {
        EventObj eventObj = new EventObj();
        eventObj.setHandlerName(handlerName);
        eventObj.setBaseMap(baseMap);
        publisher.publishEvent(new JeecgRemoteApplicationEvent(eventObj, busProperties.getId()));
    }
 
    /**
     * 转换Message对象
     *
     * @param messageType 返回消息类型 MessageProperties类中常量
     * @param msg
     * @return
     */
    public Message getMessage(String messageType, Object msg) {
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(messageType);
        Message message = new Message(msg.toString().getBytes(), messageProperties);
        return message;
    }
 
    /**
     * 有绑定Key的Exchange发送
     *
     * @param routingKey
     * @param msg
     */
    public void sendMessageToExchange(TopicExchange topicExchange, String routingKey, Object msg) {
        Message message = getMessage(MessageProperties.CONTENT_TYPE_JSON, msg);
        rabbitTemplate.send(topicExchange.getName(), routingKey, message);
    }
 
    /**
     * 没有绑定KEY的Exchange发送
     *
     * @param exchange
     * @param msg
     */
    public void sendMessageToExchange(TopicExchange topicExchange, AbstractExchange exchange, String msg) {
        addExchange(exchange);
        logger.info("RabbitMQ send " + exchange.getName() + "->" + msg);
        rabbitTemplate.convertAndSend(topicExchange.getName(), msg);
    }
 
 
    /**
     * 发送消息
     *
     * @param queueName 队列名称
     * @param params    消息内容map
     */
    public void sendMessage(String queueName, Object params) {
        //调用 rabbitMqClient.sendMessage
        log.info("发送消息到mq");
        try {
            rabbitTemplate.convertAndSend(DelayExchangeBuilder.DELAY_EXCHANGE, queueName, params, message -> {
                return message;
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 发送消息
     *
     * @param queueName 队列名称
     */
    public void sendMessage(String queueName) {
        this.send(queueName, this.sentObj, 0);
        this.sentObj.clear();
    }
 
 
    public RabbitMqClient put(String key, Object value) {
        this.sentObj.put(key, value);
        return this;
    }
 
    /**
     * 延迟发送消息
     *
     * @param queueName  队列名称
     * @param params     消息内容params
     * @param expiration 延迟时间 单位毫秒
     */
    public void sendMessage(String queueName, Object params, Integer expiration) {
        this.send(queueName, params, expiration);
    }
 
    private void send(String queueName, Object params, Integer expiration) {
        Queue queue = new Queue(queueName);
        addQueue(queue);
        CustomExchange customExchange = DelayExchangeBuilder.buildExchange();
        rabbitAdmin.declareExchange(customExchange);
        Binding binding = BindingBuilder.bind(queue).to(customExchange).with(queueName).noargs();
        rabbitAdmin.declareBinding(binding);
        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.debug("发送时间:" + sf.format(new Date()));
        messageListenerContainer.setQueueNames(queueName);
/*        messageListenerContainer.setMessageListener(new MqListener<Message>() {
            @Override
            public void onMessage(Message message, Channel channel) {
                MqListener messageListener = SpringContextHolder.getHandler(queueName + "Listener", MqListener.class);
                if (ObjectUtil.isNotEmpty(messageListener)) {
                    messageListener.onMessage(message, channel);
                }
            }
        });*/
        rabbitTemplate.convertAndSend(DelayExchangeBuilder.DEFAULT_DELAY_EXCHANGE, queueName, params, message -> {
            if (expiration != null && expiration > 0) {
                message.getMessageProperties().setHeader("x-delay", expiration);
            }
            return message;
        });
    }
 
 
    /**
     * 给queue发送消息
     *
     * @param queueName
     */
    public String receiveFromQueue(String queueName) {
        return receiveFromQueue(DirectExchange.DEFAULT, queueName);
    }
 
    /**
     * 给direct交换机指定queue发送消息
     *
     * @param directExchange
     * @param queueName
     */
    public String receiveFromQueue(DirectExchange directExchange, String queueName) {
        Queue queue = new Queue(queueName);
        addQueue(queue);
        Binding binding = BindingBuilder.bind(queue).to(directExchange).withQueueName();
        rabbitAdmin.declareBinding(binding);
        String messages = (String) rabbitTemplate.receiveAndConvert(queueName);
        System.out.println("Receive:" + messages);
        return messages;
    }
 
    /**
     * 创建Exchange
     *
     * @param exchange
     */
    public void addExchange(AbstractExchange exchange) {
        rabbitAdmin.declareExchange(exchange);
    }
 
    /**
     * 删除一个Exchange
     *
     * @param exchangeName
     */
    public boolean deleteExchange(String exchangeName) {
        return rabbitAdmin.deleteExchange(exchangeName);
    }
 
 
    /**
     * 声明其名称自动命名的队列。它是用exclusive=true、autoDelete=true和 durable = false
     *
     * @return Queue
     */
    public Queue addQueue() {
        return rabbitAdmin.declareQueue();
    }
 
    /**
     * 创建一个指定的Queue
     *
     * @param queue
     * @return queueName
     */
    public String addQueue(Queue queue) {
        return rabbitAdmin.declareQueue(queue);
    }
 
    /**
     * 删除一个队列
     *
     * @param queueName the name of the queue.
     * @param unused    true if the queue should be deleted only if not in use.
     * @param empty     true if the queue should be deleted only if empty.
     */
    public void deleteQueue(String queueName, boolean unused, boolean empty) {
        rabbitAdmin.deleteQueue(queueName, unused, empty);
    }
 
    /**
     * 删除一个队列
     *
     * @param queueName
     * @return true if the queue existed and was deleted.
     */
    public boolean deleteQueue(String queueName) {
        return rabbitAdmin.deleteQueue(queueName);
    }
 
    /**
     * 绑定一个队列到一个匹配型交换器使用一个routingKey
     *
     * @param queue
     * @param exchange
     * @param routingKey
     */
    public void addBinding(Queue queue, TopicExchange exchange, String routingKey) {
        Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
        rabbitAdmin.declareBinding(binding);
    }
 
    /**
     * 绑定一个Exchange到一个匹配型Exchange 使用一个routingKey
     *
     * @param exchange
     * @param topicExchange
     * @param routingKey
     */
    public void addBinding(Exchange exchange, TopicExchange topicExchange, String routingKey) {
        Binding binding = BindingBuilder.bind(exchange).to(topicExchange).with(routingKey);
        rabbitAdmin.declareBinding(binding);
    }
 
    /**
     * 去掉一个binding
     *
     * @param binding
     */
    public void removeBinding(Binding binding) {
        rabbitAdmin.removeBinding(binding);
    }
 
    /**
     * 创建交换器
     *
     * @param exchangeName
     * @return
     */
    public DirectExchange createExchange(String exchangeName) {
        return new DirectExchange(exchangeName, true, false);
    }
}