From 765f87b17ff1709d5307a6c01bbc40bcfd1c4d48 Mon Sep 17 00:00:00 2001 From: bsw215583320 <baoshiwei121@163.com> Date: 星期一, 18 十一月 2024 16:58:41 +0800 Subject: [PATCH] Merge remote-tracking branch 'origin/herb' into herb --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 164 +++++++++++++++++++++++++++++++++++++----------------- 1 files changed, 112 insertions(+), 52 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java index cb5861d..1efee35 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java @@ -2,7 +2,9 @@ import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; @@ -14,17 +16,19 @@ import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.service.BaseCommonService; import org.jeecg.modules.dry.api.EmqxApi; +import org.jeecg.modules.dry.entity.DryEqpType; import org.jeecg.modules.dry.entity.DryEquipment; +import org.jeecg.modules.dry.entity.DryFaultRecord; import org.jeecg.modules.dry.entity.DryShop; -import org.jeecg.modules.dry.service.IDryEquipmentService; -import org.jeecg.modules.dry.service.IDryRealTimeDataService; -import org.jeecg.modules.dry.service.IDryShopService; +import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.vo.DryEquipmentVo; import org.jeecg.modules.dry.vo.RealTimeDataVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; + +import java.util.List; @Slf4j @Component @@ -49,7 +53,13 @@ private IDryEquipmentService equipmentService; @Autowired + private IDryEqpTypeService eqpTypeService; + + @Autowired private IDryShopService dryShopService; + + @Autowired + private IDryFaultRecordService faultRecordService; @@ -57,10 +67,11 @@ @Override public void connectionLost(Throwable throwable) { System.err.println("杩炴帴鏂紑锛氾細鎺夌嚎"); + System.err.println("杩炴帴鏂紑锛氾細"+throwable.toString()); } @Override - public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { + public void messageArrived(String topic, MqttMessage mqttMessage) { System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" + new String(mqttMessage.getPayload())); @@ -85,21 +96,29 @@ item.put("clientid", clientid); //鏄惁杩炴帴 item.put("connected", true); - // - String[] info = clientid.split("-"); - item.put("type", info[0]); - item.put("tenantId", info[1]); - item.put("code", info[2]); - + //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000) + try { + String[] info = clientid.split("-"); + item.put("type", info[0]); + item.put("tenantId", info[1]); + //item.put("code", info[2]); + }catch (Exception e){ + e.printStackTrace(); + } redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); } } if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { - String clientid = messageJson.getString("clientid"); - redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid); - System.err.println(String.format("璁惧: %s涓嬬嚎", clientid)); + try { + String clientid = messageJson.getString("clientid"); + redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid); + System.err.println(String.format("璁惧: %s涓嬬嚎", clientid)); + } catch (Exception e) { + e.printStackTrace(); + } + } parseAdminCommand(topic, mqttMessage); @@ -140,7 +159,7 @@ messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); } // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇� - if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA)){ + if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)){ baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); } @@ -183,34 +202,62 @@ // 鎺ユ敹璁惧瀹炴椂鏁版嵁 case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: - try { - RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); - realTimeDataService.realTimeDataHandle(vo); - } catch (Exception e) { - e.printStackTrace(); - } + ThreadUtil.execute(() -> { + try { + RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); + realTimeDataService.realTimeDataHandle(vo); + } catch (Exception e) { + e.printStackTrace(); + } + }); break; - case MqttConstant.TENANT_UP_PREFIX_EQU: - try { - DryEquipment equipment = (DryEquipment) messageJson.get("equipment"); + // 鎺ユ敹璁惧鎶ヨ鏁版嵁 + case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: + ThreadUtil.execute(() -> { + try { - DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); + JSONObject jsonObject = JSON.parseObject(message); + List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); + System.err.println(faultRecords.toString()); + faultRecordService.saveBatch(faultRecords); - if (dryEquipment == null) { - equipmentService.save(equipment); + } catch (Exception e) { + e.printStackTrace(); } + }); + + break; + case MqttConstant.TENANT_UP_PREFIX_EQU: + ThreadUtil.execute(() -> { + try { + Object equObj = messageJson.get("equipment"); + DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class); + TenantContext.setTenant(equipment.getTenantId()+""); + DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); + if (dryEquipment == null) { + equipmentService.save(equipment); + } + Object typeObj = messageJson.get("eqpType"); + DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class); + DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId()); + if (dryEqpType == null) { + eqpTypeService.save(eqpType); + } + // 鑾峰彇璁惧鎵�灞炶溅闂� + Object shopObj = messageJson.get("shop"); + DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class); + DryShop dryShop = dryShopService.getById(shop.getId()); + if (dryShop == null) { + dryShopService.save(shop); + } + } catch (Exception e) { + e.printStackTrace(); + } + }); - // 鑾峰彇璁惧鎵�灞炶溅闂� - DryShop shop = (DryShop) messageJson.get("shop"); - shop.setTenantId(equipment.getTenantId()); - dryShopService.save(shop); - - } catch (Exception e) { - e.printStackTrace(); - } } } @@ -254,27 +301,40 @@ break; - case MqttConstant.SERVICE_REQ_EQU_TOPIC: + case MqttConstant.SERVICE_REQ_PREFIX: log.debug("鏀跺埌璁惧璇︾粏淇℃伅鏌ヨ璇锋眰"); - // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅 - String code = messageJson.getString("code"); - DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(TenantContext.getTenant(),code); - // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅 - DryShop shop = dryShopService.getById(equipmentVo.getShopId()); + ThreadUtil.execute(() -> { + String tenantId = messageJson.getString("tenantId"); + String clientId = mqttUtil.getMqttClient().getClientId(); + String tenant = clientId.substring(clientId.lastIndexOf("_")+1); + if (tenantId!=null && tenantId.equals(tenant)) { + TenantContext.setTenant(tenantId); + // 鏍规嵁璁惧缂栫爜鏌ヨ璁惧淇℃伅 + String code = messageJson.getString("code"); + DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId,code); + // 鏍规嵁璁惧杞﹂棿id鏌ヨ杞﹂棿淇℃伅 + DryShop shop = dryShopService.getById(equipmentVo.getShopId()); + // 鏍规嵁璁惧绫诲瀷ID鏌ヨ璁惧绫诲瀷淇℃伅 + DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType()); - JSONObject res = new JSONObject(); + JSONObject res = new JSONObject(); - res.put("tenant", TenantContext.getTenant()); - res.put("equipment", equipmentVo); - res.put("shop", shop); - try { - MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); - // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); - } catch (Exception e) { - e.printStackTrace(); - } + res.put("tenant", tenantId); + res.put("equipment", equipmentVo); + res.put("shop", shop); + res.put("eqpType", eqpType); + try { + MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); + // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + break; -- Gitblit v1.9.3