From ff34638b445619d83740223514aa4de4a8e9a65f Mon Sep 17 00:00:00 2001 From: bsw215583320 <baoshiwei121@163.com> Date: 星期一, 17 二月 2025 12:45:05 +0800 Subject: [PATCH] feat(mqtt): 增加设备实时数据处理 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 631 ++++++++++++++++++++++++++++++++++----------------------- 1 files changed, 378 insertions(+), 253 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java index e9e23cf..b381df7 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java @@ -2,8 +2,11 @@ import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; +import org.apache.poi.ss.formula.functions.T; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -16,303 +19,425 @@ import org.jeecg.modules.dry.api.EmqxApi; import org.jeecg.modules.dry.entity.DryEqpType; import org.jeecg.modules.dry.entity.DryEquipment; +import org.jeecg.modules.dry.entity.DryFaultRecord; import org.jeecg.modules.dry.entity.DryShop; -import org.jeecg.modules.dry.service.IDryEqpTypeService; -import org.jeecg.modules.dry.service.IDryEquipmentService; -import org.jeecg.modules.dry.service.IDryRealTimeDataService; -import org.jeecg.modules.dry.service.IDryShopService; +import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.vo.DryEquipmentVo; +import org.jeecg.modules.dry.vo.DryFaultRecordVo; +import org.jeecg.modules.dry.vo.RealTimeDataParentVo; import org.jeecg.modules.dry.vo.RealTimeDataVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + @Slf4j @Component @Scope("prototype") public class MqttSampleCallback implements MqttCallback { - @Value(value = "${jeecg.mqtt.role}") - private String role; - @Autowired - private MqttUtil mqttUtil; - @Autowired - private EmqxApi emqxApi; - @Autowired - private BaseCommonService baseCommonService; - @Autowired - private RedisUtil redisUtil; + @Value(value = "${jeecg.mqtt.role}") + private String role; + @Autowired + private MqttUtil mqttUtil; + @Autowired + private EmqxApi emqxApi; + @Autowired + private BaseCommonService baseCommonService; + @Autowired + private RedisUtil redisUtil; - @Autowired - private IDryRealTimeDataService realTimeDataService; + @Autowired + private IDryRealTimeDataService realTimeDataService; - @Autowired - private IDryEquipmentService equipmentService; + @Autowired + private IDryEquipmentService equipmentService; - @Autowired - private IDryEqpTypeService eqpTypeService; + @Autowired + private IDryEqpTypeService eqpTypeService; - @Autowired - private IDryShopService dryShopService; + @Autowired + private IDryShopService dryShopService; + + @Autowired + private IDryFaultRecordService faultRecordService; + @Override + public void connectionLost(Throwable throwable) { + System.err.println("杩炴帴鏂紑锛氾細鎺夌嚎"); + System.err.println("杩炴帴鏂紑锛氾細" + throwable.toString()); + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" + + new String(mqttMessage.getPayload())); + + switch (role) { + // 绠$悊鍛� + case "admin": + String message = new String(mqttMessage.getPayload()); + JSONObject messageJson = JSONObject.parseObject(message); + + if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { + JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); + if (client == null) { + JSONObject item = new JSONObject(); + //username + item.put("username", messageJson.get("username")); + //杩炴帴鏃堕棿 + Long st = messageJson.getLong("connected_at"); + String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get()); + item.put("connectedAt", upTime); + //clientid + String clientid = messageJson.getString("clientid"); + item.put("clientid", clientid); + //鏄惁杩炴帴 + item.put("connected", true); + //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000) + try { + String[] info = clientid.split("-"); + item.put("type", info[0]); + item.put("tenantId", info[1]); + //item.put("code", info[2]); + } catch (Exception e) { + e.printStackTrace(); + } + redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); + System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); + } + + } + if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { + try { + String clientid = messageJson.getString("clientid"); + redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid); + System.err.println(String.format("璁惧: %s涓嬬嚎", clientid)); + } catch (Exception e) { + e.printStackTrace(); + } + + } + parseAdminCommand(topic, mqttMessage); + + break; + // 鏅�氱敤鎴� + case "user": + System.err.println("user"); + try { + parseUserCommand(topic, mqttMessage); + } catch (Exception e) { + e.printStackTrace(); + } + + break; + + } + + } - @Override - public void connectionLost(Throwable throwable) { - System.err.println("杩炴帴鏂紑锛氾細鎺夌嚎"); - } + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + System.err.println("娑堟伅浼犻�掓垚鍔�"); + } - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { - System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" - + new String(mqttMessage.getPayload())); - - switch (role) { - // 绠$悊鍛� - case "admin": + // 瑙f瀽admin瑙掕壊鎸囦护 + private void parseAdminCommand(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); - if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { - JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid")); - if (client == null) { - JSONObject item = new JSONObject(); - //username - item.put("username", messageJson.get("username")); - //杩炴帴鏃堕棿 - Long st = messageJson.getLong("connected_at"); - String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get()); - item.put("connectedAt", upTime); - //clientid - String clientid = messageJson.getString("clientid"); - item.put("clientid", clientid); - //鏄惁杩炴帴 - item.put("connected", true); - // - String[] info = clientid.split("-"); - item.put("type", info[0]); - item.put("tenantId", info[1]); - item.put("code", info[2]); - - redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); - System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); - } - + //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风) + StringBuilder req = new StringBuilder(); + if (messageJson.containsKey("req")) { + req.append(messageJson.get("req")); } - if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { - try { - String clientid = messageJson.getString("clientid"); - redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid); - System.err.println(String.format("璁惧: %s涓嬬嚎", clientid)); - } catch (Exception e) { - e.printStackTrace(); - } - + //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹� + if (messageJson.containsKey("timestamp")) { + messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); } - parseAdminCommand(topic, mqttMessage); - - break; - // 鏅�氱敤鎴� - case "user": - System.err.println("user"); - try { - parseUserCommand(topic, mqttMessage); - } catch (Exception e) { - e.printStackTrace(); + // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇� + if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) { + // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); } - break; + switch (topic) { + // 鏌ヨ璁惧鍦ㄧ嚎 + case MqttConstant.MOBILE_QUERY_EQU_STATU: + System.err.println("admin鏀跺埌" + topic); + // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵�� + String clientId = messageJson.getString("clientId"); + JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId); + + ThreadUtil.execute(() -> { + + if (client == null || client.isEmpty()) { + JSONObject res = new JSONObject(); + res.put("success", false); + res.put("msg", "鏌ヨ澶辫触"); + try { + MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); + } catch (Exception e) { + e.printStackTrace(); + } + return; + } + + client.put("success", true); + client.put("msg", "鏌ヨ鎴愬姛"); + try { + MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); + baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); + } catch (Exception e) { + e.printStackTrace(); + } + }); + break; + + // 鎺ユ敹璁惧瀹炴椂鏁版嵁 + case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: + ThreadUtil.execute(() -> { + try { + RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); + realTimeDataService.realTimeDataHandle(vo); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + break; + // 鎺ユ敹璁惧瀹炴椂鏁版嵁-鏈哄彴 + case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP: + ThreadUtil.execute(() -> { + try { + RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class); + realTimeDataService.realTimeDataHandle(vo); + } catch (Exception e) { + e.printStackTrace(); + } + }); + break; + //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 + case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: { + + MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { + }); + //鏁呴殰鏁版嵁 + Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); + //绉熸埛id + String tenantId = realFaultMessage.getTentId(); + //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis + //杞崲涓� Map<String, Object> + Map<String, Object> objectMap = dryFaultMap.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> (Object) entry.getValue() + )); + redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); + //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 + if (dryFaultMap.isEmpty()) { + return; + } + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); + //鏁版嵁杞崲 + List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); + MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); + //鍙戦�佸箍鎾� + System.err.println("骞挎挱缁欙細" + recTopic); + sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); + } + break; + //绉诲姩绔富鍔ㄨ姹傝澶囧疄鏃舵晠闅滄暟鎹紙鐢ㄤ簬椤甸潰鍒氭墦寮�鏃舵媺鍙栦竴娆℃暟鎹級 + case MqttConstant.MOBILE_REQ_EQU_REAL_FAULT: { + String tenantId = (String) messageJson.get("tenantId"); + if (req.toString().isEmpty() || tenantId == null) { + return; + } + Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId)); + //杞崲涓� Map<String, DryFaultRecordVo> + Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream() + .collect(Collectors.toMap( + entry -> entry.getKey().toString(), + entry -> (DryFaultRecordVo) entry.getValue() + )); + if (dryFaultMap.isEmpty()) { + return; + } + String resTopic = String.format(MqttConstant.SERVICE_ONECE_TENANT_REAL_FAULT, req); + //鏁版嵁杞崲 + List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); + MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, resTopic); + //鍙戦�佽姹傝澶� + System.err.println("鍙戦�佺粰锛�" + resTopic); + sendMqttMessage(resTopic, mqMessage, 2); + + } + break; + // 鎺ユ敹璁惧鎶ヨ鍘嗗彶鏁版嵁 + case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: + ThreadUtil.execute(() -> { + try { + MqMessage<List<DryFaultRecord>> faultMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() { + }); + // List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); + System.err.println(faultMessage.toString()); + faultRecordService.saveBatch(faultMessage.getData()); + + } catch (Exception e) { + e.printStackTrace(); + } + }); + + break; + + + case MqttConstant.TENANT_UP_PREFIX_EQU: + ThreadUtil.execute(() -> { + try { + Object equObj = messageJson.get("equipment"); + DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class); + TenantContext.setTenant(equipment.getTenantId() + ""); + DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); + if (dryEquipment == null) { + equipmentService.save(equipment); + } + Object typeObj = messageJson.get("eqpType"); + DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class); + DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId()); + if (dryEqpType == null) { + eqpTypeService.save(eqpType); + } + // 鑾峰彇璁惧鎵�灞炶溅闂� + Object shopObj = messageJson.get("shop"); + DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class); + DryShop dryShop = dryShopService.getById(shop.getId()); + if (dryShop == null) { + dryShopService.save(shop); + } + } catch (Exception e) { + e.printStackTrace(); + } + }); + + } } - } + + // 瑙f瀽user瑙掕壊鎸囦护 + private void parseUserCommand(String topic, MqttMessage mqttMessage) { + + String message = new String(mqttMessage.getPayload()); + JSONObject messageJson = JSONObject.parseObject(message); + + //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风) + StringBuilder req = new StringBuilder(); + if (messageJson.containsKey("req")) { + req.append(messageJson.get("req")); + } + //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹� + if (messageJson.containsKey("timestamp")) { + messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); + } + + switch (topic) { + case MqttConstant.MOBILE_REQ_EQU_CMD: + System.err.println("user鏀跺埌" + topic); + System.err.println(message); + ThreadUtil.execute(() -> { + //TODO 鍚慞LC鍙戦�佸紑鍏虫満鎿嶄綔锛屽苟杩斿洖淇℃伅 + JSONObject res = new JSONObject(); + res.put("success", true); + res.put("msg", "鎿嶄綔鎴愬姛"); + try { + MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage); + baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); + } catch (Exception e) { + e.printStackTrace(); + } + + }); - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - System.err.println("娑堟伅浼犻�掓垚鍔�"); - } + break; + case MqttConstant.SERVICE_REQ_PREFIX: + log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰"); + ThreadUtil.execute(() -> { + String tenantId = messageJson.getString("tenantId"); + String clientId = mqttUtil.getMqttClient().getClientId(); + String tenant = clientId.substring(clientId.lastIndexOf("_") + 1); + if (tenantId != null && tenantId.equals(tenant)) { + TenantContext.setTenant(tenantId); + // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅 + String code = messageJson.getString("code"); + DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId, code); + // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅 + DryShop shop = dryShopService.getById(equipmentVo.getShopId()); + // 鏍规嵁璁惧绫诲瀷ID鏌ヨ璁惧绫诲瀷淇℃伅 + DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType()); - // 瑙f瀽admin瑙掕壊鎸囦护 - private void parseAdminCommand(String topic, MqttMessage mqttMessage) { - String message = new String(mqttMessage.getPayload()); - JSONObject messageJson = JSONObject.parseObject(message); + JSONObject res = new JSONObject(); - //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风) - StringBuilder req = new StringBuilder(); - if (messageJson.containsKey("req")) { - req.append(messageJson.get("req")); - } - //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹� - if (messageJson.containsKey("timestamp")) { - messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); - } - // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇� - if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA)){ - baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); + res.put("tenant", tenantId); + res.put("equipment", equipmentVo); + res.put("shop", shop); + res.put("eqpType", eqpType); + try { + MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); + // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + + break; + + } + + } - switch (topic) { - // 鏌ヨ璁惧鍦ㄧ嚎 - case MqttConstant.MOBILE_QUERY_EQU_STATU: - System.err.println("admin鏀跺埌" + topic); - // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵�� - String clientId = messageJson.getString("clientId"); - JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId); - + /** + * 鍙戦�佹秷鎭� + * + * @param topic 璁㈤槄 + * @param mqMessage 娑堟伅浣� + * @param type 1-鍙戦�佺粰绉熸埛 2-鍙戦�佺粰鍥哄畾id + */ + private void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) { ThreadUtil.execute(() -> { - - if (client == null || client.isEmpty()) { - JSONObject res = new JSONObject(); - res.put("success", false); - res.put("msg", "鏌ヨ澶辫触"); try { - MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); + if (type == 1) { + MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage); + } else if (type == 2) { + MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(topic, sendMessage); + } + } catch (Exception e) { - e.printStackTrace(); + e.printStackTrace(); } - return; - } - - client.put("success", true); - client.put("msg", "鏌ヨ鎴愬姛"); - try { - MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); - baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); - } catch (Exception e) { - e.printStackTrace(); - } }); - break; - - // 鎺ユ敹璁惧瀹炴椂鏁版嵁 - case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: - ThreadUtil.execute(() -> { - try { - RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); - realTimeDataService.realTimeDataHandle(vo); - } catch (Exception e) { - e.printStackTrace(); - } - }); - - break; - case MqttConstant.TENANT_UP_PREFIX_EQU: - ThreadUtil.execute(() -> { - try { - Object equObj = messageJson.get("equipment"); - DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class); - TenantContext.setTenant(equipment.getTenantId()+""); - DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); - if (dryEquipment == null) { - equipmentService.save(equipment); - } - Object typeObj = messageJson.get("eqpType"); - DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class); - DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId()); - if (dryEqpType == null) { - eqpTypeService.save(eqpType); - } - // 鑾峰彇璁惧鎵�灞炶溅闂� - Object shopObj = messageJson.get("shop"); - DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class); - DryShop dryShop = dryShopService.getById(shop.getId()); - if (dryShop == null) { - dryShopService.save(shop); - } - } catch (Exception e) { - e.printStackTrace(); - } - }); - } - } - - - // 瑙f瀽user瑙掕壊鎸囦护 - private void parseUserCommand(String topic, MqttMessage mqttMessage) { - - String message = new String(mqttMessage.getPayload()); - JSONObject messageJson = JSONObject.parseObject(message); - - //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风) - StringBuilder req = new StringBuilder(); - if (messageJson.containsKey("req")) { - req.append(messageJson.get("req")); - } - //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹� - if (messageJson.containsKey("timestamp")) { - messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); - } - - switch (topic) { - case MqttConstant.MOBILE_REQ_EQU_CMD: - System.err.println("user鏀跺埌" + topic); - System.err.println(message); - ThreadUtil.execute(() -> { - //TODO 鍚慞LC鍙戦�佸紑鍏虫満鎿嶄綔锛屽苟杩斿洖淇℃伅 - JSONObject res = new JSONObject(); - res.put("success", true); - res.put("msg", "鎿嶄綔鎴愬姛"); - try { - MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage); - baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); - } catch (Exception e) { - e.printStackTrace(); - } - - }); - - - break; - case MqttConstant.SERVICE_REQ_PREFIX: - log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰"); - ThreadUtil.execute(() -> { - String tenantId = messageJson.getString("tenantId"); - String clientId = mqttUtil.getMqttClient().getClientId(); - String tenant = clientId.substring(clientId.lastIndexOf("_")+1); - if (tenantId!=null && tenantId.equals(tenant)) { - TenantContext.setTenant(tenantId); - // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅 - String code = messageJson.getString("code"); - DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId,code); - // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅 - DryShop shop = dryShopService.getById(equipmentVo.getShopId()); - // 鏍规嵁璁惧绫诲瀷ID鏌ヨ璁惧绫诲瀷淇℃伅 - DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType()); - - JSONObject res = new JSONObject(); - - res.put("tenant", tenantId); - res.put("equipment", equipmentVo); - res.put("shop", shop); - res.put("eqpType", eqpType); - try { - MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); - // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - - - - break; - - } - - } } -- Gitblit v1.9.3