From 320c0c10a90140627b10a6fcf498e79d09785da6 Mon Sep 17 00:00:00 2001 From: zhuguifei <312353457@qq.com> Date: 星期三, 27 十一月 2024 13:42:58 +0800 Subject: [PATCH] 添加mqtt数据接口 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 625 ++++++++++++++++++++++++++++++-------------------------- 1 files changed, 338 insertions(+), 287 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java index 1143056..068d415 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java @@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; +import org.apache.poi.ss.formula.functions.T; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -22,324 +23,374 @@ import org.jeecg.modules.dry.entity.DryShop; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.vo.DryEquipmentVo; +import org.jeecg.modules.dry.vo.DryFaultRecordVo; import org.jeecg.modules.dry.vo.RealTimeDataVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; @Slf4j @Component @Scope("prototype") public class MqttSampleCallback implements MqttCallback { - @Value(value = "${jeecg.mqtt.role}") - private String role; - @Autowired - private MqttUtil mqttUtil; - @Autowired - private EmqxApi emqxApi; - @Autowired - private BaseCommonService baseCommonService; - @Autowired - private RedisUtil redisUtil; + @Value(value = "${jeecg.mqtt.role}") + private String role; + @Autowired + private MqttUtil mqttUtil; + @Autowired + private EmqxApi emqxApi; + @Autowired + private BaseCommonService baseCommonService; + @Autowired + private RedisUtil redisUtil; - @Autowired - private IDryRealTimeDataService realTimeDataService; + @Autowired + private IDryRealTimeDataService realTimeDataService; - @Autowired - private IDryEquipmentService equipmentService; + @Autowired + private IDryEquipmentService equipmentService; - @Autowired - private IDryEqpTypeService eqpTypeService; + @Autowired + private IDryEqpTypeService eqpTypeService; - @Autowired - private IDryShopService dryShopService; + @Autowired + private IDryShopService dryShopService; - @Autowired - private IDryFaultRecordService faultRecordService; + @Autowired + private IDryFaultRecordService faultRecordService; + @Override + public void connectionLost(Throwable throwable) { + System.err.println("杩炴帴鏂紑锛氾細鎺夌嚎"); + System.err.println("杩炴帴鏂紑锛氾細" + throwable.toString()); + } + + @Override + public void messageArrived(String topic, MqttMessage mqttMessage) { + System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" + + new String(mqttMessage.getPayload())); + + switch (role) { + // 绠$悊鍛� + case "admin": + String message = new String(mqttMessage.getPayload()); + JSONObject messageJson = JSONObject.parseObject(message); + + if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { + JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); + if (client == null) { + JSONObject item = new JSONObject(); + //username + item.put("username", messageJson.get("username")); + //杩炴帴鏃堕棿 + Long st = messageJson.getLong("connected_at"); + String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get()); + item.put("connectedAt", upTime); + //clientid + String clientid = messageJson.getString("clientid"); + item.put("clientid", clientid); + //鏄惁杩炴帴 + item.put("connected", true); + //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000) + try { + String[] info = clientid.split("-"); + item.put("type", info[0]); + item.put("tenantId", info[1]); + //item.put("code", info[2]); + } catch (Exception e) { + e.printStackTrace(); + } + redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item); + System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); + } + + } + if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { + try { + String clientid = messageJson.getString("clientid"); + redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]), clientid); + System.err.println(String.format("璁惧: %s涓嬬嚎", clientid)); + } catch (Exception e) { + e.printStackTrace(); + } + + } + parseAdminCommand(topic, mqttMessage); + + break; + // 鏅�氱敤鎴� + case "user": + System.err.println("user"); + try { + parseUserCommand(topic, mqttMessage); + } catch (Exception e) { + e.printStackTrace(); + } + + break; + + } + + } - @Override - public void connectionLost(Throwable throwable) { - System.err.println("杩炴帴鏂紑锛氾細鎺夌嚎"); - System.err.println("杩炴帴鏂紑锛氾細"+throwable.toString()); - } + @Override + public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { + System.err.println("娑堟伅浼犻�掓垚鍔�"); + } - @Override - public void messageArrived(String topic, MqttMessage mqttMessage) { - System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" - + new String(mqttMessage.getPayload())); - - switch (role) { - // 绠$悊鍛� - case "admin": + // 瑙f瀽admin瑙掕壊鎸囦护 + private void parseAdminCommand(String topic, MqttMessage mqttMessage) { String message = new String(mqttMessage.getPayload()); JSONObject messageJson = JSONObject.parseObject(message); - if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { - JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid")); - if (client == null) { - JSONObject item = new JSONObject(); - //username - item.put("username", messageJson.get("username")); - //杩炴帴鏃堕棿 - Long st = messageJson.getLong("connected_at"); - String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get()); - item.put("connectedAt", upTime); - //clientid - String clientid = messageJson.getString("clientid"); - item.put("clientid", clientid); - //鏄惁杩炴帴 - item.put("connected", true); - //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000) + //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风) + StringBuilder req = new StringBuilder(); + if (messageJson.containsKey("req")) { + req.append(messageJson.get("req")); + } + //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹� + if (messageJson.containsKey("timestamp")) { + messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); + } + // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇� + if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) { + // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); + } + + switch (topic) { + // 鏌ヨ璁惧鍦ㄧ嚎 + case MqttConstant.MOBILE_QUERY_EQU_STATU: + System.err.println("admin鏀跺埌" + topic); + // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵�� + String clientId = messageJson.getString("clientId"); + JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId); + + ThreadUtil.execute(() -> { + + if (client == null || client.isEmpty()) { + JSONObject res = new JSONObject(); + res.put("success", false); + res.put("msg", "鏌ヨ澶辫触"); + try { + MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); + } catch (Exception e) { + e.printStackTrace(); + } + return; + } + + client.put("success", true); + client.put("msg", "鏌ヨ鎴愬姛"); + try { + MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); + baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); + } catch (Exception e) { + e.printStackTrace(); + } + }); + break; + + // 鎺ユ敹璁惧瀹炴椂鏁版嵁 + case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: + ThreadUtil.execute(() -> { + try { + RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); + realTimeDataService.realTimeDataHandle(vo); + } catch (Exception e) { + e.printStackTrace(); + } + }); + + break; + //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 + case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: + MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { + }); + //鏁呴殰鏁版嵁 + Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); + //绉熸埛id + String tentId = realFaultMessage.getTentId(); + //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis + //杞崲涓� Map<String, Object> + Map<String, Object> objectMap = dryFaultMap.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> (Object) entry.getValue() + )); + redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap); + //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 + if(dryFaultMap.isEmpty()){ + return; + } + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId); + //鏁版嵁杞崲 + List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); + MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic); + //鍙戦�佸箍鎾� + System.err.println("骞挎挱缁欙細" + recTopic); + sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage); + + break; + // 鎺ユ敹璁惧鎶ヨ鏁版嵁 + case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: + ThreadUtil.execute(() -> { + try { + MqMessage<List<DryFaultRecord>> faultMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() { + }); + // List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); + System.err.println(faultMessage.toString()); + faultRecordService.saveBatch(faultMessage.getData()); + + } catch (Exception e) { + e.printStackTrace(); + } + }); + + break; + + + case MqttConstant.TENANT_UP_PREFIX_EQU: + ThreadUtil.execute(() -> { + try { + Object equObj = messageJson.get("equipment"); + DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class); + TenantContext.setTenant(equipment.getTenantId() + ""); + DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); + if (dryEquipment == null) { + equipmentService.save(equipment); + } + Object typeObj = messageJson.get("eqpType"); + DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class); + DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId()); + if (dryEqpType == null) { + eqpTypeService.save(eqpType); + } + // 鑾峰彇璁惧鎵�灞炶溅闂� + Object shopObj = messageJson.get("shop"); + DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class); + DryShop dryShop = dryShopService.getById(shop.getId()); + if (dryShop == null) { + dryShopService.save(shop); + } + } catch (Exception e) { + e.printStackTrace(); + } + }); + + } + + } + + + // 瑙f瀽user瑙掕壊鎸囦护 + private void parseUserCommand(String topic, MqttMessage mqttMessage) { + + String message = new String(mqttMessage.getPayload()); + JSONObject messageJson = JSONObject.parseObject(message); + + //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风) + StringBuilder req = new StringBuilder(); + if (messageJson.containsKey("req")) { + req.append(messageJson.get("req")); + } + //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹� + if (messageJson.containsKey("timestamp")) { + messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); + } + + switch (topic) { + case MqttConstant.MOBILE_REQ_EQU_CMD: + System.err.println("user鏀跺埌" + topic); + System.err.println(message); + ThreadUtil.execute(() -> { + //TODO 鍚慞LC鍙戦�佸紑鍏虫満鎿嶄綔锛屽苟杩斿洖淇℃伅 + JSONObject res = new JSONObject(); + res.put("success", true); + res.put("msg", "鎿嶄綔鎴愬姛"); + try { + MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage); + baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); + } catch (Exception e) { + e.printStackTrace(); + } + + }); + + + break; + case MqttConstant.SERVICE_REQ_PREFIX: + log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰"); + ThreadUtil.execute(() -> { + String tenantId = messageJson.getString("tenantId"); + String clientId = mqttUtil.getMqttClient().getClientId(); + String tenant = clientId.substring(clientId.lastIndexOf("_") + 1); + if (tenantId != null && tenantId.equals(tenant)) { + TenantContext.setTenant(tenantId); + // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅 + String code = messageJson.getString("code"); + DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId, code); + // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅 + DryShop shop = dryShopService.getById(equipmentVo.getShopId()); + // 鏍规嵁璁惧绫诲瀷ID鏌ヨ璁惧绫诲瀷淇℃伅 + DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType()); + + JSONObject res = new JSONObject(); + + res.put("tenant", tenantId); + res.put("equipment", equipmentVo); + res.put("shop", shop); + res.put("eqpType", eqpType); + try { + MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); + // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + + break; + + } + + + + } + + /** + * 鍙戦�佹秷鎭� + * @param topic 璁㈤槄 + * @param mqMessage 娑堟伅浣� + */ + private void sendMqttMessage(String topic, MqMessage mqMessage){ + ThreadUtil.execute(() -> { try { - String[] info = clientid.split("-"); - item.put("type", info[0]); - item.put("tenantId", info[1]); - //item.put("code", info[2]); + MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage); }catch (Exception e){ - e.printStackTrace(); + e.printStackTrace(); } - redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); - System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); - } - - } - if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { - try { - String clientid = messageJson.getString("clientid"); - redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid); - System.err.println(String.format("璁惧: %s涓嬬嚎", clientid)); - } catch (Exception e) { - e.printStackTrace(); - } - - } - parseAdminCommand(topic, mqttMessage); - - break; - // 鏅�氱敤鎴� - case "user": - System.err.println("user"); - try { - parseUserCommand(topic, mqttMessage); - } catch (Exception e) { - e.printStackTrace(); - } - - break; - - } - - } - - - @Override - public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { - System.err.println("娑堟伅浼犻�掓垚鍔�"); - } - - // 瑙f瀽admin瑙掕壊鎸囦护 - private void parseAdminCommand(String topic, MqttMessage mqttMessage) { - String message = new String(mqttMessage.getPayload()); - JSONObject messageJson = JSONObject.parseObject(message); - - //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风) - StringBuilder req = new StringBuilder(); - if (messageJson.containsKey("req")) { - req.append(messageJson.get("req")); - } - //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹� - if (messageJson.containsKey("timestamp")) { - messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); - } - // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇� - if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)){ - baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); - } - - switch (topic) { - // 鏌ヨ璁惧鍦ㄧ嚎 - case MqttConstant.MOBILE_QUERY_EQU_STATU: - System.err.println("admin鏀跺埌" + topic); - // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵�� - String clientId = messageJson.getString("clientId"); - JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId); - - ThreadUtil.execute(() -> { - - if (client == null || client.isEmpty()) { - JSONObject res = new JSONObject(); - res.put("success", false); - res.put("msg", "鏌ヨ澶辫触"); - try { - MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); - } catch (Exception e) { - e.printStackTrace(); - } - return; - } - - client.put("success", true); - client.put("msg", "鏌ヨ鎴愬姛"); - try { - MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); - baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); - } catch (Exception e) { - e.printStackTrace(); - } }); - break; - - // 鎺ユ敹璁惧瀹炴椂鏁版嵁 - case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: - ThreadUtil.execute(() -> { - try { - RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); - realTimeDataService.realTimeDataHandle(vo); - } catch (Exception e) { - e.printStackTrace(); - } - }); - - break; - // 鎺ユ敹璁惧鎶ヨ鏁版嵁 - case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: - ThreadUtil.execute(() -> { - try { - MqMessage<List<DryFaultRecord>> listMqMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() { - }); - // List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); - System.err.println(listMqMessage.toString()); - faultRecordService.saveBatch(listMqMessage.getData()); - - } catch (Exception e) { - e.printStackTrace(); - } - }); - - break; - - - case MqttConstant.TENANT_UP_PREFIX_EQU: - ThreadUtil.execute(() -> { - try { - Object equObj = messageJson.get("equipment"); - DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class); - TenantContext.setTenant(equipment.getTenantId()+""); - DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); - if (dryEquipment == null) { - equipmentService.save(equipment); - } - Object typeObj = messageJson.get("eqpType"); - DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class); - DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId()); - if (dryEqpType == null) { - eqpTypeService.save(eqpType); - } - // 鑾峰彇璁惧鎵�灞炶溅闂� - Object shopObj = messageJson.get("shop"); - DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class); - DryShop dryShop = dryShopService.getById(shop.getId()); - if (dryShop == null) { - dryShopService.save(shop); - } - } catch (Exception e) { - e.printStackTrace(); - } - }); - } - } - - - // 瑙f瀽user瑙掕壊鎸囦护 - private void parseUserCommand(String topic, MqttMessage mqttMessage) { - - String message = new String(mqttMessage.getPayload()); - JSONObject messageJson = JSONObject.parseObject(message); - - //璇锋眰鐨勫鎴风(鏈嶅姟绔彧鎺ㄩ�佹暟鎹埌璇锋眰鐨勫鎴风) - StringBuilder req = new StringBuilder(); - if (messageJson.containsKey("req")) { - req.append(messageJson.get("req")); - } - //鍓嶇浼犲弬鏃堕棿鎴宠浆鎹� - if (messageJson.containsKey("timestamp")) { - messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); - } - - switch (topic) { - case MqttConstant.MOBILE_REQ_EQU_CMD: - System.err.println("user鏀跺埌" + topic); - System.err.println(message); - ThreadUtil.execute(() -> { - //TODO 鍚慞LC鍙戦�佸紑鍏虫満鎿嶄綔锛屽苟杩斿洖淇℃伅 - JSONObject res = new JSONObject(); - res.put("success", true); - res.put("msg", "鎿嶄綔鎴愬姛"); - try { - MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage); - baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); - } catch (Exception e) { - e.printStackTrace(); - } - - }); - - - break; - case MqttConstant.SERVICE_REQ_PREFIX: - log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰"); - ThreadUtil.execute(() -> { - String tenantId = messageJson.getString("tenantId"); - String clientId = mqttUtil.getMqttClient().getClientId(); - String tenant = clientId.substring(clientId.lastIndexOf("_")+1); - if (tenantId!=null && tenantId.equals(tenant)) { - TenantContext.setTenant(tenantId); - // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅 - String code = messageJson.getString("code"); - DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId,code); - // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅 - DryShop shop = dryShopService.getById(equipmentVo.getShopId()); - // 鏍规嵁璁惧绫诲瀷ID鏌ヨ璁惧绫诲瀷淇℃伅 - DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType()); - - JSONObject res = new JSONObject(); - - res.put("tenant", tenantId); - res.put("equipment", equipmentVo); - res.put("shop", shop); - res.put("eqpType", eqpType); - try { - MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); - // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - - - - break; - - } - - } } -- Gitblit v1.9.3