From 320c0c10a90140627b10a6fcf498e79d09785da6 Mon Sep 17 00:00:00 2001
From: zhuguifei <312353457@qq.com>
Date: 星期三, 27 十一月 2024 13:42:58 +0800
Subject: [PATCH] 添加mqtt数据接口
---
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java | 15 +
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 625 ++++++++++++++++++++++++++++-----------------------
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java | 22 +
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java | 23 +
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java | 28 +
5 files changed, 412 insertions(+), 301 deletions(-)
diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
index 54530d6..f7867b5 100644
--- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
+++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -31,10 +31,9 @@
String MOBILE_QUERY_EQU_STATU = MOBILE_UP_PREFIX + "/query/equ/statu";
//绉诲姩绔繙绋嬭姹傛寚浠�
String MOBILE_REQ_EQU_CMD = MOBILE_UP_PREFIX + "/req/equ/cmd";
+
+
/**************************绉诲姩绔悜鏈嶅姟绔姹傛寚浠nd*******************************/
-
-
-
@@ -47,7 +46,26 @@
String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/%s/statu";
//杩斿洖绉诲姩绔繙绋嬭姹傛寚浠�
String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd";
+
+
+
/**************************鏈嶅姟绔悜绉诲姩绔搷搴旀寚浠nd*******************************/
+
+
+ /**************************鏈嶅姟绔悜绉诲姩绔彂閫佸箍鎾璼tart*******************************/
+ //骞挎挱绫诲瀷鎺ㄩ�佹棤鍏崇Щ鍔ㄧ璁惧id锛屽悜鎵�鏈夊湪绾跨Щ鍔ㄧ鍙戦��
+ String SERVICE_BROADCAST_PREFIX = "service/broadcast";
+
+ //鏈嶅姟绔悜鍚勭鎴峰鎴风鍙戦�佸疄鏃舵晠闅滃箍鎾�
+ String SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s" ;
+
+
+
+ /**************************鏈嶅姟绔悜绉诲姩绔彂閫佸箍鎾璭nd*******************************/
+
+
+
+
/**************************绉熸埛绔悜鏈嶅姟绔彂閫佹暟鎹畇tart*******************************/
@@ -80,9 +98,9 @@
//service(cloud)
//鍦ㄧ嚎瀹㈡埛绔�
- String MQTT_ONLINE_CLIENT = "mqtt:online:client::";
+ String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s";
//鎵�鏈夌鎴风殑瀹炴椂鎶ヨ锛�%s锛氱鎴穒d锛�
- String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s:";
+ String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s";
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
index d67b9d5..feea7b1 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
@@ -18,7 +18,9 @@
import org.jeecg.config.mybatis.MybatisPlusSaasConfig;
import org.jeecg.modules.dry.api.EmqxApi;
import org.jeecg.modules.dry.entity.DryEquipment;
+import org.jeecg.modules.dry.entity.DryFaultRecord;
import org.jeecg.modules.dry.service.IDryEquipmentService;
+import org.jeecg.modules.dry.service.IDryFaultRecordService;
import org.jeecg.modules.dry.vo.MoEquVo;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -40,6 +42,8 @@
public class MobileController {
@Autowired
private IDryEquipmentService dryEquipmentService;
+ @Autowired
+ private IDryFaultRecordService faultRecordService;
@Autowired
private RedisUtil redisUtil;
@@ -64,18 +68,27 @@
return Result.OK(voPage);
}
+ @ApiOperation(value = "璁惧鎶ヨ鏁版嵁", notes = "璁惧鎶ヨ鏁版嵁鍒楄〃鏌ヨ")
+ @GetMapping(value = "/fault/list")
+ public Result<IPage<DryFaultRecord>> queryFaultList(DryFaultRecord faultRecord, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req){
+ int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
+ QueryWrapper<DryFaultRecord> queryWrapper = QueryGenerator.initQueryWrapper(faultRecord, req.getParameterMap());
+ Page<DryFaultRecord> page = new Page<DryFaultRecord>(pageNo, pageSize);
+ IPage<DryFaultRecord> pageList = faultRecordService.page(page, queryWrapper);
+ return Result.OK(pageList);
+ }
+
+
+
private void comp(IPage<DryEquipment> pageList, Page<MoEquVo> page) {
-
-
//褰撳墠绉熸埛id
int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
-
-
List<MoEquVo> collect = pageList.getRecords().stream().map(item -> {
MoEquVo vo = new MoEquVo();
BeanUtils.copyProperties(item, vo);
String clientid = "client-" + tenantId + "-" + item.getCode();
- JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
+ //JSONObject client = (JSONObject) redisUtil.hget(MqttConstant.MQTT_ONLINE_CLIENT ,tenantId);
+ JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,tenantId),clientid);
//缁勮鐘舵�佹暟鎹�
if (client != null) {
vo.setOnline(true);
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java
index 892bf70..685b416 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java
@@ -10,6 +10,7 @@
public class MqMessage<T> {
private T data;
private String tentId;
+ private String topic;
public MqMessage() {
@@ -19,4 +20,18 @@
this.data = data;
this.tentId = tentId;
}
+ public MqMessage(T data, String tentId,String topic) {
+ this.data = data;
+ this.tentId = tentId;
+ this.topic = topic;
+ }
+
+ @Override
+ public String toString() {
+ return "MqMessage{" +
+ "data=" + data +
+ ", tentId='" + tentId + '\'' +
+ ", topic='" + topic + '\'' +
+ '}';
+ }
}
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
index 41d3db4..43c838f 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -15,6 +15,7 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.core.RedisTemplate;
import java.util.*;
@@ -44,6 +45,8 @@
private RedisUtil redisUtil;
@Autowired
private EmqxApi emqxApi;
+ @Autowired
+ private RedisTemplate redisTemplate;
@Bean
@@ -98,6 +101,8 @@
mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
// 璁㈤槄绉熸埛鎶ヨ鏁版嵁
+ mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
+ System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU);
@@ -153,7 +158,14 @@
* 鏈嶅姟绔紙admin瑙掕壊锛夊惎鍔ㄦ椂鏌ヨ鎵�鏈夎澶囧苟缂撳瓨鍒皉edis
*/
private void initClients() {
- redisUtil.removeAll(MqttConstant.MQTT_ONLINE_CLIENT);
+ //鍒濆鍖栨椂鍏堝垹闄ゆ墍鏈夊湪绾胯澶�
+ Set keys = redisTemplate.keys( String.format(MqttConstant.MQTT_ONLINE_CLIENT,"*"));
+ if (keys != null && !keys.isEmpty()) {
+ keys.forEach(key -> System.out.println("鍒濆鍖栧垹闄ゅ湪绾胯澶�: " + key));
+ redisTemplate.delete(keys);
+ } else {
+ System.out.println("鍒濆鍖栨棤鍦ㄧ嚎璁惧: " + MqttConstant.MQTT_ONLINE_CLIENT);
+ }
JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS);
//TODO 鏍规嵁emqx杩斿洖缂栧啓瀹炰綋绫�
@@ -180,13 +192,15 @@
item.put("type", info[0]);
item.put("tenantId", info[1]);
//item.put("code", info[2]);
+
+ if (connected) {
+ redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item);
+ }
}catch (Exception e){
e.printStackTrace();
}
- if (connected) {
- redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
- }
+
}
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
index 1143056..068d415 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -6,6 +6,7 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
+import org.apache.poi.ss.formula.functions.T;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -22,324 +23,374 @@
import org.jeecg.modules.dry.entity.DryShop;
import org.jeecg.modules.dry.service.*;
import org.jeecg.modules.dry.vo.DryEquipmentVo;
+import org.jeecg.modules.dry.vo.DryFaultRecordVo;
import org.jeecg.modules.dry.vo.RealTimeDataVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
@Slf4j
@Component
@Scope("prototype")
public class MqttSampleCallback implements MqttCallback {
- @Value(value = "${jeecg.mqtt.role}")
- private String role;
- @Autowired
- private MqttUtil mqttUtil;
- @Autowired
- private EmqxApi emqxApi;
- @Autowired
- private BaseCommonService baseCommonService;
- @Autowired
- private RedisUtil redisUtil;
+ @Value(value = "${jeecg.mqtt.role}")
+ private String role;
+ @Autowired
+ private MqttUtil mqttUtil;
+ @Autowired
+ private EmqxApi emqxApi;
+ @Autowired
+ private BaseCommonService baseCommonService;
+ @Autowired
+ private RedisUtil redisUtil;
- @Autowired
- private IDryRealTimeDataService realTimeDataService;
+ @Autowired
+ private IDryRealTimeDataService realTimeDataService;
- @Autowired
- private IDryEquipmentService equipmentService;
+ @Autowired
+ private IDryEquipmentService equipmentService;
- @Autowired
- private IDryEqpTypeService eqpTypeService;
+ @Autowired
+ private IDryEqpTypeService eqpTypeService;
- @Autowired
- private IDryShopService dryShopService;
+ @Autowired
+ private IDryShopService dryShopService;
- @Autowired
- private IDryFaultRecordService faultRecordService;
+ @Autowired
+ private IDryFaultRecordService faultRecordService;
+ @Override
+ public void connectionLost(Throwable throwable) {
+ System.err.println("杩炴帴鏂紑锛氾細鎺夌嚎");
+ System.err.println("杩炴帴鏂紑锛氾細" + throwable.toString());
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage mqttMessage) {
+ System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�"
+ + new String(mqttMessage.getPayload()));
+
+ switch (role) {
+ // 绠$悊鍛�
+ case "admin":
+ String message = new String(mqttMessage.getPayload());
+ JSONObject messageJson = JSONObject.parseObject(message);
+
+ if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
+ JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
+ if (client == null) {
+ JSONObject item = new JSONObject();
+ //username
+ item.put("username", messageJson.get("username"));
+ //杩炴帴鏃堕棿
+ Long st = messageJson.getLong("connected_at");
+ String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get());
+ item.put("connectedAt", upTime);
+ //clientid
+ String clientid = messageJson.getString("clientid");
+ item.put("clientid", clientid);
+ //鏄惁杩炴帴
+ item.put("connected", true);
+ //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000)
+ try {
+ String[] info = clientid.split("-");
+ item.put("type", info[0]);
+ item.put("tenantId", info[1]);
+ //item.put("code", info[2]);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item);
+ System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
+ }
+
+ }
+ if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
+ try {
+ String clientid = messageJson.getString("clientid");
+ redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]), clientid);
+ System.err.println(String.format("璁惧: %s涓嬬嚎", clientid));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+ parseAdminCommand(topic, mqttMessage);
+
+ break;
+ // 鏅�氱敤鎴�
+ case "user":
+ System.err.println("user");
+ try {
+ parseUserCommand(topic, mqttMessage);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ break;
+
+ }
+
+ }
- @Override
- public void connectionLost(Throwable throwable) {
- System.err.println("杩炴帴鏂紑锛氾細鎺夌嚎");
- System.err.println("杩炴帴鏂紑锛氾細"+throwable.toString());
- }
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+ System.err.println("娑堟伅浼犻�掓垚鍔�");
+ }
- @Override
- public void messageArrived(String topic, MqttMessage mqttMessage) {
- System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�"
- + new String(mqttMessage.getPayload()));
-
- switch (role) {
- // 绠$悊鍛�
- case "admin":
+ // 瑙f瀽admin瑙掕壊鎸囦护
+ private void parseAdminCommand(String topic, MqttMessage mqttMessage) {
String message = new String(mqttMessage.getPayload());
JSONObject messageJson = JSONObject.parseObject(message);
- if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
- JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid"));
- if (client == null) {
- JSONObject item = new JSONObject();
- //username
- item.put("username", messageJson.get("username"));
- //杩炴帴鏃堕棿
- Long st = messageJson.getLong("connected_at");
- String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get());
- item.put("connectedAt", upTime);
- //clientid
- String clientid = messageJson.getString("clientid");
- item.put("clientid", clientid);
- //鏄惁杩炴帴
- item.put("connected", true);
- //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000)
+ //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风)
+ StringBuilder req = new StringBuilder();
+ if (messageJson.containsKey("req")) {
+ req.append(messageJson.get("req"));
+ }
+ //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹�
+ if (messageJson.containsKey("timestamp")) {
+ messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
+ }
+ // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇�
+ if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) {
+ // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
+ }
+
+ switch (topic) {
+ // 鏌ヨ璁惧鍦ㄧ嚎
+ case MqttConstant.MOBILE_QUERY_EQU_STATU:
+ System.err.println("admin鏀跺埌" + topic);
+ // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵��
+ String clientId = messageJson.getString("clientId");
+ JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId);
+
+ ThreadUtil.execute(() -> {
+
+ if (client == null || client.isEmpty()) {
+ JSONObject res = new JSONObject();
+ res.put("success", false);
+ res.put("msg", "鏌ヨ澶辫触");
+ try {
+ MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
+ sendMessage.setQos(0);
+ mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return;
+ }
+
+ client.put("success", true);
+ client.put("msg", "鏌ヨ鎴愬姛");
+ try {
+ MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
+ sendMessage.setQos(0);
+ mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
+ baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+ break;
+
+ // 鎺ユ敹璁惧瀹炴椂鏁版嵁
+ case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
+ ThreadUtil.execute(() -> {
+ try {
+ RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
+ realTimeDataService.realTimeDataHandle(vo);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ break;
+ //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁
+ case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA:
+ MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
+ });
+ //鏁呴殰鏁版嵁
+ Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
+ //绉熸埛id
+ String tentId = realFaultMessage.getTentId();
+ //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis
+ //杞崲涓� Map<String, Object>
+ Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> (Object) entry.getValue()
+ ));
+ redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap);
+ //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
+ if(dryFaultMap.isEmpty()){
+ return;
+ }
+ String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId);
+ //鏁版嵁杞崲
+ List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
+ MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic);
+ //鍙戦�佸箍鎾�
+ System.err.println("骞挎挱缁欙細" + recTopic);
+ sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage);
+
+ break;
+ // 鎺ユ敹璁惧鎶ヨ鏁版嵁
+ case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
+ ThreadUtil.execute(() -> {
+ try {
+ MqMessage<List<DryFaultRecord>> faultMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() {
+ });
+ // List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class);
+ System.err.println(faultMessage.toString());
+ faultRecordService.saveBatch(faultMessage.getData());
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ break;
+
+
+ case MqttConstant.TENANT_UP_PREFIX_EQU:
+ ThreadUtil.execute(() -> {
+ try {
+ Object equObj = messageJson.get("equipment");
+ DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class);
+ TenantContext.setTenant(equipment.getTenantId() + "");
+ DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode());
+ if (dryEquipment == null) {
+ equipmentService.save(equipment);
+ }
+ Object typeObj = messageJson.get("eqpType");
+ DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class);
+ DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId());
+ if (dryEqpType == null) {
+ eqpTypeService.save(eqpType);
+ }
+ // 鑾峰彇璁惧鎵�灞炶溅闂�
+ Object shopObj = messageJson.get("shop");
+ DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class);
+ DryShop dryShop = dryShopService.getById(shop.getId());
+ if (dryShop == null) {
+ dryShopService.save(shop);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ }
+
+ }
+
+
+ // 瑙f瀽user瑙掕壊鎸囦护
+ private void parseUserCommand(String topic, MqttMessage mqttMessage) {
+
+ String message = new String(mqttMessage.getPayload());
+ JSONObject messageJson = JSONObject.parseObject(message);
+
+ //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风)
+ StringBuilder req = new StringBuilder();
+ if (messageJson.containsKey("req")) {
+ req.append(messageJson.get("req"));
+ }
+ //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹�
+ if (messageJson.containsKey("timestamp")) {
+ messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
+ }
+
+ switch (topic) {
+ case MqttConstant.MOBILE_REQ_EQU_CMD:
+ System.err.println("user鏀跺埌" + topic);
+ System.err.println(message);
+ ThreadUtil.execute(() -> {
+ //TODO 鍚慞LC鍙戦�佸紑鍏虫満鎿嶄綔锛屽苟杩斿洖淇℃伅
+ JSONObject res = new JSONObject();
+ res.put("success", true);
+ res.put("msg", "鎿嶄綔鎴愬姛");
+ try {
+ MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes());
+ sendMessage.setQos(0);
+ mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage);
+ baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ });
+
+
+ break;
+ case MqttConstant.SERVICE_REQ_PREFIX:
+ log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰");
+ ThreadUtil.execute(() -> {
+ String tenantId = messageJson.getString("tenantId");
+ String clientId = mqttUtil.getMqttClient().getClientId();
+ String tenant = clientId.substring(clientId.lastIndexOf("_") + 1);
+ if (tenantId != null && tenantId.equals(tenant)) {
+ TenantContext.setTenant(tenantId);
+ // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅
+ String code = messageJson.getString("code");
+ DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId, code);
+ // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅
+ DryShop shop = dryShopService.getById(equipmentVo.getShopId());
+ // 鏍规嵁璁惧绫诲瀷ID鏌ヨ璁惧绫诲瀷淇℃伅
+ DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType());
+
+ JSONObject res = new JSONObject();
+
+ res.put("tenant", tenantId);
+ res.put("equipment", equipmentVo);
+ res.put("shop", shop);
+ res.put("eqpType", eqpType);
+ try {
+ MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
+ sendMessage.setQos(0);
+ mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage);
+ // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+
+ break;
+
+ }
+
+
+
+ }
+
+ /**
+ * 鍙戦�佹秷鎭�
+ * @param topic 璁㈤槄
+ * @param mqMessage 娑堟伅浣�
+ */
+ private void sendMqttMessage(String topic, MqMessage mqMessage){
+ ThreadUtil.execute(() -> {
try {
- String[] info = clientid.split("-");
- item.put("type", info[0]);
- item.put("tenantId", info[1]);
- //item.put("code", info[2]);
+ MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
+ sendMessage.setQos(0);
+ mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
}catch (Exception e){
- e.printStackTrace();
+ e.printStackTrace();
}
- redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
- System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
- }
-
- }
- if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
- try {
- String clientid = messageJson.getString("clientid");
- redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
- System.err.println(String.format("璁惧: %s涓嬬嚎", clientid));
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- parseAdminCommand(topic, mqttMessage);
-
- break;
- // 鏅�氱敤鎴�
- case "user":
- System.err.println("user");
- try {
- parseUserCommand(topic, mqttMessage);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- break;
-
- }
-
- }
-
-
- @Override
- public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
- System.err.println("娑堟伅浼犻�掓垚鍔�");
- }
-
- // 瑙f瀽admin瑙掕壊鎸囦护
- private void parseAdminCommand(String topic, MqttMessage mqttMessage) {
- String message = new String(mqttMessage.getPayload());
- JSONObject messageJson = JSONObject.parseObject(message);
-
- //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风)
- StringBuilder req = new StringBuilder();
- if (messageJson.containsKey("req")) {
- req.append(messageJson.get("req"));
- }
- //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹�
- if (messageJson.containsKey("timestamp")) {
- messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
- }
- // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇�
- if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)){
- baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
- }
-
- switch (topic) {
- // 鏌ヨ璁惧鍦ㄧ嚎
- case MqttConstant.MOBILE_QUERY_EQU_STATU:
- System.err.println("admin鏀跺埌" + topic);
- // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵��
- String clientId = messageJson.getString("clientId");
- JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId);
-
- ThreadUtil.execute(() -> {
-
- if (client == null || client.isEmpty()) {
- JSONObject res = new JSONObject();
- res.put("success", false);
- res.put("msg", "鏌ヨ澶辫触");
- try {
- MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
- sendMessage.setQos(0);
- mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return;
- }
-
- client.put("success", true);
- client.put("msg", "鏌ヨ鎴愬姛");
- try {
- MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
- sendMessage.setQos(0);
- mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
- baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
- } catch (Exception e) {
- e.printStackTrace();
- }
});
- break;
-
- // 鎺ユ敹璁惧瀹炴椂鏁版嵁
- case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
- ThreadUtil.execute(() -> {
- try {
- RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
- realTimeDataService.realTimeDataHandle(vo);
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
- break;
- // 鎺ユ敹璁惧鎶ヨ鏁版嵁
- case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
- ThreadUtil.execute(() -> {
- try {
- MqMessage<List<DryFaultRecord>> listMqMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() {
- });
- // List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class);
- System.err.println(listMqMessage.toString());
- faultRecordService.saveBatch(listMqMessage.getData());
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
- break;
-
-
- case MqttConstant.TENANT_UP_PREFIX_EQU:
- ThreadUtil.execute(() -> {
- try {
- Object equObj = messageJson.get("equipment");
- DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class);
- TenantContext.setTenant(equipment.getTenantId()+"");
- DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode());
- if (dryEquipment == null) {
- equipmentService.save(equipment);
- }
- Object typeObj = messageJson.get("eqpType");
- DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class);
- DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId());
- if (dryEqpType == null) {
- eqpTypeService.save(eqpType);
- }
- // 鑾峰彇璁惧鎵�灞炶溅闂�
- Object shopObj = messageJson.get("shop");
- DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class);
- DryShop dryShop = dryShopService.getById(shop.getId());
- if (dryShop == null) {
- dryShopService.save(shop);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
}
- }
-
-
- // 瑙f瀽user瑙掕壊鎸囦护
- private void parseUserCommand(String topic, MqttMessage mqttMessage) {
-
- String message = new String(mqttMessage.getPayload());
- JSONObject messageJson = JSONObject.parseObject(message);
-
- //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风)
- StringBuilder req = new StringBuilder();
- if (messageJson.containsKey("req")) {
- req.append(messageJson.get("req"));
- }
- //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹�
- if (messageJson.containsKey("timestamp")) {
- messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
- }
-
- switch (topic) {
- case MqttConstant.MOBILE_REQ_EQU_CMD:
- System.err.println("user鏀跺埌" + topic);
- System.err.println(message);
- ThreadUtil.execute(() -> {
- //TODO 鍚慞LC鍙戦�佸紑鍏虫満鎿嶄綔锛屽苟杩斿洖淇℃伅
- JSONObject res = new JSONObject();
- res.put("success", true);
- res.put("msg", "鎿嶄綔鎴愬姛");
- try {
- MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes());
- sendMessage.setQos(0);
- mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage);
- baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- });
-
-
- break;
- case MqttConstant.SERVICE_REQ_PREFIX:
- log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰");
- ThreadUtil.execute(() -> {
- String tenantId = messageJson.getString("tenantId");
- String clientId = mqttUtil.getMqttClient().getClientId();
- String tenant = clientId.substring(clientId.lastIndexOf("_")+1);
- if (tenantId!=null && tenantId.equals(tenant)) {
- TenantContext.setTenant(tenantId);
- // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅
- String code = messageJson.getString("code");
- DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId,code);
- // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅
- DryShop shop = dryShopService.getById(equipmentVo.getShopId());
- // 鏍规嵁璁惧绫诲瀷ID鏌ヨ璁惧绫诲瀷淇℃伅
- DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType());
-
- JSONObject res = new JSONObject();
-
- res.put("tenant", tenantId);
- res.put("equipment", equipmentVo);
- res.put("shop", shop);
- res.put("eqpType", eqpType);
- try {
- MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
- sendMessage.setQos(0);
- mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage);
- // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
-
-
-
- break;
-
- }
-
- }
}
--
Gitblit v1.9.3