jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -97,16 +97,15 @@ /**************************start*******************************/ /**************************end*******************************/ //redis缓存 //client String MQTT_REAL_FAULT = "mqtt:real:fault"; //所有租户的实时报警(%s:租户id) String MQTT_REAL_FAULT = "mqtt:real:fault:%s"; //service(cloud) //在线客户端 String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s"; //所有租户的实时报警(%s:租户id) String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s"; // String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s"; jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java
@@ -70,7 +70,6 @@ private IDryEquipmentService dryEquipmentService; @ApiOperation(value="测试", notes="返回Hello") @GetMapping("/hello") public Result<?> sayHello() { @@ -103,28 +102,9 @@ @ApiOperation(value="接收实时数据Json", notes="设备实时数据上传") @PostMapping("/sendRealTimeDataJson2") public Result<?> realTimeDataJson2(@RequestBody RealTimeDataParentVo realTimeDataParentVo) { try { if (mqttConfig.isEnable() && "user".equals(mqttConfig.getRole())){ MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload(JSONObject.toJSONString(realTimeDataParentVo).getBytes()); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA,mqttMessage); //处理故障信息 dryRealTimeDataService.fitFaultRecord(realTimeDataParentVo); } if ("user".equals(mqttConfig.getRole()) && realTimeDataParentVo.getFault() != null){ //处理故障信息 dryRealTimeDataService.fitFaultRecord(realTimeDataParentVo); } } catch (MqttException e) { e.printStackTrace(); } return dryRealTimeDataService.realTimeDataHandle(realTimeDataParentVo); } } @ApiOperation(value="获取设备实时数据", notes="通过租户ID和设备编码获取实时数据") @@ -204,6 +184,7 @@ /** * 根据设备和租户查询该设备类型的干燥配方,将配方转成xml格式,以字符串方式返回 * * @param tenantId * @param eqpCode * @return jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -15,6 +15,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.data.redis.core.RedisTemplate; import java.util.*; @@ -101,8 +102,8 @@ mqttClient.subscribe(MqttConstant.MOBILE_UP); System.err.println("admin订阅" + MqttConstant.MOBILE_UP); // 订阅租户实时数据 mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP); System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP); // 订阅租户报警数据 mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); @@ -178,15 +179,17 @@ for (int i = 0; i < data.size(); i++) { JSONObject obj = data.getJSONObject(i); JSONObject item = new JSONObject(); //clientid String clientid = obj.getString("clientid"); item.put("clientid", clientid); //TODO 校验租户id是否存在 if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) continue; //username item.put("username", obj.get("username")); //连接时间 String st = obj.getString("connected_at"); String upTime = DateUtils.zone2Str(st); item.put("connectedAt", upTime); //clientid String clientid = obj.getString("clientid"); item.put("clientid", clientid); //是否连接 Boolean connected = obj.getBoolean("connected"); item.put("connected", connected); @@ -195,7 +198,7 @@ String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); //item.put("code", info[2]); item.put("code", info[2]); if (connected) { redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item); jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -42,6 +42,9 @@ public class MqttSampleCallback implements MqttCallback { @Value(value = "${jeecg.mqtt.role}") private String role; @Autowired private MqttUtil mqttUtil; @Autowired @@ -76,8 +79,8 @@ @Override public void messageArrived(String topic, MqttMessage mqttMessage) { System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:" + new String(mqttMessage.getPayload())); // System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:" // + new String(mqttMessage.getPayload())); switch (role) { // 管理员 @@ -98,6 +101,8 @@ //clientid String clientid = messageJson.getString("clientid"); item.put("clientid", clientid); // 不符合的设备不进行管理 if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; //是否连接 item.put("connected", true); //根据clientid解析(注意配置文件中clientid格式 例:client-1000) @@ -105,12 +110,14 @@ String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); //item.put("code", info[2]); item.put("code", info[2]); redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); System.err.println(String.format("设备: %s上线", clientid)); } catch (Exception e) { e.printStackTrace(); } redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); System.err.println(String.format("设备: %s上线", clientid)); } } @@ -204,16 +211,16 @@ }); break; // 接收设备实时数据 // 接收设备实时数据 TODO 20250718暂不使用,使用TENANT_UP_PREFIX_REALTIME_DATA_EQP case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: ThreadUtil.execute(() -> { try { RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); realTimeDataService.realTimeDataHandle(vo); } catch (Exception e) { e.printStackTrace(); } }); // ThreadUtil.execute(() -> { // try { // RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); // realTimeDataService.realTimeDataHandle(vo); // } catch (Exception e) { // e.printStackTrace(); // } // }); break; // 接收设备实时数据-机台 @@ 
-221,40 +228,42 @@ ThreadUtil.execute(() -> { try { RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class); synchronized (realTimeDataService) { realTimeDataService.realTimeDataHandle(vo); } } catch (Exception e) { e.printStackTrace(); } }); break; //各租户上传的实时报警数据 //各租户上传的实时报警数据 TODO 20250721暂不使用,统一使用TENANT_UP_PREFIX_REALTIME_DATA_EQP case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: { MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { }); //故障数据 Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); //租户id String tenantId = realFaultMessage.getTentId(); // MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { // }); // //故障数据 // Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); // //租户id // String tenantId = realFaultMessage.getTentId(); //收到租户实时报警数据存入redis //转换为 Map<String, Object> Map<String, Object> objectMap = dryFaultMap.entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, entry -> (Object) entry.getValue() )); redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); //广播发送给各租户下移动设备 if (dryFaultMap.isEmpty()) { return; } String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); //数据转换 List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); //发送广播 System.err.println("广播给:" + recTopic); sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); // //转换为 Map<String, Object> // Map<String, Object> objectMap = dryFaultMap.entrySet().stream() // .collect(Collectors.toMap( // Map.Entry::getKey, // entry -> (Object) entry.getValue() // )); // 
redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); // //广播发送给各租户下移动设备 // if (dryFaultMap.isEmpty()) { // return; // } // String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); // //数据转换 // List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); // MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); // //发送广播 // System.err.println("广播给:" + recTopic); // sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); } break; //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据) @@ -263,7 +272,7 @@ if (req.toString().isEmpty() || tenantId == null) { return; } Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId)); Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); //转换为 Map<String, DryFaultRecordVo> Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream() .collect(Collectors.toMap( jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java
@@ -1,7 +1,10 @@
package org.jeecg.modules.dry.mqtt;

import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;

@Component
@@ -9,4 +12,30 @@
public class MqttUtil {

    /** Shared MQTT client; it is not assigned in this class and may be null — NOTE(review): confirm where it is set. */
    public MqttClient mqttClient;

    /**
     * Publishes a message asynchronously with QoS 0.
     *
     * <p>The message body is the JSON serialization of {@code mqMessage}. The publish runs on a
     * worker submitted through {@code ThreadUtil.execute}, so failures are reported on stderr and
     * are never propagated to the caller.
     *
     * @param topic     target topic; for {@code type == 1} it is a format string whose
     *                  placeholder is filled with the message's tenant id
     * @param mqMessage message body
     * @param type      1 - send to the tenant (topic is formatted with the tenant id),
     *                  2 - send to the fixed topic as given; any other value (including
     *                  {@code null}) is rejected and nothing is published
     */
    public void sendMqttMessage(String topic, MqMessage<?> mqMessage, Integer type) {
        // Check before unboxing: a null Integer compared with `== 1` would throw a
        // NullPointerException inside the worker and only surface as a stack trace.
        if (type == null || (type != 1 && type != 2)) {
            System.err.println("sendMqttMessage skipped: unsupported type " + type + ", topic=" + topic);
            return;
        }
        final boolean perTenant = type == 1;

        ThreadUtil.execute(() -> {
            try {
                // Snapshot the field so the null check and the publish use the same client.
                MqttClient client = mqttClient;
                if (client == null) {
                    System.err.println("sendMqttMessage skipped: MQTT client is not initialised, topic=" + topic);
                    return;
                }
                String targetTopic = perTenant ? String.format(topic, mqMessage.getTentId()) : topic;
                // NOTE(review): getBytes() uses the platform default charset; confirm the
                // consumers decode the payload with the same charset before pinning UTF-8 here.
                MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                sendMessage.setQos(0);
                client.publish(targetTopic, sendMessage);
            } catch (Exception e) {
                // Best-effort send: report the failure and keep the worker alive.
                e.printStackTrace();
            }
        });
    }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.mina.core.session.IoSession; @@ -23,6 +24,7 @@ import org.jeecg.modules.dry.common.CacheConstants; import org.jeecg.modules.dry.entity.*; import org.jeecg.modules.dry.mqtt.MqMessage; import org.jeecg.modules.dry.mqtt.MqttConfig; import org.jeecg.modules.dry.mqtt.MqttUtil; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.socket.ServerHandler; @@ -54,6 +56,9 @@ private IDryEquipmentService equipmentService; @Autowired private IDryEqpTypeService dryEqpTypeService; @Autowired private RedisUtil redisUtil; @Autowired @@ -67,6 +72,7 @@ @Value(value = "${jeecg.mqtt.role}") private String role; @Autowired private MqttUtil mqttUtil; @@ -152,8 +158,6 @@ } @Override @Transactional public Result<?> realTimeDataHandle(RealTimeDataParentVo realTimeDataParentVo) { @@ -220,6 +224,10 @@ if (realTimeDataParentVo.getReport() != null) { saveReport(realTimeDataParentVo); } if (realTimeDataParentVo.getFault() != null) { fitFaultRecord(realTimeDataParentVo); } return Result.ok(); } @@ -278,10 +286,12 @@ /** * 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条 * * @param realTimeDataVo * @return */ private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) { TenantContext.setTenant(realTimeDataVo.getTenantid() +""); DryOrderVo orderVo; LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder()); @@ -293,7 +303,8 @@ // 转换为缓存数据结构 orderVo = BeanUtil.toBean(one, DryOrderVo.class); if (one.getTemps() != null) { Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer,Double>>(){}); Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer, 
Double>>() { }); orderVo.setBellowsTemp(map); } // 查询称重记录,添加到缓存数据结构 @@ -313,17 +324,54 @@ /** * 保存新工单 * * @param realTimeDataVo * @return */ private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { TenantContext.setTenant(realTimeDataVo.getTenantid() +""); DryOrderVo orderVo; // 查询设备 DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo); if (equ == null) { log.error("未找到设备:"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",机台:" + realTimeDataVo.getMachineid()); log.error("新增设备:"); if (realTimeDataVo.getMachineid() == null || realTimeDataVo.getTenantid() == null) { log.error("新增设备失败:设备ID或租户ID为空!machineid={}, tenantid={}", realTimeDataVo.getMachineid(), realTimeDataVo.getTenantid()); return null; } DryEquipment addEqu = new DryEquipment(realTimeDataVo); try { String digits = StringUtils.getDigits(realTimeDataVo.getMachineid()); addEqu.setName(Integer.parseInt(digits) + "#干燥设备"); } catch (NumberFormatException e) { log.error("设备ID格式错误,无法提取数字部分:machineid={}", realTimeDataVo.getMachineid(), e); return null; } DryEqpType eqpType = dryEqpTypeService.getOne( new LambdaQueryWrapper<DryEqpType>() .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid()) ); if(eqpType == null){ log.error("未查询到租户设备类型:{}", realTimeDataVo.getTenantid() ); return null; } Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId())); if (!equipmentService.save(addEqu)) { log.error("新增设备失败:数据库保存异常!equipment={}", addEqu); return null; } equ = addEqu; log.info("新增设备成功:equipmentId={}", addEqu.getId()); } // 查询药材 DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo); @@ -345,6 +393,7 @@ /** * 查询设备,新设备则添加到设备主数据 * * @param realTimeDataVo * @return */ @@ -365,7 +414,9 @@ object.put("tenantId", realTimeDataVo.getTenantid()); mqttMessage.setPayload(object.toJSONString().getBytes()); try { if(mqttEnable){ mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX ,mqttMessage); } }catch (MqttException e) { e.printStackTrace(); } @@ -378,6 +429,7 
@@ /** * 查询药材,新药材添加到数据库 * * @param realTimeDataVo * @return */ @@ -389,7 +441,10 @@ if (one == null) { one = new DryHerbFormula(realTimeDataVo); DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid()); if (dryEquipment!=null&&dryEquipment.getType()!=null) { one.setEqpType(dryEquipment.getType()); } dryHerbFormulaService.save(one); } return one; @@ -397,6 +452,7 @@ /** * 保存含水率变化记录 * * @param trendVo * @param orderVo */ @@ -422,6 +478,7 @@ /** * 查询机台实时数据 * * @param realTimeDataVo * @return */ @@ -565,7 +622,7 @@ //处理结束后,将redis中实时数据发送至云服务器 Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); if(mqttEnable && !toCloudFaultMap.isEmpty()){ MqMessage< Map<Object, Object>> message = new MqMessage<>(); message.setData(toCloudFaultMap); @@ -587,8 +644,6 @@ mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); } } catch (Exception e) { @@ -605,37 +660,43 @@ ThreadUtil.execute(() -> { try { //解析存储报警数据 List<DryFaultRecord> faultRecords1 = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); List<DryFaultRecord> faultRecords2 = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); faultRecords1.addAll(faultRecords2); //处理结束后,将redis中实时数据发送至云服务器 Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); if(mqttEnable && !toCloudFaultMap.isEmpty()){ MqMessage< Map<Object, Object>> message = new MqMessage<>(); message.setData(toCloudFaultMap); message.setTentId(vo.getTenantid()+""); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload(JSON.toJSONString(message).getBytes()); 
mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage); List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); if(!errorList.isEmpty()){ log.error("保存故障:{}", errorList.toString()); } if(!warnList.isEmpty()){ log.error("保存告警:{}", warnList.toString()); } //以下为云服务处理故障,厂内本地服务无需处理 if(!mqttEnable)return; //要保存的历史故障 if(!faultRecords1.isEmpty()){ MqMessage<List<DryFaultRecord>> message = new MqMessage<>(); message.setData(faultRecords1); message.setTentId(vo.getTenantid()+""); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); //处理结束后,将redis中实时数据发送至云服务器 key = tenantId + machineId + eqpFault Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream() .collect(Collectors.toMap( entry -> entry.getKey().toString(), entry -> (DryFaultRecordVo)entry.getValue() )); String tenantId = vo.getTenantid() +""; //广播发送给各租户下移动设备 if (dryFaultMap.isEmpty()) { return; } String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); //数据转换 List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); //发送广播 log.error("广播给:{}" , recTopic); mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); } catch (Exception e) { e.printStackTrace(); @@ -648,6 +709,7 @@ /** * 解析存储故障数据 * TODO 保证原子性 * * @param fault 故障数据 * @param orderId 工单 * @param tenantId 租户 @@ -661,7 +723,7 @@ 
//数据样本:"eqp_fault": "滚筒降超时-报警,风机过流报警,滚筒升超时-报警,风箱升报警", System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime()+"--"+fault); //redis中的故障 Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); Map<Object, Object> rFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); Map<String, Object> redFauMap = rFauMap.entrySet().stream() .collect(Collectors.toMap( entry -> entry.getKey().toString(), // 键转换为字符串 @@ -689,7 +751,7 @@ realFauMap.put(redisKey, new DryFaultRecord()); DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,redisKey); DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey); //1.2如果redis不存在则存入(存故障开始) if(rFault ==null){ //组装缓存数据 @@ -700,10 +762,13 @@ DryFaultRecordVo vo = new DryFaultRecordVo(orderId,tenantId,eqpFault,faultType,new Date(),null,1,equipmentMap.get(machineId).getName(),tenantName); addFauMap.put(redisKey,vo); }else { //TODO 特殊情况,如果redis的故障和新 //如果数据已存在,且计数大于1就重置计数(计数3次后判定故障结束,3次之前重新上报故障说明故障还在持续 需要重新计数) if(rFault.getECount()!=null && rFault.getECount() > 1){ rFault.setECount(1); redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,redisKey,rFault); redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey, rFault); System.err.println("报警次数重置 clear clear ,key-"+redisKey); } @@ -716,21 +781,21 @@ addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value)); //没有新故障数据不用覆盖 if(!addFauMap.isEmpty()){ redisUtil.hmset(MqttConstant.MQTT_REAL_FAULT,redFauMap); redisUtil.hmset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redFauMap); } //2检测已结束的故障 //2.1如果实时数据不存在redis存在则代表故障结束,存入数据库 Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); Map<Object, Object> curFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); curFauMap.keySet().stream() //特别注意,多个报警类型共用方法需要区分类型 .filter(key -> 
!realFauMap.containsKey(key) && ((DryFaultRecordVo)curFauMap.get(key)).getFaultType() == faultType) .forEach(key -> { DryFaultRecordVo vo = (DryFaultRecordVo)redFauMap.get(key); vo.setECount(vo.getECount()+1); if(redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,key.toString())!=null){ if (redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString()) != null) { //更新次数 redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,key.toString(),vo); redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString(), vo); System.err.println("报警次数更新,key-"+key); } @@ -738,17 +803,15 @@ vo.setEndTime(new Date()); //TODO 结束超过某个时间区间判定为错误数据 faultRecordService.save(vo); redisUtil.hdel(MqttConstant.MQTT_REAL_FAULT,key); redisUtil.hdel(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key); result.add(vo); System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime()+"存入数据库"); } }); return result; } }