干燥机配套车间生产管理系统/云平台服务端
zhuguifei
昨天 4a60ced80b215fcb2e2d4664b20cd744313ccc10
接收mqtt数据高并发处理
已修改6个文件
300 ■■■■■ 文件已修改
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java 112 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java 152 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -46,7 +46,7 @@
  //服务端下行指令前缀(返回给移动端)
  String SERVICE_DOWN_PREFIX = "service/down/res";
  //返回移动端查询设备状态
  String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/%s/statu";
  String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/equ/statu/%s";
  //返回移动端远程请求指令
  String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd";
@@ -63,6 +63,11 @@
  String  SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s";
  //服务端向移动端回复一次设备实时故障告警
  String  SERVICE_ONECE_TENANT_REAL_FAULT = "service/onece" + "/real/fault/%s";
  // 监测客户端连接或断开,推送消息到租户内所有移动端提醒更新干燥设备连接信息(%s-租户)
  String  SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU = SERVICE_BROADCAST_PREFIX + "/update/equ/statu/%s";
  //服务端向移动端发送干燥设备实时数据(%s-租户)
  String  SERVICE_BROADCAST_TENANT_REAL_DATA = SERVICE_BROADCAST_PREFIX + "/real/data/%s";
@@ -77,7 +82,7 @@
  String TENANT_UP_PREFIX = "tenant/up";
  String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data";
  String TENANT_UP_PREFIX_REALTIME_DATA_EQP = TENANT_UP_PREFIX + "/realTime/data/eqp";
  String TENANT_UP_PREFIX_REALTIME_DATA_EQP = TENANT_UP_PREFIX + "/realTime/data/eqp/test";
  String TENANT_UP_PREFIX_FAULT_DATA = TENANT_UP_PREFIX + "/fault/data";
  String TENANT_UP_PREFIX_REAL_FAULT_DATA = TENANT_UP_PREFIX + "/real/fault/data";
@@ -96,12 +101,13 @@
  /**************************start*******************************/
  /**************************end*******************************/
  //redis缓存
  //所有租户的实时报警(%s:租户id)
  String MQTT_REAL_FAULT = "mqtt:real:fault:%s";
  //service(cloud)
  //在线客户端
  String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s";
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java
@@ -17,6 +17,8 @@
    private Integer eCount;
    //设备名称
    private String equName;
    //设备编码
    private String equCode;
    //租户名称
    private String tenantName;
    //故障时间
@@ -28,10 +30,11 @@
        super(record.getOrderId(),record.getTenantId(),record.getFaultName(),record.getFaultType(),record.getStartTime(),record.getEndTime());
        this.eCount = count;
    }
    public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount, String equName, String tenantName) {
    public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount,String equCode, String equName, String tenantName) {
        super(orderId, tenantId, faultName, faultType, startTime, endTime);
        this.eCount = eCount;
        this.equName = equName;
        this.equCode = equCode;
        this.tenantName = tenantName;
    }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
@@ -167,12 +167,23 @@
                String st = client.getString("connectedAt");
                vo.setUpTime(st);
                vo.setClientId(clientid);
            }else{
                vo.setClientId(clientid);
                vo.setOnline(false);
            }
            return vo;
        }).collect(Collectors.toList());
        //排序
        collect.sort(Comparator.comparing(obj -> obj.getCode(), Comparator.nullsLast(Comparator.naturalOrder())));
        collect.sort(Comparator.comparing(obj -> obj.getOnline(), Comparator.nullsLast(Comparator.naturalOrder())));
        collect.sort(
                Comparator.comparing(
                                MoEquVo::getOnline,
                                Comparator.nullsLast(Comparator.reverseOrder())  // true 在前,false 在后,null 最后
                        )
                        .thenComparing(
                                DryEquipment::getCode,
                                Comparator.nullsLast(Comparator.naturalOrder())  // code 升序,null 最后
                        )
        );
        BeanUtils.copyProperties(pageList, page);
        page.setRecords(collect);
    }
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -73,7 +73,7 @@
    mqttConnOpt.setAutomaticReconnect(false);//设置是否自动重连
    //遗嘱消息 TODO qos2需要在设备上线时做清除消息操作
    mqttConnOpt.setWill("downline", ("我是" + mqttName + "_" + mqttClientId + ",我下线了").getBytes(), 2, false);
    //mqttConnOpt.setWill("downline", ("我是" + mqttName + "_" + mqttClientId + ",我下线了").getBytes(), 2, false);
    try {
      MqttClient mqttClient = new MqttClient(broker, mqttClientId, persistence);
@@ -183,6 +183,8 @@
        String clientid = obj.getString("clientid");
        item.put("clientid", clientid);
        //TODO 校验租户id是否存在
        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  continue;
        //username
        item.put("username", obj.get("username"));
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -6,6 +6,7 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.poi.ss.formula.functions.T;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -42,7 +43,6 @@
public class MqttSampleCallback implements MqttCallback {
    @Value(value = "${jeecg.mqtt.role}")
    private String role;
    @Autowired
@@ -88,7 +88,7 @@
                String message = new String(mqttMessage.getPayload());
                JSONObject messageJson = JSONObject.parseObject(message);
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected") && !topic.endsWith("disconnected")) {
                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                    if (client == null) {
                        JSONObject item = new JSONObject();
@@ -102,7 +102,7 @@
                        String clientid = messageJson.getString("clientid");
                        item.put("clientid", clientid);
                        // 不符合的设备不进行管理
                        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  return;
                        if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return;
                        //是否连接
                        item.put("connected", true);
                        //根据clientid解析(注意配置文件中clientid格式  例:client-1000)
@@ -113,6 +113,12 @@
                            item.put("code", info[2]);
                            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                            System.err.println(String.format("设备: %s上线", clientid));
                            // 推送到移动端
                            String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, item.get("tenantId"));
                            MqMessage<JSONObject> mqMessage = new MqMessage<>(item, item.get("tenantId").toString(), recTopic);
                            sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
@@ -124,8 +130,23 @@
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
                    try {
                        String clientid = messageJson.getString("clientid");
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid);
                        // 不符合的设备不进行管理
                        if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return;
                        String tenantId = clientid.split("-")[1];
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, tenantId), clientid);
                        System.err.println(String.format("设备: %s下线", clientid));
                        //推送到移动端
                        JSONObject item = new JSONObject();
                        String[] info = clientid.split("-");
                        item.put("type", info[0]);
                        item.put("tenantId", info[1]);
                        item.put("code", info[2]);
                        item.put("connected", false);
                        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, tenantId);
                        MqMessage<JSONObject> mqMessage = new MqMessage<>(item, tenantId, recTopic);
                        sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
@@ -177,38 +198,55 @@
        switch (topic) {
            // 查询设备在线
            case MqttConstant.MOBILE_QUERY_EQU_STATU:
                System.err.println("admin收到" + topic);
                // 根据设备id查询设备mqtt在线状态
                String clientId = messageJson.getString("clientId");
                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId);
                log.info("admin收到MQTT请求,topic: {}", topic);  // 改用更规范的日志记录
                ThreadUtil.execute(() -> {
                try {
                    if (client == null || client.isEmpty()) {
                        JSONObject res = new JSONObject();
                        res.put("success", false);
                        res.put("msg", "查询失败");
                        try {
                            MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
                            sendMessage.setQos(0);
                            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    // 1. 参数提取
                    String clientId = messageJson.getString("clientId");
                    if (StringUtils.isEmpty(clientId)) {
                        return;
                    }
                    String deviceKey = clientId.split("-")[1];  // 提取设备标识
                    client.put("success", true);
                    client.put("msg", "查询成功");
                    try {
                        MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
                        sendMessage.setQos(0);
                        mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
                        baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                    // 2. 查询设备状态
                    String redisKey = String.format(MqttConstant.MQTT_ONLINE_CLIENT, deviceKey);
                    JSONObject client = (JSONObject) redisUtil.hget(redisKey, clientId);
                    // 3. 异步处理响应
                    ThreadUtil.execute(() -> {
                        JSONObject response = new JSONObject();
                        // 3.1 处理查询结果
                        if (client == null || client.isEmpty()) {
                            response.put("success", false);
                            response.put("msg", "查询失败,设备不存在或离线");
                        } else {
                            response = client;  // 复用查询结果
                            response.put("success", true);
                            response.put("msg", "查询成功");
                        }
                        // 3.2 发送MQTT响应
                        try {
                            String resTopic = String.format(MqttConstant.SERVICE_RES_EQU_STATU, req);
                            MqMessage<JSONObject> mqMessage = new MqMessage<>(
                                    response,
                                    response.getString("tenantId"),
                                    resTopic
                            );
                            sendMqttMessage(resTopic, mqMessage, 2);
                            log.debug("设备状态响应发送成功: {}", response);
                        } catch (Exception e) {
                            log.error("MQTT响应发送失败", e);
                        }
                    });
                } catch (Exception e) {
                    log.error("处理设备状态查询异常", e);
                }
                break;
            // 接收设备实时数据 TODO 20250718暂不使用,使用TENANT_UP_PREFIX_REALTIME_DATA_EQP
@@ -227,10 +265,16 @@
            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP:
                ThreadUtil.execute(() -> {
                    try {
                        RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
                        synchronized (realTimeDataService) {
                            realTimeDataService.realTimeDataHandle(vo);
                        }
                        // 向各租户移动端发送数据
                        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, vo.getTenantid());
                        MqMessage<RealTimeDataVo> mqMessage = new MqMessage<>(vo.getRealTime(), vo.getTenantid() + "", recTopic);
                        sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, mqMessage, 1);
                         realTimeDataService.realTimeDataHandle(vo);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -36,6 +36,8 @@
import java.text.DecimalFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@Slf4j
@@ -79,6 +81,13 @@
    @Value(value = "${jeecg.mqtt.enable}")
    private boolean mqttEnable;
    private static final ConcurrentHashMap<String, ReentrantLock> tenantLocks = new ConcurrentHashMap<>();
    private ReentrantLock getLock(String tenantId, String type) {
        String lockKey = tenantId + ":" + type;
        return tenantLocks.computeIfAbsent(lockKey, k -> new ReentrantLock());
    }
    public String getTemporaryToken() {
        if (token == null) {
@@ -226,7 +235,16 @@
        }
        if (realTimeDataParentVo.getFault() != null) {
            fitFaultRecord(realTimeDataParentVo);
            ReentrantLock faultLock = getLock(realTimeDataParentVo.getTenantid() + "", "fault");
            faultLock.lock();
            try {
                fitFaultRecord(realTimeDataParentVo);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                faultLock.unlock();
            }
        }
        return Result.ok();
    }
@@ -291,7 +309,7 @@
     * @return
     */
    private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) {
        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
        DryOrderVo orderVo;
        LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder());
@@ -329,7 +347,7 @@
     * @return
     */
    private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) {
        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
        DryOrderVo orderVo;
        // 查询设备
@@ -357,16 +375,26 @@
                    new LambdaQueryWrapper<DryEqpType>()
                            .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid())
            );
            if(eqpType == null){
                log.error("未查询到租户设备类型:{}", realTimeDataVo.getTenantid() );
            if (eqpType == null) {
                log.error("未查询到租户设备类型:{}", realTimeDataVo.getTenantid());
                return null;
            }
            Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId()));
            if (!equipmentService.save(addEqu)) {
                log.error("新增设备失败:数据库保存异常!equipment={}", addEqu);
                return null;
            // 设备新增
            ReentrantLock equipmentLock = getLock(realTimeDataVo.getTenantid() + "", "equipment");
            equipmentLock.lock();
            try {
                if (!equipmentService.save(addEqu)) {
                    log.error("新增设备失败:数据库保存异常!equipment={}", addEqu);
                    return null;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                equipmentLock.unlock();
            }
            equ = addEqu;
@@ -380,14 +408,26 @@
            log.error("未找到药材:" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",机台:" + realTimeDataVo.getMachineid());
            return null;
        }
        // 创建新工单
        orderVo = new DryOrderVo(realTimeDataVo);
        orderVo.setHerbId(herbFormula.getId());
        orderVo.setEquId(equ.getId());
        DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
        boolean save = dryOrderService.save(dryOrder);
        return orderVo;
        // 工单新增
        ReentrantLock orderLock = getLock(realTimeDataVo.getTenantid() + "", "order");
        orderLock.lock();
        try {
            // 创建新工单
            orderVo = new DryOrderVo(realTimeDataVo);
            orderVo.setHerbId(herbFormula.getId());
            orderVo.setEquId(equ.getId());
            DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
            boolean save = dryOrderService.save(dryOrder);
            return orderVo;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            orderLock.unlock();
        }
        return null;
    }
@@ -414,9 +454,9 @@
                object.put("tenantId", realTimeDataVo.getTenantid());
                mqttMessage.setPayload(object.toJSONString().getBytes());
                try {
                   if(mqttEnable){
                       mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
                   }
                    if (mqttEnable) {
                        mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
                    }
                } catch (MqttException e) {
                    e.printStackTrace();
                }
@@ -441,7 +481,7 @@
        if (one == null) {
            one = new DryHerbFormula(realTimeDataVo);
            DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid());
            if (dryEquipment!=null&&dryEquipment.getType()!=null) {
            if (dryEquipment != null && dryEquipment.getType() != null) {
                one.setEqpType(dryEquipment.getType());
            }
@@ -657,51 +697,44 @@
    @Override
    public void fitFaultRecord(RealTimeDataParentVo vo) {
        TenantContext.setTenant(vo.getTenantid() + "");
        ThreadUtil.execute(() -> {
            try {
                //解析存储报警数据
                List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
                List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
                if(!errorList.isEmpty()){
                   log.error("保存故障:{}", errorList.toString());
                }
                if(!warnList.isEmpty()){
                    log.error("保存告警:{}", warnList.toString());
                }
        //解析存储报警数据
        List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
        List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
        if (!errorList.isEmpty()) {
            log.error("保存故障:{}", errorList.toString());
        }
        if (!warnList.isEmpty()) {
            log.error("保存告警:{}", warnList.toString());
        }
                //以下为云服务处理故障,厂内本地服务无需处理
                if(!mqttEnable)return;
        //以下为云服务处理故障,厂内本地服务无需处理
        if (!mqttEnable) return;
                //处理结束后,将redis中实时数据发送至云服务器  key = tenantId + machineId + eqpFault
                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
        //处理结束后,将redis中实时数据发送至云服务器  key = tenantId + machineId + eqpFault
        Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
                Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().toString(),
                                entry -> (DryFaultRecordVo)entry.getValue()
                        ));
        Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
                .collect(Collectors.toMap(
                        entry -> entry.getKey().toString(),
                        entry -> (DryFaultRecordVo) entry.getValue()
                ));
                String tenantId = vo.getTenantid() +"";
        String tenantId = vo.getTenantid() + "";
                //广播发送给各租户下移动设备
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                //发送广播
                log.error("广播给:{}" , recTopic);
                mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
        //广播发送给各租户下移动设备
        if (dryFaultMap.isEmpty()) {
            return;
        }
        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
        //数据转换
        List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
        MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
        //发送广播
        log.error("广播给:{}", recTopic);
        mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
@@ -759,12 +792,11 @@
//                addFauMap.put(redisKey,faultRecord);
                Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId);
                String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + "");
                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName);
                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, machineId, equipmentMap.get(machineId).getName(), tenantName);
                addFauMap.put(redisKey, vo);
            } else {
                //TODO 特殊情况,如果redis的故障和新
                //如果数据已存在,且计数大于1就重置计数(计数3次后判定故障结束,3次之前重新上报故障说明故障还在持续 需要重新计数)
                if (rFault.getECount() != null && rFault.getECount() > 1) {
                    rFault.setECount(1);