| | |
| | | //服务端下行指令前缀(返回给移动端) |
| | | String SERVICE_DOWN_PREFIX = "service/down/res"; |
| | | //返回移动端查询设备状态 |
| | | String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/%s/statu"; |
| | | String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/equ/statu/%s"; |
| | | //返回移动端远程请求指令 |
| | | String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd"; |
| | | |
| | |
| | | String SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s"; |
| | | //服务端向移动端回复一次设备实时故障告警 |
| | | String SERVICE_ONECE_TENANT_REAL_FAULT = "service/onece" + "/real/fault/%s"; |
| | | // 监测客户端连接或断开,推送消息到租户内所有移动端提醒更新干燥设备连接信息(%s-租户) |
| | | String SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU = SERVICE_BROADCAST_PREFIX + "/update/equ/statu/%s"; |
| | | |
| | | //服务端向移动端发送干燥设备实时数据(%s-租户) |
| | | String SERVICE_BROADCAST_TENANT_REAL_DATA = SERVICE_BROADCAST_PREFIX + "/real/data/%s"; |
| | | |
| | | |
| | | |
| | |
| | | |
| | | String TENANT_UP_PREFIX = "tenant/up"; |
| | | String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data"; |
| | | String TENANT_UP_PREFIX_REALTIME_DATA_EQP = TENANT_UP_PREFIX + "/realTime/data/eqp"; |
| | | String TENANT_UP_PREFIX_REALTIME_DATA_EQP = TENANT_UP_PREFIX + "/realTime/data/eqp/test"; |
| | | String TENANT_UP_PREFIX_FAULT_DATA = TENANT_UP_PREFIX + "/fault/data"; |
| | | String TENANT_UP_PREFIX_REAL_FAULT_DATA = TENANT_UP_PREFIX + "/real/fault/data"; |
| | | |
| | |
| | | |
| | | /**************************start*******************************/ |
| | | /**************************end*******************************/ |
| | | //redis缓存 |
| | | |
| | | //所有租户的实时报警(%s:租户id) |
| | | String MQTT_REAL_FAULT = "mqtt:real:fault:%s"; |
| | | |
| | | |
| | | |
| | | |
| | | //service(cloud) |
| | | //在线客户端 |
| | | String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s"; |
| | |
| | | private Integer eCount; |
| | | //设备名称 |
| | | private String equName; |
| | | //设备编码 |
| | | private String equCode; |
| | | //租户名称 |
| | | private String tenantName; |
| | | //故障时间 |
| | |
| | | super(record.getOrderId(),record.getTenantId(),record.getFaultName(),record.getFaultType(),record.getStartTime(),record.getEndTime()); |
| | | this.eCount = count; |
| | | } |
| | | public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount, String equName, String tenantName) { |
| | | public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount,String equCode, String equName, String tenantName) { |
| | | super(orderId, tenantId, faultName, faultType, startTime, endTime); |
| | | this.eCount = eCount; |
| | | this.equName = equName; |
| | | this.equCode = equCode; |
| | | this.tenantName = tenantName; |
| | | } |
| | | } |
| | |
| | | String st = client.getString("connectedAt"); |
| | | vo.setUpTime(st); |
| | | vo.setClientId(clientid); |
| | | }else{ |
| | | vo.setClientId(clientid); |
| | | vo.setOnline(false); |
| | | } |
| | | return vo; |
| | | }).collect(Collectors.toList()); |
| | | //排序 |
| | | collect.sort(Comparator.comparing(obj -> obj.getCode(), Comparator.nullsLast(Comparator.naturalOrder()))); |
| | | collect.sort(Comparator.comparing(obj -> obj.getOnline(), Comparator.nullsLast(Comparator.naturalOrder()))); |
| | | collect.sort( |
| | | Comparator.comparing( |
| | | MoEquVo::getOnline, |
| | | Comparator.nullsLast(Comparator.reverseOrder()) // true 在前,false 在后,null 最后 |
| | | ) |
| | | .thenComparing( |
| | | DryEquipment::getCode, |
| | | Comparator.nullsLast(Comparator.naturalOrder()) // code 升序,null 最后 |
| | | ) |
| | | ); |
| | | BeanUtils.copyProperties(pageList, page); |
| | | page.setRecords(collect); |
| | | } |
| | |
| | | mqttConnOpt.setAutomaticReconnect(false);//设置是否自动重连 |
| | | |
| | | //遗嘱消息 TODO qos2需要在设备上线时做清除消息操作 |
| | | mqttConnOpt.setWill("downline", ("我是" + mqttName + "_" + mqttClientId + ",我下线了").getBytes(), 2, false); |
| | | //mqttConnOpt.setWill("downline", ("我是" + mqttName + "_" + mqttClientId + ",我下线了").getBytes(), 2, false); |
| | | try { |
| | | |
| | | MqttClient mqttClient = new MqttClient(broker, mqttClientId, persistence); |
| | |
| | | String clientid = obj.getString("clientid"); |
| | | item.put("clientid", clientid); |
| | | //TODO 校验租户id是否存在 |
| | | |
| | | |
| | | if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) continue; |
| | | //username |
| | | item.put("username", obj.get("username")); |
| | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.alibaba.fastjson.TypeReference; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.commons.lang3.StringUtils; |
| | | import org.apache.poi.ss.formula.functions.T; |
| | | import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
| | | import org.eclipse.paho.client.mqttv3.MqttCallback; |
| | |
| | | public class MqttSampleCallback implements MqttCallback { |
| | | @Value(value = "${jeecg.mqtt.role}") |
| | | private String role; |
| | | |
| | | |
| | | |
| | | @Autowired |
| | |
| | | String message = new String(mqttMessage.getPayload()); |
| | | JSONObject messageJson = JSONObject.parseObject(message); |
| | | |
| | | if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { |
| | | if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected") && !topic.endsWith("disconnected")) { |
| | | JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); |
| | | if (client == null) { |
| | | JSONObject item = new JSONObject(); |
| | |
| | | String clientid = messageJson.getString("clientid"); |
| | | item.put("clientid", clientid); |
| | | // 不符合的设备不进行管理 |
| | | if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; |
| | | if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; |
| | | //是否连接 |
| | | item.put("connected", true); |
| | | //根据clientid解析(注意配置文件中clientid格式 例:client-1000) |
| | |
| | | item.put("code", info[2]); |
| | | redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); |
| | | System.err.println(String.format("设备: %s上线", clientid)); |
| | | |
| | | // 推送到移动端 |
| | | String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, item.get("tenantId")); |
| | | MqMessage<JSONObject> mqMessage = new MqMessage<>(item, item.get("tenantId").toString(), recTopic); |
| | | sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1); |
| | | |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { |
| | | try { |
| | | String clientid = messageJson.getString("clientid"); |
| | | redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid); |
| | | // 不符合的设备不进行管理 |
| | | if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return; |
| | | String tenantId = clientid.split("-")[1]; |
| | | redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, tenantId), clientid); |
| | | System.err.println(String.format("设备: %s下线", clientid)); |
| | | |
| | | //推送到移动端 |
| | | JSONObject item = new JSONObject(); |
| | | String[] info = clientid.split("-"); |
| | | item.put("type", info[0]); |
| | | item.put("tenantId", info[1]); |
| | | item.put("code", info[2]); |
| | | item.put("connected", false); |
| | | String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, tenantId); |
| | | MqMessage<JSONObject> mqMessage = new MqMessage<>(item, tenantId, recTopic); |
| | | sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1); |
| | | |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | switch (topic) { |
| | | // 查询设备在线 |
| | | case MqttConstant.MOBILE_QUERY_EQU_STATU: |
| | | System.err.println("admin收到" + topic); |
| | | // 根据设备id查询设备mqtt在线状态 |
| | | String clientId = messageJson.getString("clientId"); |
| | | JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId); |
| | | log.info("admin收到MQTT请求,topic: {}", topic); // 改用更规范的日志记录 |
| | | |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | |
| | | if (client == null || client.isEmpty()) { |
| | | JSONObject res = new JSONObject(); |
| | | res.put("success", false); |
| | | res.put("msg", "查询失败"); |
| | | try { |
| | | MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | // 1. 参数提取 |
| | | String clientId = messageJson.getString("clientId"); |
| | | if (StringUtils.isEmpty(clientId)) { |
| | | return; |
| | | } |
| | | String deviceKey = clientId.split("-")[1]; // 提取设备标识 |
| | | |
| | | client.put("success", true); |
| | | client.put("msg", "查询成功"); |
| | | try { |
| | | MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); |
| | | baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | // 2. 查询设备状态 |
| | | String redisKey = String.format(MqttConstant.MQTT_ONLINE_CLIENT, deviceKey); |
| | | JSONObject client = (JSONObject) redisUtil.hget(redisKey, clientId); |
| | | |
| | | // 3. 异步处理响应 |
| | | ThreadUtil.execute(() -> { |
| | | JSONObject response = new JSONObject(); |
| | | |
| | | // 3.1 处理查询结果 |
| | | if (client == null || client.isEmpty()) { |
| | | response.put("success", false); |
| | | response.put("msg", "查询失败,设备不存在或离线"); |
| | | } else { |
| | | response = client; // 复用查询结果 |
| | | response.put("success", true); |
| | | response.put("msg", "查询成功"); |
| | | } |
| | | |
| | | // 3.2 发送MQTT响应 |
| | | try { |
| | | String resTopic = String.format(MqttConstant.SERVICE_RES_EQU_STATU, req); |
| | | MqMessage<JSONObject> mqMessage = new MqMessage<>( |
| | | response, |
| | | response.getString("tenantId"), |
| | | resTopic |
| | | ); |
| | | |
| | | sendMqttMessage(resTopic, mqMessage, 2); |
| | | log.debug("设备状态响应发送成功: {}", response); |
| | | |
| | | } catch (Exception e) { |
| | | log.error("MQTT响应发送失败", e); |
| | | } |
| | | }); |
| | | |
| | | } catch (Exception e) { |
| | | log.error("处理设备状态查询异常", e); |
| | | } |
| | | break; |
| | | |
| | | // 接收设备实时数据 TODO 20250718暂不使用,使用TENANT_UP_PREFIX_REALTIME_DATA_EQP |
| | |
| | | case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP: |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | |
| | | RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class); |
| | | synchronized (realTimeDataService) { |
| | | realTimeDataService.realTimeDataHandle(vo); |
| | | } |
| | | // 向各租户移动端发送数据 |
| | | String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, vo.getTenantid()); |
| | | MqMessage<RealTimeDataVo> mqMessage = new MqMessage<>(vo.getRealTime(), vo.getTenantid() + "", recTopic); |
| | | sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, mqMessage, 1); |
| | | |
| | | |
| | | realTimeDataService.realTimeDataHandle(vo); |
| | | |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | |
| | | import java.text.DecimalFormat; |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.stream.Collectors; |
| | | |
| | | @Slf4j |
| | |
| | | |
| | | @Value(value = "${jeecg.mqtt.enable}") |
| | | private boolean mqttEnable; |
| | | |
| | | private static final ConcurrentHashMap<String, ReentrantLock> tenantLocks = new ConcurrentHashMap<>(); |
| | | |
| | | private ReentrantLock getLock(String tenantId, String type) { |
| | | String lockKey = tenantId + ":" + type; |
| | | return tenantLocks.computeIfAbsent(lockKey, k -> new ReentrantLock()); |
| | | } |
| | | |
| | | public String getTemporaryToken() { |
| | | if (token == null) { |
| | |
| | | } |
| | | |
| | | if (realTimeDataParentVo.getFault() != null) { |
| | | fitFaultRecord(realTimeDataParentVo); |
| | | ReentrantLock faultLock = getLock(realTimeDataParentVo.getTenantid() + "", "fault"); |
| | | faultLock.lock(); |
| | | try { |
| | | fitFaultRecord(realTimeDataParentVo); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } finally { |
| | | faultLock.unlock(); |
| | | } |
| | | |
| | | } |
| | | return Result.ok(); |
| | | } |
| | |
| | | * @return |
| | | */ |
| | | private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) { |
| | | TenantContext.setTenant(realTimeDataVo.getTenantid() +""); |
| | | TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); |
| | | DryOrderVo orderVo; |
| | | LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder()); |
| | |
| | | * @return |
| | | */ |
| | | private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { |
| | | TenantContext.setTenant(realTimeDataVo.getTenantid() +""); |
| | | TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); |
| | | DryOrderVo orderVo; |
| | | |
| | | // 查询设备 |
| | |
| | | new LambdaQueryWrapper<DryEqpType>() |
| | | .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid()) |
| | | ); |
| | | if(eqpType == null){ |
| | | log.error("未查询到租户设备类型:{}", realTimeDataVo.getTenantid() ); |
| | | if (eqpType == null) { |
| | | log.error("未查询到租户设备类型:{}", realTimeDataVo.getTenantid()); |
| | | return null; |
| | | } |
| | | |
| | | Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId())); |
| | | |
| | | if (!equipmentService.save(addEqu)) { |
| | | log.error("新增设备失败:数据库保存异常!equipment={}", addEqu); |
| | | return null; |
| | | // 设备新增 |
| | | ReentrantLock equipmentLock = getLock(realTimeDataVo.getTenantid() + "", "equipment"); |
| | | |
| | | equipmentLock.lock(); |
| | | try { |
| | | if (!equipmentService.save(addEqu)) { |
| | | log.error("新增设备失败:数据库保存异常!equipment={}", addEqu); |
| | | return null; |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } finally { |
| | | equipmentLock.unlock(); |
| | | } |
| | | equ = addEqu; |
| | | |
| | |
| | | log.error("未找到药材:" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",机台:" + realTimeDataVo.getMachineid()); |
| | | return null; |
| | | } |
| | | // 创建新工单 |
| | | orderVo = new DryOrderVo(realTimeDataVo); |
| | | |
| | | orderVo.setHerbId(herbFormula.getId()); |
| | | orderVo.setEquId(equ.getId()); |
| | | DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); |
| | | boolean save = dryOrderService.save(dryOrder); |
| | | return orderVo; |
| | | // 工单新增 |
| | | ReentrantLock orderLock = getLock(realTimeDataVo.getTenantid() + "", "order"); |
| | | orderLock.lock(); |
| | | try { |
| | | // 创建新工单 |
| | | orderVo = new DryOrderVo(realTimeDataVo); |
| | | orderVo.setHerbId(herbFormula.getId()); |
| | | orderVo.setEquId(equ.getId()); |
| | | DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); |
| | | boolean save = dryOrderService.save(dryOrder); |
| | | return orderVo; |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } finally { |
| | | orderLock.unlock(); |
| | | } |
| | | |
| | | |
| | | return null; |
| | | } |
| | | |
| | | |
| | |
| | | object.put("tenantId", realTimeDataVo.getTenantid()); |
| | | mqttMessage.setPayload(object.toJSONString().getBytes()); |
| | | try { |
| | | if(mqttEnable){ |
| | | mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage); |
| | | } |
| | | if (mqttEnable) { |
| | | mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage); |
| | | } |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | if (one == null) { |
| | | one = new DryHerbFormula(realTimeDataVo); |
| | | DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid()); |
| | | if (dryEquipment!=null&&dryEquipment.getType()!=null) { |
| | | if (dryEquipment != null && dryEquipment.getType() != null) { |
| | | one.setEqpType(dryEquipment.getType()); |
| | | } |
| | | |
| | |
| | | @Override |
| | | public void fitFaultRecord(RealTimeDataParentVo vo) { |
| | | TenantContext.setTenant(vo.getTenantid() + ""); |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | //解析存储报警数据 |
| | | List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); |
| | | List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); |
| | | if(!errorList.isEmpty()){ |
| | | log.error("保存故障:{}", errorList.toString()); |
| | | } |
| | | if(!warnList.isEmpty()){ |
| | | log.error("保存告警:{}", warnList.toString()); |
| | | } |
| | | //解析存储报警数据 |
| | | List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); |
| | | List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); |
| | | if (!errorList.isEmpty()) { |
| | | log.error("保存故障:{}", errorList.toString()); |
| | | } |
| | | if (!warnList.isEmpty()) { |
| | | log.error("保存告警:{}", warnList.toString()); |
| | | } |
| | | |
| | | //以下为云服务处理故障,厂内本地服务无需处理 |
| | | if(!mqttEnable)return; |
| | | //以下为云服务处理故障,厂内本地服务无需处理 |
| | | if (!mqttEnable) return; |
| | | |
| | | |
| | | |
| | | //处理结束后,将redis中实时数据发送至云服务器 key = tenantId + machineId + eqpFault |
| | | Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); |
| | | //处理结束后,将redis中实时数据发送至云服务器 key = tenantId + machineId + eqpFault |
| | | Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); |
| | | |
| | | |
| | | Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream() |
| | | .collect(Collectors.toMap( |
| | | entry -> entry.getKey().toString(), |
| | | entry -> (DryFaultRecordVo)entry.getValue() |
| | | )); |
| | | Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream() |
| | | .collect(Collectors.toMap( |
| | | entry -> entry.getKey().toString(), |
| | | entry -> (DryFaultRecordVo) entry.getValue() |
| | | )); |
| | | |
| | | String tenantId = vo.getTenantid() +""; |
| | | String tenantId = vo.getTenantid() + ""; |
| | | |
| | | //广播发送给各租户下移动设备 |
| | | if (dryFaultMap.isEmpty()) { |
| | | return; |
| | | } |
| | | String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); |
| | | //数据转换 |
| | | List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); |
| | | MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); |
| | | //发送广播 |
| | | log.error("广播给:{}" , recTopic); |
| | | mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); |
| | | //广播发送给各租户下移动设备 |
| | | if (dryFaultMap.isEmpty()) { |
| | | return; |
| | | } |
| | | String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); |
| | | //数据转换 |
| | | List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); |
| | | MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); |
| | | //发送广播 |
| | | log.error("广播给:{}", recTopic); |
| | | mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); |
| | | |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | |
| | | } |
| | | |
| | |
| | | // addFauMap.put(redisKey,faultRecord); |
| | | Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId); |
| | | String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + ""); |
| | | DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName); |
| | | DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, machineId, equipmentMap.get(machineId).getName(), tenantName); |
| | | addFauMap.put(redisKey, vo); |
| | | } else { |
| | | //TODO 特殊情况,如果redis的故障和新 |
| | | |
| | | |
| | | |
| | | |
| | | //如果数据已存在,且计数大于1就重置计数(计数3次后判定故障结束,3次之前重新上报故障说明故障还在持续 需要重新计数) |
| | | if (rFault.getECount() != null && rFault.getECount() > 1) { |
| | | rFault.setECount(1); |