From b38019aae593a66c16f7e75d6e37d14eb8d2c42e Mon Sep 17 00:00:00 2001 From: zhuguifei <zhuguifei@zhuguifeideiMac.local> Date: 星期二, 22 七月 2025 08:55:15 +0800 Subject: [PATCH] 修改接收实时数据接口-故障处理 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java | 371 +++++++++++++++++++-------------- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 91 ++++--- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java | 15 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java | 135 +++++------- jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java | 7 jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java | 29 ++ 6 files changed, 366 insertions(+), 282 deletions(-) diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java index 6aa48fe..6264a3b 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java @@ -97,16 +97,15 @@ /**************************start*******************************/ /**************************end*******************************/ //redis缂撳瓨 - //client - String MQTT_REAL_FAULT = "mqtt:real:fault"; + //鎵�鏈夌鎴风殑瀹炴椂鎶ヨ锛�%s锛氱鎴穒d锛� + String MQTT_REAL_FAULT = "mqtt:real:fault:%s"; //service(cloud) //鍦ㄧ嚎瀹㈡埛绔� String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s"; - //鎵�鏈夌鎴风殑瀹炴椂鎶ヨ锛�%s锛氱鎴穒d锛� - String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s"; +// String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s"; diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java index 3344466..0f1b58f 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java @@ -70,25 +70,24 @@ private IDryEquipmentService dryEquipmentService; - - @ApiOperation(value="娴嬭瘯", notes="杩斿洖Hello") + @ApiOperation(value = "娴嬭瘯", notes = "杩斿洖Hello") @GetMapping("/hello") public Result<?> sayHello() { return Result.ok("Hello"); } - @ApiOperation(value="鎺ユ敹瀹炴椂鏁版嵁Json", notes="璁惧瀹炴椂鏁版嵁涓婁紶") + @ApiOperation(value = "鎺ユ敹瀹炴椂鏁版嵁Json", notes = "璁惧瀹炴椂鏁版嵁涓婁紶") @PostMapping("/sendRealTimeDataJson") - public Result<?> realTimeDataJson(@RequestBody RealTimeDataVo realTimeDataVo) { + public Result<?> realTimeDataJson(@RequestBody RealTimeDataVo realTimeDataVo) { try { - if (mqttConfig.isEnable() && "user".equals(mqttConfig.getRole())){ + if (mqttConfig.isEnable() && "user".equals(mqttConfig.getRole())) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload(JSONObject.toJSONString(realTimeDataVo).getBytes()); - mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA,mqttMessage); + mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA, mqttMessage); } - if ("user".equals(mqttConfig.getRole())){ + if ("user".equals(mqttConfig.getRole())) { //澶勭悊鏁呴殰淇℃伅 dryRealTimeDataService.fitFaultRecord(realTimeDataVo); } @@ -100,51 +99,32 @@ return dryRealTimeDataService.realTimeDataHandle(realTimeDataVo); } - @ApiOperation(value="鎺ユ敹瀹炴椂鏁版嵁Json", notes="璁惧瀹炴椂鏁版嵁涓婁紶") + @ApiOperation(value = "鎺ユ敹瀹炴椂鏁版嵁Json", notes = "璁惧瀹炴椂鏁版嵁涓婁紶") @PostMapping("/sendRealTimeDataJson2") - public Result<?> realTimeDataJson2(@RequestBody RealTimeDataParentVo realTimeDataParentVo) { - try { - if (mqttConfig.isEnable() && "user".equals(mqttConfig.getRole())){ - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setQos(0); - mqttMessage.setPayload(JSONObject.toJSONString(realTimeDataParentVo).getBytes()); - mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA,mqttMessage); - //澶勭悊鏁呴殰淇℃伅 - dryRealTimeDataService.fitFaultRecord(realTimeDataParentVo); - } - - if ("user".equals(mqttConfig.getRole()) && realTimeDataParentVo.getFault() != null){ - //澶勭悊鏁呴殰淇℃伅 - dryRealTimeDataService.fitFaultRecord(realTimeDataParentVo); - } - - } catch (MqttException e) { - e.printStackTrace(); - } - + public Result<?> realTimeDataJson2(@RequestBody RealTimeDataParentVo realTimeDataParentVo) { return dryRealTimeDataService.realTimeDataHandle(realTimeDataParentVo); + } - - @ApiOperation(value="鑾峰彇璁惧瀹炴椂鏁版嵁", notes="閫氳繃绉熸埛ID鍜岃澶囩紪鐮佽幏鍙栧疄鏃舵暟鎹�") + @ApiOperation(value = "鑾峰彇璁惧瀹炴椂鏁版嵁", notes = "閫氳繃绉熸埛ID鍜岃澶囩紪鐮佽幏鍙栧疄鏃舵暟鎹�") @GetMapping("/getRealTimeData") public Result<?> queryMachineRealTimeData(RealTimeDataVo realTimeDataVo) { return dryRealTimeDataService.queryMachineRealTImeData(realTimeDataVo); } - @ApiOperation(value="鑾峰彇杞﹂棿缁熻鏁版嵁", notes="閫氳繃绉熸埛ID鑾峰彇杞﹂棿缁熻鏁版嵁") + @ApiOperation(value = "鑾峰彇杞﹂棿缁熻鏁版嵁", notes = "閫氳繃绉熸埛ID鑾峰彇杞﹂棿缁熻鏁版嵁") @GetMapping("/workshopStatistics") public Result<?> workshopStatistics(RealTimeDataVo realTimeDataVo) { return dryRealTimeDataService.queryWorkshopStatistics(realTimeDataVo); } - @ApiOperation(value="鑾峰彇鎵�鏈夋満鍙�", notes="閫氳繃绉熸埛ID鑾峰彇鎵�鏈夋満鍙版暟鎹�") + @ApiOperation(value = "鑾峰彇鎵�鏈夋満鍙�", notes = "閫氳繃绉熸埛ID鑾峰彇鎵�鏈夋満鍙版暟鎹�") @GetMapping("/queryAllEqps") public Result<?> queryAllEqps(DryEquipment equipment) { List<DryEquipment> dryEquipments = dryEquipmentService.queryEqusByTenantId(equipment); - return Result.OK(dryEquipments); + return Result.OK(dryEquipments); } @@ -157,7 +137,7 @@ * 1013 鐑鍚姩 1014 寮�闂ㄨ瀵� * 1015 鍑烘枡鎸夐挳 */ - @ApiOperation(value="鍙戦�佹帶鍒舵寚浠�", notes="鍚戞湇鍔$鍙戦�佹帶鍒舵寚浠わ紝鐢辨湇鍔$閫氳繃socket杞彂缁欐帶鍒舵ā鍧�") + @ApiOperation(value = "鍙戦�佹帶鍒舵寚浠�", notes = "鍚戞湇鍔$鍙戦�佹帶鍒舵寚浠わ紝鐢辨湇鍔$閫氳繃socket杞彂缁欐帶鍒舵ā鍧�") @PostMapping("/sendCommand") public Result<?> sendCommand(@RequestBody CommandMessageVo msgVo) { return dryRealTimeDataService.sendSocketMsg(msgVo); @@ -204,19 +184,20 @@ /** * 鏍规嵁璁惧鍜岀鎴锋煡璇㈣璁惧绫诲瀷鐨勫共鐕ラ厤鏂癸紝灏嗛厤鏂硅浆鎴恱ml鏍煎紡锛屼互瀛楃涓叉柟寮忚繑鍥� + * * @param tenantId * @param eqpCode * @return * @throws JAXBException */ - @ApiOperation(value="骞茬嚗閰嶆柟鑾峰彇", notes="骞茬嚗閰嶆柟涓嬪彂") + @ApiOperation(value = "骞茬嚗閰嶆柟鑾峰彇", notes = "骞茬嚗閰嶆柟涓嬪彂") @GetMapping(value = "/queryFormula") public Result<String> queryFormulaByEqpType(Integer tenantId, String eqpCode) throws JAXBException { //鑾峰彇request HttpServletRequest request = SpringContextUtils.getHttpServletRequest(); // 鑾峰彇璇锋眰涓绘満鐨処P鍦板潃 String ip = IpUtils.getIpAddr(request); - DryEquipment dryEquipment = dryEquipmentService.selectByTenantIdEquipmentId(tenantId+ "", eqpCode); + DryEquipment dryEquipment = dryEquipmentService.selectByTenantIdEquipmentId(tenantId + "", eqpCode); if (dryEquipment != null) { if (dryEquipment.getIp().equals(ip)) { } else { @@ -225,46 +206,46 @@ } else { return Result.error("璁惧涓嶅瓨鍦�"); } - LambdaQueryWrapper<DryHerbFormula> queryWrapper = new LambdaQueryWrapper<DryHerbFormula>(); - queryWrapper.eq(DryHerbFormula::getEqpType, dryEquipment.getType()) - .eq(DryHerbFormula::getTenantId, tenantId); - List<DryHerbFormula> list = dryHerbFormulaService.list(queryWrapper); - Formulas formulas = new Formulas(); - list.forEach(item -> { - DryHerbInfo byId = dryHerbInfoService.getById(item.getHerbId()); - if (byId!=null) { - item.setPinyin(byId.getPinyin()); - item.setName(byId.getName()); - } - Formula formula = new Formula(); - BaseParam baseParam = new BaseParam(); - WaterParam waterParam = new WaterParam(); - TypeParam typeParam = new TypeParam(); - OffsetParam offsetParam = new OffsetParam(); - baseParam.setCode(item.getCode()); - baseParam.setIndex(item.getCode()); - baseParam.setName(item.getName()); - baseParam.setAb(item.getName()); - baseParam.setTyp(item.getCategory()); - waterParam.setDelay(Double.valueOf(item.getDelay())); - waterParam.setMoisture3(item.getTarget()); - waterParam.setWeight1(Double.valueOf(item.getFeed())); - waterParam.setTimes(item.getEt()); - waterParam.setTemp1(item.getWindTemp()); - waterParam.setTemp2(item.getEnvTemp()); - waterParam.setTemp3(item.getEnvHum()); - waterParam.setTurntime(item.getTurn()); - typeParam.setMtype(Integer.valueOf(item.getCategory())); - offsetParam.setMoisoffset(item.getMoisOffset()); - offsetParam.setColdwind(Double.valueOf(item.getCoolingDuration())); + LambdaQueryWrapper<DryHerbFormula> queryWrapper = new LambdaQueryWrapper<DryHerbFormula>(); + queryWrapper.eq(DryHerbFormula::getEqpType, dryEquipment.getType()) + .eq(DryHerbFormula::getTenantId, tenantId); + List<DryHerbFormula> list = dryHerbFormulaService.list(queryWrapper); + Formulas formulas = new Formulas(); + list.forEach(item -> { + DryHerbInfo byId = dryHerbInfoService.getById(item.getHerbId()); + if (byId != null) { + item.setPinyin(byId.getPinyin()); + item.setName(byId.getName()); + } + Formula formula = new Formula(); + BaseParam baseParam = new BaseParam(); + WaterParam waterParam = new WaterParam(); + TypeParam typeParam = new TypeParam(); + OffsetParam offsetParam = new OffsetParam(); + baseParam.setCode(item.getCode()); + baseParam.setIndex(item.getCode()); + baseParam.setName(item.getName()); + baseParam.setAb(item.getName()); + baseParam.setTyp(item.getCategory()); + waterParam.setDelay(Double.valueOf(item.getDelay())); + waterParam.setMoisture3(item.getTarget()); + waterParam.setWeight1(Double.valueOf(item.getFeed())); + waterParam.setTimes(item.getEt()); + waterParam.setTemp1(item.getWindTemp()); + waterParam.setTemp2(item.getEnvTemp()); + waterParam.setTemp3(item.getEnvHum()); + waterParam.setTurntime(item.getTurn()); + typeParam.setMtype(Integer.valueOf(item.getCategory())); + offsetParam.setMoisoffset(item.getMoisOffset()); + offsetParam.setColdwind(Double.valueOf(item.getCoolingDuration())); - formula.setBaseParam(baseParam); - formula.setWaterParam(waterParam); - formula.setTypeParam(typeParam); - formula.setOffsetParam(offsetParam); - formulas.getDryFormulaList().add(formula); - }); - // 鎶妚os杞崲鎴恱ml + formula.setBaseParam(baseParam); + formula.setWaterParam(waterParam); + formula.setTypeParam(typeParam); + formula.setOffsetParam(offsetParam); + formulas.getDryFormulaList().add(formula); + }); + // 鎶妚os杞崲鎴恱ml // 鍒涘缓JAXBContext瀹炰緥 JAXBContext jaxbContext = JAXBContext.newInstance(Formulas.class); @@ -278,12 +259,12 @@ StringWriter writer = new StringWriter(); marshaller.marshal(formulas, writer); - return Result.OK("璇锋眰鎴愬姛",writer.toString()); + return Result.OK("璇锋眰鎴愬姛", writer.toString()); } - @ApiOperation(value="骞茬嚗閰嶆柟涓婃姤", notes="骞茬嚗閰嶆柟璁板綍涓婃姤") + @ApiOperation(value = "骞茬嚗閰嶆柟涓婃姤", notes = "骞茬嚗閰嶆柟璁板綍涓婃姤") @PostMapping(value = "/sendFormulaHistory") public Result<?> sendFormulaHistory(DryHerbFormulaHisVo hisVo) { //鑾峰彇request diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java index f669d79..a490a0a 100755 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java @@ -15,6 +15,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; import org.springframework.data.redis.core.RedisTemplate; import java.util.*; @@ -101,8 +102,8 @@ mqttClient.subscribe(MqttConstant.MOBILE_UP); System.err.println("admin璁㈤槄" + MqttConstant.MOBILE_UP); // 璁㈤槄绉熸埛瀹炴椂鏁版嵁 - mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); - System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); + mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP); + System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP); // 璁㈤槄绉熸埛鎶ヨ鏁版嵁 mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); System.out.println("admin璁㈤槄" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); @@ -178,15 +179,17 @@ for (int i = 0; i < data.size(); i++) { JSONObject obj = data.getJSONObject(i); JSONObject item = new JSONObject(); + //clientid + String clientid = obj.getString("clientid"); + item.put("clientid", clientid); + //TODO 鏍¢獙绉熸埛id鏄惁瀛樺湪 + if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) continue; //username item.put("username", obj.get("username")); //杩炴帴鏃堕棿 String st = obj.getString("connected_at"); String upTime = DateUtils.zone2Str(st); item.put("connectedAt", upTime); - //clientid - String clientid = obj.getString("clientid"); - item.put("clientid", clientid); //鏄惁杩炴帴 Boolean connected = obj.getBoolean("connected"); item.put("connected", connected); @@ -195,7 +198,7 @@ String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); - //item.put("code", info[2]); + item.put("code", info[2]); if (connected) { redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item); diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java index b381df7..4558603 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java @@ -42,6 +42,9 @@ public class MqttSampleCallback implements MqttCallback { @Value(value = "${jeecg.mqtt.role}") private String role; + + + @Autowired private MqttUtil mqttUtil; @Autowired @@ -76,8 +79,8 @@ @Override public void messageArrived(String topic, MqttMessage mqttMessage) { - System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" - + new String(mqttMessage.getPayload())); +// System.out.println("鏀跺埌娑堟伅: \n topic锛�" + topic + "\n Qos锛�" + mqttMessage.getQos() + "\n payload锛�" +// + new String(mqttMessage.getPayload())); switch (role) { // 绠$悊鍛� @@ -98,6 +101,8 @@ //clientid String clientid = messageJson.getString("clientid"); item.put("clientid", clientid); + // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞� + if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; //鏄惁杩炴帴 item.put("connected", true); //鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000) @@ -105,12 +110,14 @@ String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); - //item.put("code", info[2]); + item.put("code", info[2]); + redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); + System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); } catch (Exception e) { e.printStackTrace(); } - redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); - System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); + + } } @@ -204,16 +211,16 @@ }); break; - // 鎺ユ敹璁惧瀹炴椂鏁版嵁 + // 鎺ユ敹璁惧瀹炴椂鏁版嵁 TODO 20250718鏆備笉浣跨敤锛屼娇鐢═ENANT_UP_PREFIX_REALTIME_DATA_EQP case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: - ThreadUtil.execute(() -> { - try { - RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); - realTimeDataService.realTimeDataHandle(vo); - } catch (Exception e) { - e.printStackTrace(); - } - }); +// ThreadUtil.execute(() -> { +// try { +// RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); +// realTimeDataService.realTimeDataHandle(vo); +// } catch (Exception e) { +// e.printStackTrace(); +// } +// }); break; // 鎺ユ敹璁惧瀹炴椂鏁版嵁-鏈哄彴 @@ -221,40 +228,42 @@ ThreadUtil.execute(() -> { try { RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class); - realTimeDataService.realTimeDataHandle(vo); + synchronized (realTimeDataService) { + realTimeDataService.realTimeDataHandle(vo); + } } catch (Exception e) { e.printStackTrace(); } }); break; - //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 + //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 TODO 20250721鏆備笉浣跨敤锛岀粺涓�浣跨敤TENANT_UP_PREFIX_REALTIME_DATA_EQP case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: { - MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { - }); - //鏁呴殰鏁版嵁 - Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); - //绉熸埛id - String tenantId = realFaultMessage.getTentId(); +// MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { +// }); +// //鏁呴殰鏁版嵁 +// Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); +// //绉熸埛id +// String tenantId = realFaultMessage.getTentId(); //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis - //杞崲涓� Map<String, Object> - Map<String, Object> objectMap = dryFaultMap.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> (Object) entry.getValue() - )); - redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); - //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 - if (dryFaultMap.isEmpty()) { - return; - } - String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); - //鏁版嵁杞崲 - List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); - MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); - //鍙戦�佸箍鎾� - System.err.println("骞挎挱缁欙細" + recTopic); - sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); +// //杞崲涓� Map<String, Object> +// Map<String, Object> objectMap = dryFaultMap.entrySet().stream() +// .collect(Collectors.toMap( +// Map.Entry::getKey, +// entry -> (Object) entry.getValue() +// )); +// redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); +// //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 +// if (dryFaultMap.isEmpty()) { +// return; +// } +// String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); +// //鏁版嵁杞崲 +// List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); +// MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); +// //鍙戦�佸箍鎾� +// System.err.println("骞挎挱缁欙細" + recTopic); +// sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); } break; //绉诲姩绔富鍔ㄨ姹傝澶囧疄鏃舵晠闅滄暟鎹紙鐢ㄤ簬椤甸潰鍒氭墦寮�鏃舵媺鍙栦竴娆℃暟鎹級 @@ -263,7 +272,7 @@ if (req.toString().isEmpty() || tenantId == null) { return; } - Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId)); + Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); //杞崲涓� Map<String, DryFaultRecordVo> Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream() .collect(Collectors.toMap( diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java index b744404..bdfb9f8 100755 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java @@ -1,7 +1,10 @@ package org.jeecg.modules.dry.mqtt; +import cn.hutool.core.thread.ThreadUtil; +import com.alibaba.fastjson.JSONObject; import lombok.Data; import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; @Component @@ -9,4 +12,30 @@ public class MqttUtil { public MqttClient mqttClient; + /** + * 鍙戦�佹秷鎭� + * + * @param topic 璁㈤槄 + * @param mqMessage 娑堟伅浣� + * @param type 1-鍙戦�佺粰绉熸埛 2-鍙戦�佺粰鍥哄畾id + */ + public void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) { + ThreadUtil.execute(() -> { + try { + if (type == 1) { + MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); + sendMessage.setQos(0); + mqttClient.publish(String.format(topic, mqMessage.getTentId()), sendMessage); + } else if (type == 2) { + MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); + sendMessage.setQos(0); + mqttClient.publish(topic, sendMessage); + } + + } catch (Exception e) { + e.printStackTrace(); + } + }); + } + } diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java index 58b10ca..9178cb4 100755 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java @@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.mina.core.session.IoSession; @@ -23,6 +24,7 @@ import org.jeecg.modules.dry.common.CacheConstants; import org.jeecg.modules.dry.entity.*; import org.jeecg.modules.dry.mqtt.MqMessage; +import org.jeecg.modules.dry.mqtt.MqttConfig; import org.jeecg.modules.dry.mqtt.MqttUtil; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.socket.ServerHandler; @@ -54,6 +56,9 @@ private IDryEquipmentService equipmentService; @Autowired + private IDryEqpTypeService dryEqpTypeService; + + @Autowired private RedisUtil redisUtil; @Autowired @@ -67,6 +72,7 @@ @Value(value = "${jeecg.mqtt.role}") private String role; + @Autowired private MqttUtil mqttUtil; @@ -90,24 +96,24 @@ @Override @Transactional public Result<?> realTimeDataHandle(RealTimeDataVo realTimeDataVo) { - TenantContext.setTenant(realTimeDataVo.getTenantid()+""); - log.info("瀹炴椂鏁版嵁锛�"+realTimeDataVo.toString()); + TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); + log.info("瀹炴椂鏁版嵁锛�" + realTimeDataVo.toString()); // 1 鏌ヨ鎴栧垱寤哄伐鍗� // 1.1 浠巖edis鍙栧嚭宸ュ崟缂撳瓨 DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), - realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid()); + realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid()); // 1.2 濡傛灉鏈夌紦瀛樿褰� - if(orderVo != null && orderVo.getCode().equals(realTimeDataVo.getWorkorder())) { + if (orderVo != null && orderVo.getCode().equals(realTimeDataVo.getWorkorder())) { - // 1.3 娌℃湁缂撳瓨璁板綍鍐嶆煡璇㈡暟鎹簱 + // 1.3 娌℃湁缂撳瓨璁板綍鍐嶆煡璇㈡暟鎹簱 } else { // 鏍规嵁绉熸埛id鍜屽伐鍗曞彿鏌ヨ鏁版嵁搴撴槸鍚︽湁璁板綍锛屾湁鍒欒繑鍥烇紝娌℃湁鍒欐柊澧炰竴鏉″啀杩斿洖 orderVo = getOrSaveDryOrderVoDB(realTimeDataVo); } if (orderVo == null) { - log.error("宸ュ崟涓嶅瓨鍦紝宸ュ崟鍙凤細"+realTimeDataVo.getWorkorder()+",璁惧锛�" + realTimeDataVo.getMachineid() +",鑽潗锛�" + realTimeDataVo.getName()); + log.error("宸ュ崟涓嶅瓨鍦紝宸ュ崟鍙凤細" + realTimeDataVo.getWorkorder() + ",璁惧锛�" + realTimeDataVo.getMachineid() + ",鑽潗锛�" + realTimeDataVo.getName()); return Result.error("宸ュ崟涓嶅瓨鍦�"); } @@ -147,27 +153,25 @@ orderVo.getBellowsTemp().put(realTimeDataVo.getTime3(), realTimeDataVo.getTemp2()); // 2.3 鏇存柊鍒皉edis缂撳瓨 redisUtil.hset(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), - realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid(),orderVo, 60*60); + realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid(), orderVo, 60 * 60); return Result.ok(); } - - @Override @Transactional public Result<?> realTimeDataHandle(RealTimeDataParentVo realTimeDataParentVo) { - TenantContext.setTenant(realTimeDataParentVo.getTenantid()+""); - log.info("瀹炴椂鏁版嵁锛�"+realTimeDataParentVo.toString()); + TenantContext.setTenant(realTimeDataParentVo.getTenantid() + ""); + log.info("瀹炴椂鏁版嵁锛�" + realTimeDataParentVo.toString()); if (realTimeDataParentVo.getRealTime() != null) { RealTimeDataVo realTimeDataVo = realTimeDataParentVo.getRealTime(); // 1 鏌ヨ鎴栧垱寤哄伐鍗� // 1.1 浠巖edis鍙栧嚭宸ュ崟缂撳瓨 DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), - realTimeDataParentVo.getTenantid()+"_"+realTimeDataParentVo.getMachineid()); + realTimeDataParentVo.getTenantid() + "_" + realTimeDataParentVo.getMachineid()); // 1.2 濡傛灉鏈夌紦瀛樿褰� - if(orderVo != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) { + if (orderVo != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) { // 1.3 娌℃湁缂撳瓨璁板綍鍐嶆煡璇㈡暟鎹簱 } else { @@ -175,7 +179,7 @@ orderVo = getOrSaveDryOrderVoDB(realTimeDataVo); } if (orderVo == null) { - log.error("宸ュ崟涓嶅瓨鍦紝宸ュ崟鍙凤細"+realTimeDataParentVo.getWorkorder()+",璁惧锛�" + realTimeDataParentVo.getMachineid() +",鑽潗锛�" + realTimeDataVo.getName()); + log.error("宸ュ崟涓嶅瓨鍦紝宸ュ崟鍙凤細" + realTimeDataParentVo.getWorkorder() + ",璁惧锛�" + realTimeDataParentVo.getMachineid() + ",鑽潗锛�" + realTimeDataVo.getName()); return Result.error("宸ュ崟涓嶅瓨鍦�"); } @@ -214,18 +218,22 @@ // 2.3 鏇存柊鍒皉edis缂撳瓨 redisUtil.hset(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), - realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid(),orderVo, 60*60); + realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid(), orderVo, 60 * 60); } if (realTimeDataParentVo.getReport() != null) { saveReport(realTimeDataParentVo); + } + + if (realTimeDataParentVo.getFault() != null) { + fitFaultRecord(realTimeDataParentVo); } return Result.ok(); } private void saveReport(RealTimeDataParentVo realTimeDataParentVo) { RealTimeReportVo report = realTimeDataParentVo.getReport(); - if(report.getReport_flag()) { + if (report.getReport_flag()) { DryProdRecord prodRecord = new DryProdRecord(); prodRecord.setReportHeadName(report.getReport_head_name()); prodRecord.setReportHeadBatch(report.getReport_head_batch()); @@ -236,24 +244,24 @@ prodRecord.setReportHeadLeader(report.getReport_head_leader()); prodRecord.setReportHeadTecher(report.getReport_head_techer()); - prodRecord.setReportCheckField(report.getReport_check_field()?1:0); - prodRecord.setReportCheckFile(report.getReport_check_file()?1:0); - prodRecord.setReportCheckTag(report.getReport_check_tag()?1:0); - prodRecord.setReportCheckTool(report.getReport_check_tool()?1:0); + prodRecord.setReportCheckField(report.getReport_check_field() ? 1 : 0); + prodRecord.setReportCheckFile(report.getReport_check_file() ? 1 : 0); + prodRecord.setReportCheckTag(report.getReport_check_tag() ? 1 : 0); + prodRecord.setReportCheckTool(report.getReport_check_tool() ? 1 : 0); prodRecord.setReportCheckMan(report.getReport_check_man()); - prodRecord.setReportCheckStatus(report.getReport_check_status()?1:0); + prodRecord.setReportCheckStatus(report.getReport_check_status() ? 1 : 0); prodRecord.setReportCheckQa(report.getReport_check_qa()); prodRecord.setReportCheckRecord(report.getReport_check_record()); - prodRecord.setReportProductView(report.getReport_product_view()?1:0); - prodRecord.setReportProductWind(report.getReport_product_wind()?1:0); - prodRecord.setReportProductSun(report.getReport_product_sun()?1:0); - prodRecord.setReportProductLowDry(report.getReport_product_low_dry()?1:0); - prodRecord.setReportProductDry(report.getReport_product_dry()?1:0); + prodRecord.setReportProductView(report.getReport_product_view() ? 1 : 0); + prodRecord.setReportProductWind(report.getReport_product_wind() ? 1 : 0); + prodRecord.setReportProductSun(report.getReport_product_sun() ? 1 : 0); + prodRecord.setReportProductLowDry(report.getReport_product_low_dry() ? 1 : 0); + prodRecord.setReportProductDry(report.getReport_product_dry() ? 1 : 0); prodRecord.setReportProductStart(report.getReport_product_start()); prodRecord.setReportProductEnd(report.getReport_product_end()); prodRecord.setReportProductTotal(report.getReport_product_total()); - prodRecord.setReportProductCheck(report.getReport_product_check()?1:0); + prodRecord.setReportProductCheck(report.getReport_product_check() ? 1 : 0); prodRecord.setReportProductMan1(report.getReport_product_man1()); prodRecord.setReportProductMan2(report.getReport_product_man2()); prodRecord.setReportProductWeight(report.getReport_product_weight()); @@ -261,15 +269,15 @@ prodRecord.setReportProductUse(report.getReport_product_use()); prodRecord.setReportProductQa(report.getReport_product_qa()); - prodRecord.setReportCleanMachine(report.getReport_clean_machine()?1:0); - prodRecord.setReportCleanWaste(report.getReport_clean_waste()?1:0); - prodRecord.setReportCleanTool(report.getReport_clean_tool()?1:0); - prodRecord.setReportCleanDoor(report.getReport_clean_door()?1:0); - prodRecord.setReportCleanBox(report.getReport_clean_box()?1:0); - prodRecord.setReportCleanRecord(report.getReport_clean_record()?1:0); + prodRecord.setReportCleanMachine(report.getReport_clean_machine() ? 1 : 0); + prodRecord.setReportCleanWaste(report.getReport_clean_waste() ? 1 : 0); + prodRecord.setReportCleanTool(report.getReport_clean_tool() ? 1 : 0); + prodRecord.setReportCleanDoor(report.getReport_clean_door() ? 1 : 0); + prodRecord.setReportCleanBox(report.getReport_clean_box() ? 1 : 0); + prodRecord.setReportCleanRecord(report.getReport_clean_record() ? 1 : 0); prodRecord.setReportCleanDate(report.getReport_clean_date()); prodRecord.setReportCleanMan(report.getReport_clean_man()); - prodRecord.setReportCleanConfirm(report.getReport_clean_confirm()?1:0); + prodRecord.setReportCleanConfirm(report.getReport_clean_confirm() ? 1 : 0); prodRecord.setReportCleanQa(report.getReport_clean_qa()); prodRecordService.save(prodRecord); } @@ -278,10 +286,12 @@ /** * 鏍规嵁绉熸埛id鍜屽伐鍗曞彿鏌ヨ鏁版嵁搴撴槸鍚︽湁璁板綍锛屾湁鍒欒繑鍥烇紝娌℃湁鍒欐柊澧炰竴鏉� + * * @param realTimeDataVo * @return */ private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) { + TenantContext.setTenant(realTimeDataVo.getTenantid() +""); DryOrderVo orderVo; LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder()); @@ -293,7 +303,8 @@ // 杞崲涓虹紦瀛樻暟鎹粨鏋� orderVo = BeanUtil.toBean(one, DryOrderVo.class); if (one.getTemps() != null) { - Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer,Double>>(){}); + Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer, Double>>() { + }); orderVo.setBellowsTemp(map); } // 鏌ヨ绉伴噸璁板綍锛屾坊鍔犲埌缂撳瓨鏁版嵁缁撴瀯 @@ -303,7 +314,7 @@ orderVo.setTrendVo(oldVo); orderVo.setDetailList(trendVos); } - // 3 鏁版嵁搴撴病鏈夊垯鏂板涓�鏉℃暟鎹� + // 3 鏁版嵁搴撴病鏈夊垯鏂板涓�鏉℃暟鎹� } else { orderVo = saveNewOrder(realTimeDataVo); @@ -313,23 +324,60 @@ /** * 淇濆瓨鏂板伐鍗� + * * @param realTimeDataVo * @return */ private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { + TenantContext.setTenant(realTimeDataVo.getTenantid() +""); DryOrderVo orderVo; // 鏌ヨ璁惧 DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo); if (equ == null) { - log.error("鏈壘鍒拌澶囷細"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",鏈哄彴锛�" + realTimeDataVo.getMachineid()); - return null; + log.error("鏈壘鍒拌澶囷細" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",鏈哄彴锛�" + realTimeDataVo.getMachineid()); + log.error("鏂板璁惧锛�"); + if (realTimeDataVo.getMachineid() == null || realTimeDataVo.getTenantid() == null) { + log.error("鏂板璁惧澶辫触锛氳澶嘔D鎴栫鎴稩D涓虹┖锛乵achineid={}, tenantid={}", + realTimeDataVo.getMachineid(), realTimeDataVo.getTenantid()); + return null; + } + + DryEquipment addEqu = new DryEquipment(realTimeDataVo); + + try { + String digits = StringUtils.getDigits(realTimeDataVo.getMachineid()); + addEqu.setName(Integer.parseInt(digits) + "#骞茬嚗璁惧"); + } catch (NumberFormatException e) { + log.error("璁惧ID鏍煎紡閿欒锛屾棤娉曟彁鍙栨暟瀛楅儴鍒嗭細machineid={}", realTimeDataVo.getMachineid(), e); + return null; + } + + DryEqpType eqpType = dryEqpTypeService.getOne( + new LambdaQueryWrapper<DryEqpType>() + .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid()) + ); + if(eqpType == null){ + log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid() ); + return null; + } + + Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId())); + + if (!equipmentService.save(addEqu)) { + log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu); + return null; + } + equ = addEqu; + + log.info("鏂板璁惧鎴愬姛锛歟quipmentId={}", addEqu.getId()); + } // 鏌ヨ鑽潗 - DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo); + DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo); if (herbFormula == null) { - log.error("鏈壘鍒拌嵂鏉愶細"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",鏈哄彴锛�" + realTimeDataVo.getMachineid()); + log.error("鏈壘鍒拌嵂鏉愶細" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",鏈哄彴锛�" + realTimeDataVo.getMachineid()); return null; } // 鍒涘缓鏂板伐鍗� @@ -345,6 +393,7 @@ /** * 鏌ヨ璁惧锛屾柊璁惧鍒欐坊鍔犲埌璁惧涓绘暟鎹� + * * @param realTimeDataVo * @return */ @@ -354,7 +403,7 @@ queryWrapper.eq(DryEquipment::getCode, realTimeDataVo.getMachineid()); DryEquipment one = equipmentService.getOne(queryWrapper); if (one == null) { - log.error(role+"淇濆瓨瀹炴椂鏁版嵁锛屾湭鎵惧埌璁惧锛�"+realTimeDataVo.getMachineid()); + log.error(role + "淇濆瓨瀹炴椂鏁版嵁锛屾湭鎵惧埌璁惧锛�" + realTimeDataVo.getMachineid()); // one = new DryEquipment(realTimeDataVo); // equipmentService.save(one); if (MqttConstant.ROLE_ADMIN.equals(role)) { @@ -365,8 +414,10 @@ object.put("tenantId", realTimeDataVo.getTenantid()); mqttMessage.setPayload(object.toJSONString().getBytes()); try { - mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX ,mqttMessage); - }catch (MqttException e) { + if(mqttEnable){ + mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage); + } + } catch (MqttException e) { e.printStackTrace(); } @@ -378,6 +429,7 @@ /** * 鏌ヨ鑽潗锛屾柊鑽潗娣诲姞鍒版暟鎹簱 + * * @param realTimeDataVo * @return */ @@ -389,7 +441,10 @@ if (one == null) { one = new DryHerbFormula(realTimeDataVo); DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid()); - one.setEqpType(dryEquipment.getType()); + if (dryEquipment!=null&&dryEquipment.getType()!=null) { + one.setEqpType(dryEquipment.getType()); + } + dryHerbFormulaService.save(one); } return one; @@ -397,14 +452,15 @@ /** * 淇濆瓨鍚按鐜囧彉鍖栬褰� + * * @param trendVo * @param orderVo */ private void saveOrderTrendVo(DryOrderTrendVo trendVo, DryOrderVo orderVo) { //鍒ゆ柇 瀹炴椂鍚按鐜� 鎴� 瀹炴椂閲嶉噺鏈夋病鏈夊彉鍖栵紝鏈夊彉鍖栧垯鏇存柊 - if(orderVo.getTrendVo() == null && trendVo != null && trendVo.getWeight() > 0 - || orderVo.getTrendVo()!=null && trendVo.getWeight() < orderVo.getTrendVo().getWeight() - ) { + if (orderVo.getTrendVo() == null && trendVo != null && trendVo.getWeight() > 0 + || orderVo.getTrendVo() != null && trendVo.getWeight() < orderVo.getTrendVo().getWeight() + ) { DryOrder byId = dryOrderService.getById(orderVo.getId()); // 灏嗘渶鏂扮粨鏋滄洿鏂板埌宸ュ崟 if (byId != null) { @@ -422,12 +478,13 @@ /** * 鏌ヨ鏈哄彴瀹炴椂鏁版嵁 + * * @param realTimeDataVo * @return */ @Override public Result<?> queryMachineRealTImeData(RealTimeDataVo realTimeDataVo) { - TenantContext.setTenant(realTimeDataVo.getTenantid()+""); + TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); // 鏌ヨ鎵�鏈夋満鍙�,鏌ヨ璇彞缁勮 LambdaQueryWrapper<DryEquipment> queryWrapper = new LambdaQueryWrapper<>(); @@ -449,13 +506,13 @@ dryEquipments.stream().forEach(item -> { // 鑾峰彇宸ュ崟 DryOrderVo order = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + item.getCode()); - list.add(item.getName().substring(0, item.getName().indexOf('#')+1)); + list.add(item.getName().substring(0, item.getName().indexOf('#') + 1)); if (order != null) { // 璁$畻骞茬嚗鏁堢巼锛岀敤浜庡姣� DryOrderTrendVo dryOrderTrendVo = order.getDetailList().get(order.getDetailList().size() - 1); double v = order.getOriginWeight() - dryOrderTrendVo.getWeight(); - if (v > 0 && dryOrderTrendVo.getTotalTime()>0) { + if (v > 0 && dryOrderTrendVo.getTotalTime() > 0) { DecimalFormat df = new DecimalFormat("#.00"); dList.add(Double.valueOf(df.format(v / dryOrderTrendVo.getTotalTime() * 60))); } else { @@ -477,7 +534,7 @@ // 鏌ヨ杩戝崄娆℃晥鐜囧拰鑳借兘鑰楀钩鍧� dryOrderService.queryRecentOrderAvg(orderVo); } - }catch (Exception e) { + } catch (Exception e) { e.printStackTrace(); } return Result.ok(orderVo); @@ -491,27 +548,27 @@ DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(msgVo.getTenantId() + "", msgVo.getMachineId()); log.info("鑾峰彇璁惧锛�" + dryEquipment.toString()); - // managedSessions.keySet().forEach(addr -> { - // ObjectOutputStream oos = null; - try { + // managedSessions.keySet().forEach(addr -> { + // ObjectOutputStream oos = null; + try { // Socket socket = SocketServerConfig.clientMap.get(addr); - IoSession session = ServerHandler.clientSocket.get(dryEquipment.getIp()); - if (session == null) { - return Result.error("鏈幏鍙栧埌session,璇锋鏌ュ鎴风閰嶇疆鎴栬澶噄p閰嶇疆鏄惁姝e父"); - } - SocketMsgVo smv = new SocketMsgVo(msgVo); - session.write(JSONObject.toJSONString(smv)); + IoSession session = ServerHandler.clientSocket.get(dryEquipment.getIp()); + if (session == null) { + return Result.error("鏈幏鍙栧埌session,璇锋鏌ュ鎴风閰嶇疆鎴栬澶噄p閰嶇疆鏄惁姝e父"); + } + SocketMsgVo smv = new SocketMsgVo(msgVo); + session.write(JSONObject.toJSONString(smv)); // oos = new ObjectOutputStream(socket.getOutputStream()); // String s = JSONObject.toJSONString(new SocketMsgVo(msgVo)); // oos.writeUTF(s); // oos.flush(); - } catch (Exception e) { - throw new RuntimeException(e); - } finally { + } catch (Exception e) { + throw new RuntimeException(e); + } finally { - } - // }); + } + // }); return Result.OK(); } @@ -530,15 +587,15 @@ orderVo.setEnvHum(order.getEnvHum()); orderVo.setEnvTemp(order.getEnvTemp()); double watt = order.getWatt() - order.getDetailList().get(0).getWatt(); - orderVo.setWatt(orderVo.getWatt()==null? watt : orderVo.getWatt() + watt); + orderVo.setWatt(orderVo.getWatt() == null ? watt : orderVo.getWatt() + watt); double steam = order.getSteam() - order.getDetailList().get(0).getSteam(); - orderVo.setSteam(orderVo.getSteam()==null? steam : orderVo.getSteam() + steam); - orderVo.setOriginWeight(orderVo.getOriginWeight()==null? order.getOriginWeight(): orderVo.getOriginWeight() + order.getOriginWeight()); + orderVo.setSteam(orderVo.getSteam() == null ? steam : orderVo.getSteam() + steam); + orderVo.setOriginWeight(orderVo.getOriginWeight() == null ? order.getOriginWeight() : orderVo.getOriginWeight() + order.getOriginWeight()); - double yield = order.getOriginWeight()*(1-(order.getInitial()/100))/(1-(order.getTarget()/100)); - orderVo.setYield(orderVo.getYield()==null? yield: orderVo.getYield() + yield); + double yield = order.getOriginWeight() * (1 - (order.getInitial() / 100)) / (1 - (order.getTarget() / 100)); + orderVo.setYield(orderVo.getYield() == null ? yield : orderVo.getYield() + yield); double sub = order.getOriginWeight() - order.getYield(); - orderVo.setReduce(orderVo.getReduce()==null? sub: orderVo.getReduce() + sub); + orderVo.setReduce(orderVo.getReduce() == null ? sub : orderVo.getReduce() + sub); } @@ -555,7 +612,7 @@ @Override public Result<?> fitFaultRecord(RealTimeDataVo vo) { - TenantContext.setTenant(vo.getTenantid()+""); + TenantContext.setTenant(vo.getTenantid() + ""); ThreadUtil.execute(() -> { try { //瑙f瀽瀛樺偍鎶ヨ鏁版嵁 @@ -565,30 +622,28 @@ //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 - Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); - if(mqttEnable && !toCloudFaultMap.isEmpty()){ - MqMessage< Map<Object, Object>> message = new MqMessage<>(); - message.setData(toCloudFaultMap); - message.setTentId(vo.getTenantid()+""); - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setQos(0); - mqttMessage.setPayload(JSON.toJSONString(message).getBytes()); - mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage); - } - - - //瑕佷繚瀛樼殑鍘嗗彶鏁呴殰 - if(!faultRecords1.isEmpty()){ - MqMessage<List<DryFaultRecord>> message = new MqMessage<>(); - message.setData(faultRecords1); - message.setTentId(vo.getTenantid()+""); + Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); + if (mqttEnable && !toCloudFaultMap.isEmpty()) { + MqMessage<Map<Object, Object>> message = new MqMessage<>(); + message.setData(toCloudFaultMap); + message.setTentId(vo.getTenantid() + ""); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); - mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); - mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); + mqttMessage.setPayload(JSON.toJSONString(message).getBytes()); + mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA, mqttMessage); } + //瑕佷繚瀛樼殑鍘嗗彶鏁呴殰 + if (!faultRecords1.isEmpty()) { + MqMessage<List<DryFaultRecord>> message = new MqMessage<>(); + message.setData(faultRecords1); + message.setTentId(vo.getTenantid() + ""); + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(0); + mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); + mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA, mqttMessage); + } } catch (Exception e) { @@ -601,41 +656,47 @@ @Override public void fitFaultRecord(RealTimeDataParentVo vo) { - TenantContext.setTenant(vo.getTenantid()+""); + TenantContext.setTenant(vo.getTenantid() + ""); ThreadUtil.execute(() -> { try { //瑙f瀽瀛樺偍鎶ヨ鏁版嵁 - List<DryFaultRecord> faultRecords1 = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); - List<DryFaultRecord> faultRecords2 = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); - faultRecords1.addAll(faultRecords2); - - - //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 - Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); - if(mqttEnable && !toCloudFaultMap.isEmpty()){ - MqMessage< Map<Object, Object>> message = new MqMessage<>(); - message.setData(toCloudFaultMap); - message.setTentId(vo.getTenantid()+""); - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setQos(0); - mqttMessage.setPayload(JSON.toJSONString(message).getBytes()); - mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage); + List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); + List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); + if(!errorList.isEmpty()){ + log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString()); + } + if(!warnList.isEmpty()){ + log.error("淇濆瓨鍛婅锛歿}", warnList.toString()); } + //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊 + if(!mqttEnable)return; - //瑕佷繚瀛樼殑鍘嗗彶鏁呴殰 - if(!faultRecords1.isEmpty()){ - MqMessage<List<DryFaultRecord>> message = new MqMessage<>(); - message.setData(faultRecords1); - message.setTentId(vo.getTenantid()+""); - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setQos(0); - mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); - mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); + + + //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 key = tenantId + machineId + eqpFault + Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); + + + Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream() + .collect(Collectors.toMap( + entry -> entry.getKey().toString(), + entry -> (DryFaultRecordVo)entry.getValue() + )); + + String tenantId = vo.getTenantid() +""; + + //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 + if (dryFaultMap.isEmpty()) { + return; } - - - + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); + //鏁版嵁杞崲 + List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); + MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); + //鍙戦�佸箍鎾� + log.error("骞挎挱缁欙細{}" , recTopic); + mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); } catch (Exception e) { e.printStackTrace(); @@ -648,20 +709,21 @@ /** * 瑙f瀽瀛樺偍鏁呴殰鏁版嵁 * TODO 淇濊瘉鍘熷瓙鎬� - * @param fault 鏁呴殰鏁版嵁 - * @param orderId 宸ュ崟 - * @param tenantId 绉熸埛 + * + * @param fault 鏁呴殰鏁版嵁 + * @param orderId 宸ュ崟 + * @param tenantId 绉熸埛 * @param machineId 璁惧 * @param faultType 鏁呴殰绫诲瀷 * @return 缁勮濂芥晠闅滄暟鎹� */ - private List<DryFaultRecord> fitFault(String fault, String orderId,Integer tenantId,String machineId,Integer faultType){ + private List<DryFaultRecord> fitFault(String fault, String orderId, Integer tenantId, String machineId, Integer faultType) { List<DryFaultRecord> result = new ArrayList<>(); - if(StringUtils.isEmpty(fault))return result; + if (StringUtils.isEmpty(fault)) return result; //鏁版嵁鏍锋湰锛�"eqp_fault": "婊氱瓛闄嶈秴鏃�-鎶ヨ,椋庢満杩囨祦鎶ヨ,婊氱瓛鍗囪秴鏃�-鎶ヨ,椋庣鍗囨姤璀�", - System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime()+"--"+fault); + System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime() + "--" + fault); //redis涓殑鏁呴殰 - Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); + Map<Object, Object> rFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); Map<String, Object> redFauMap = rFauMap.entrySet().stream() .collect(Collectors.toMap( entry -> entry.getKey().toString(), // 閿浆鎹负瀛楃涓� @@ -669,42 +731,45 @@ )); //娌℃湁鐢熸垚宸ュ崟鐨勬晠闅滄暟鎹笉瀛樺偍 - if(StringUtils.isEmpty(orderId)){ + if (StringUtils.isEmpty(orderId)) { return result; } - if(StringUtils.isEmpty(fault) && rFauMap.isEmpty()){ + if (StringUtils.isEmpty(fault) && rFauMap.isEmpty()) { return result; } //1.瑙f瀽鏁版嵁 String[] eqpFaults = fault.split(","); - Map<String,DryFaultRecord> addFauMap = new HashMap<>(); - Map<String,DryFaultRecord> realFauMap = new HashMap<>(); + Map<String, DryFaultRecord> addFauMap = new HashMap<>(); + Map<String, DryFaultRecord> realFauMap = new HashMap<>(); for (int i = 0; i < eqpFaults.length; i++) { String eqpFault = eqpFaults[i]; //閬垮厤绌哄瓧绗︿覆 - if(StringUtils.isEmpty(eqpFault.trim())) continue; + if (StringUtils.isEmpty(eqpFault.trim())) continue; //1.1妫�鏌qtt涓槸鍚﹀凡瀛樺湪杩欎釜鏁呴殰 - String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault).trim(); + String redisKey = String.format("%s_%s_%s", tenantId, machineId, eqpFault).trim(); realFauMap.put(redisKey, new DryFaultRecord()); - DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,redisKey); + DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey); //1.2濡傛灉redis涓嶅瓨鍦ㄥ垯瀛樺叆锛堝瓨鏁呴殰寮�濮嬶級 - if(rFault ==null){ + if (rFault == null) { //缁勮缂撳瓨鏁版嵁 // DryFaultRecord faultRecord = new DryFaultRecord(orderId,tenantId,eqpFault,faultType,new Date(),null); // addFauMap.put(redisKey,faultRecord); Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId); String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + ""); - DryFaultRecordVo vo = new DryFaultRecordVo(orderId,tenantId,eqpFault,faultType,new Date(),null,1,equipmentMap.get(machineId).getName(),tenantName); - addFauMap.put(redisKey,vo); - }else { + DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName); + addFauMap.put(redisKey, vo); + } else { + //TODO 鐗规畩鎯呭喌锛屽鏋渞edis鐨勬晠闅滃拰鏂� + + //濡傛灉鏁版嵁宸插瓨鍦紝涓旇鏁板ぇ浜�1灏遍噸缃鏁帮紙璁℃暟3娆″悗鍒ゅ畾鏁呴殰缁撴潫锛�3娆′箣鍓嶉噸鏂颁笂鎶ユ晠闅滆鏄庢晠闅滆繕鍦ㄦ寔缁� 闇�瑕侀噸鏂拌鏁帮級 - if(rFault.getECount()!=null && rFault.getECount() > 1){ + if (rFault.getECount() != null && rFault.getECount() > 1) { rFault.setECount(1); - redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,redisKey,rFault); - System.err.println("鎶ヨ娆℃暟閲嶇疆 clear clear 锛宬ey-"+redisKey); + redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey, rFault); + System.err.println("鎶ヨ娆℃暟閲嶇疆 clear clear 锛宬ey-" + redisKey); } } @@ -715,40 +780,38 @@ //鍚堝苟鏁版嵁 addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value)); //娌℃湁鏂版晠闅滄暟鎹笉鐢ㄨ鐩� - if(!addFauMap.isEmpty()){ - redisUtil.hmset(MqttConstant.MQTT_REAL_FAULT,redFauMap); + if (!addFauMap.isEmpty()) { + redisUtil.hmset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redFauMap); } //2妫�娴嬪凡缁撴潫鐨勬晠闅� //2.1濡傛灉瀹炴椂鏁版嵁涓嶅瓨鍦╮edis瀛樺湪鍒欎唬琛ㄦ晠闅滅粨鏉燂紝瀛樺叆鏁版嵁搴� - Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); + Map<Object, Object> curFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); curFauMap.keySet().stream() //鐗瑰埆娉ㄦ剰锛屽涓姤璀︾被鍨嬪叡鐢ㄦ柟娉曢渶瑕佸尯鍒嗙被鍨� - .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo)curFauMap.get(key)).getFaultType() == faultType) + .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo) curFauMap.get(key)).getFaultType() == faultType) .forEach(key -> { - DryFaultRecordVo vo = (DryFaultRecordVo)redFauMap.get(key); - vo.setECount(vo.getECount()+1); - if(redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,key.toString())!=null){ + DryFaultRecordVo vo = (DryFaultRecordVo) redFauMap.get(key); + vo.setECount(vo.getECount() + 1); + if (redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString()) != null) { //鏇存柊娆℃暟 - redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,key.toString(),vo); - System.err.println("鎶ヨ娆℃暟鏇存柊锛宬ey-"+key); + redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString(), vo); + System.err.println("鎶ヨ娆℃暟鏇存柊锛宬ey-" + key); } - if(vo.getECount()>=3){ + if (vo.getECount() >= 3) { vo.setEndTime(new Date()); //TODO 缁撴潫瓒呰繃鏌愪釜鏃堕棿鍖洪棿鍒ゅ畾涓洪敊璇暟鎹� faultRecordService.save(vo); - redisUtil.hdel(MqttConstant.MQTT_REAL_FAULT,key); + redisUtil.hdel(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key); result.add(vo); - System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime()+"瀛樺叆鏁版嵁搴�"); + System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime() + "瀛樺叆鏁版嵁搴�"); } }); - return result; } - } -- Gitblit v1.9.3