干燥机配套车间生产管理系统/云平台服务端
zhuguifei
2 天以前 b38019aae593a66c16f7e75d6e37d14eb8d2c42e
修改接收实时数据接口-故障处理
已修改6个文件
312 ■■■■■ 文件已修改
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java 89 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java 149 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -97,16 +97,15 @@
  /**************************start*******************************/
  /**************************end*******************************/
  //redis缓存
  //client
  String MQTT_REAL_FAULT = "mqtt:real:fault";
  //所有租户的实时报警(%s:租户id)
  String MQTT_REAL_FAULT = "mqtt:real:fault:%s";
  //service(cloud)
  //在线客户端
  String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s";
  //所有租户的实时报警(%s:租户id)
  String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s";
//  String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s";
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java
@@ -70,7 +70,6 @@
    private IDryEquipmentService dryEquipmentService;
    @ApiOperation(value="测试", notes="返回Hello")
    @GetMapping("/hello")
    public Result<?> sayHello() {
@@ -103,28 +102,9 @@
    @ApiOperation(value="接收实时数据Json", notes="设备实时数据上传")
    @PostMapping("/sendRealTimeDataJson2")
    public Result<?> realTimeDataJson2(@RequestBody RealTimeDataParentVo realTimeDataParentVo)  {
        try {
            if (mqttConfig.isEnable() && "user".equals(mqttConfig.getRole())){
                MqttMessage mqttMessage = new MqttMessage();
                mqttMessage.setQos(0);
                mqttMessage.setPayload(JSONObject.toJSONString(realTimeDataParentVo).getBytes());
                mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA,mqttMessage);
                //处理故障信息
                dryRealTimeDataService.fitFaultRecord(realTimeDataParentVo);
            }
            if ("user".equals(mqttConfig.getRole()) && realTimeDataParentVo.getFault() != null){
                //处理故障信息
                dryRealTimeDataService.fitFaultRecord(realTimeDataParentVo);
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
        return dryRealTimeDataService.realTimeDataHandle(realTimeDataParentVo);
    }
    }
    @ApiOperation(value="获取设备实时数据", notes="通过租户ID和设备编码获取实时数据")
@@ -204,6 +184,7 @@
    /**
     * 根据设备和租户查询该设备类型的干燥配方,将配方转成xml格式,以字符串方式返回
     *
     * @param tenantId
     * @param eqpCode
     * @return
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -15,6 +15,7 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.*;
@@ -101,8 +102,8 @@
            mqttClient.subscribe(MqttConstant.MOBILE_UP);
            System.err.println("admin订阅" + MqttConstant.MOBILE_UP);
            // 订阅租户实时数据
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP);
            // 订阅租户报警数据
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
@@ -178,15 +179,17 @@
      for (int i = 0; i < data.size(); i++) {
        JSONObject obj = data.getJSONObject(i);
        JSONObject item = new JSONObject();
        //clientid
        String clientid = obj.getString("clientid");
        item.put("clientid", clientid);
        //TODO 校验租户id是否存在
        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  continue;
        //username
        item.put("username", obj.get("username"));
        //连接时间
        String st = obj.getString("connected_at");
        String upTime = DateUtils.zone2Str(st);
        item.put("connectedAt", upTime);
        //clientid
        String clientid = obj.getString("clientid");
        item.put("clientid", clientid);
        //是否连接
        Boolean connected = obj.getBoolean("connected");
        item.put("connected", connected);
@@ -195,7 +198,7 @@
          String[] info = clientid.split("-");
          item.put("type", info[0]);
          item.put("tenantId", info[1]);
          //item.put("code", info[2]);
          item.put("code", info[2]);
          if (connected) {
            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item);
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -42,6 +42,9 @@
public class MqttSampleCallback implements MqttCallback {
    @Value(value = "${jeecg.mqtt.role}")
    private String role;
    @Autowired
    private MqttUtil mqttUtil;
    @Autowired
@@ -76,8 +79,8 @@
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
                + new String(mqttMessage.getPayload()));
//        System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
//                + new String(mqttMessage.getPayload()));
        switch (role) {
            // 管理员
@@ -98,6 +101,8 @@
                        //clientid
                        String clientid = messageJson.getString("clientid");
                        item.put("clientid", clientid);
                        // 不符合的设备不进行管理
                        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  return;
                        //是否连接
                        item.put("connected", true);
                        //根据clientid解析(注意配置文件中clientid格式  例:client-1000)
@@ -105,12 +110,14 @@
                            String[] info = clientid.split("-");
                            item.put("type", info[0]);
                            item.put("tenantId", info[1]);
                            //item.put("code", info[2]);
                            item.put("code", info[2]);
                            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                            System.err.println(String.format("设备: %s上线", clientid));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                        System.err.println(String.format("设备: %s上线", clientid));
                    }
                }
@@ -204,16 +211,16 @@
                });
                break;
            // 接收设备实时数据
            // 接收设备实时数据 TODO 20250718暂不使用,使用TENANT_UP_PREFIX_REALTIME_DATA_EQP
            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
                ThreadUtil.execute(() -> {
                    try {
                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
                        realTimeDataService.realTimeDataHandle(vo);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
//                ThreadUtil.execute(() -> {
//                    try {
//                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
//                        realTimeDataService.realTimeDataHandle(vo);
//                    } catch (Exception e) {
//                        e.printStackTrace();
//                    }
//                });
                break;
            // 接收设备实时数据-机台
@@ -221,40 +228,42 @@
                ThreadUtil.execute(() -> {
                    try {
                        RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
                        synchronized (realTimeDataService) {
                        realTimeDataService.realTimeDataHandle(vo);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                break;
            //各租户上传的实时报警数据
            //各租户上传的实时报警数据 TODO 20250721暂不使用,统一使用TENANT_UP_PREFIX_REALTIME_DATA_EQP
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
                });
                //故障数据
                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
                //租户id
                String tenantId = realFaultMessage.getTentId();
//                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
//                });
//                //故障数据
//                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
//                //租户id
//                String tenantId = realFaultMessage.getTentId();
                //收到租户实时报警数据存入redis
                //转换为 Map<String, Object>
                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> (Object) entry.getValue()
                        ));
                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
                //广播发送给各租户下移动设备
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                //发送广播
                System.err.println("广播给:" + recTopic);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
//                //转换为 Map<String, Object>
//                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
//                        .collect(Collectors.toMap(
//                                Map.Entry::getKey,
//                                entry -> (Object) entry.getValue()
//                        ));
//                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
//                //广播发送给各租户下移动设备
//                if (dryFaultMap.isEmpty()) {
//                    return;
//                }
//                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
//                //数据转换
//                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
//                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
//                //发送广播
//                System.err.println("广播给:" + recTopic);
//                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
            }
            break;
            //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据)
@@ -263,7 +272,7 @@
                if (req.toString().isEmpty() || tenantId == null) {
                    return;
                }
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId));
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
                //转换为 Map<String, DryFaultRecordVo>
                Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java
@@ -1,7 +1,10 @@
package org.jeecg.modules.dry.mqtt;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
@Component
@@ -9,4 +12,30 @@
public class MqttUtil {
  public   MqttClient mqttClient;
  /**
   * 发送消息
   *
   * @param topic     订阅
   * @param mqMessage 消息体
   * @param type      1-发送给租户   2-发送给固定id
   */
  public void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) {
    ThreadUtil.execute(() -> {
      try {
        if (type == 1) {
          MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
          sendMessage.setQos(0);
          mqttClient.publish(String.format(topic, mqMessage.getTentId()), sendMessage);
        } else if (type == 2) {
          MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
          sendMessage.setQos(0);
          mqttClient.publish(topic, sendMessage);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -6,6 +6,7 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.mina.core.session.IoSession;
@@ -23,6 +24,7 @@
import org.jeecg.modules.dry.common.CacheConstants;
import org.jeecg.modules.dry.entity.*;
import org.jeecg.modules.dry.mqtt.MqMessage;
import org.jeecg.modules.dry.mqtt.MqttConfig;
import org.jeecg.modules.dry.mqtt.MqttUtil;
import org.jeecg.modules.dry.service.*;
import org.jeecg.modules.dry.socket.ServerHandler;
@@ -54,6 +56,9 @@
    private IDryEquipmentService equipmentService;
    @Autowired
    private IDryEqpTypeService dryEqpTypeService;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
@@ -67,6 +72,7 @@
    @Value(value = "${jeecg.mqtt.role}")
    private String role;
    @Autowired
    private MqttUtil mqttUtil;
@@ -152,8 +158,6 @@
    }
    @Override
    @Transactional
    public Result<?> realTimeDataHandle(RealTimeDataParentVo realTimeDataParentVo) {
@@ -220,6 +224,10 @@
        if (realTimeDataParentVo.getReport() != null) {
            saveReport(realTimeDataParentVo);
        }
        if (realTimeDataParentVo.getFault() != null) {
            fitFaultRecord(realTimeDataParentVo);
        }
        return Result.ok();
    }
@@ -278,10 +286,12 @@
    /**
     * 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条
     *
     * @param realTimeDataVo
     * @return
     */
    private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) {
        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
        DryOrderVo orderVo;
        LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder());
@@ -293,7 +303,8 @@
            // 转换为缓存数据结构
            orderVo = BeanUtil.toBean(one, DryOrderVo.class);
            if (one.getTemps() != null) {
                Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer,Double>>(){});
                Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer, Double>>() {
                });
                orderVo.setBellowsTemp(map);
            }
            // 查询称重记录,添加到缓存数据结构
@@ -313,17 +324,54 @@
    /**
     * 保存新工单
     *
     * @param realTimeDataVo
     * @return
     */
    private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) {
        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
        DryOrderVo orderVo;
        // 查询设备
        DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo);
        if (equ == null) {
            log.error("未找到设备:"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",机台:" + realTimeDataVo.getMachineid());
            log.error("新增设备:");
            if (realTimeDataVo.getMachineid() == null || realTimeDataVo.getTenantid() == null) {
                log.error("新增设备失败:设备ID或租户ID为空!machineid={}, tenantid={}",
                        realTimeDataVo.getMachineid(), realTimeDataVo.getTenantid());
            return null;
            }
            DryEquipment addEqu = new DryEquipment(realTimeDataVo);
            try {
                String digits = StringUtils.getDigits(realTimeDataVo.getMachineid());
                addEqu.setName(Integer.parseInt(digits) + "#干燥设备");
            } catch (NumberFormatException e) {
                log.error("设备ID格式错误,无法提取数字部分:machineid={}", realTimeDataVo.getMachineid(), e);
                return null;
            }
            DryEqpType eqpType = dryEqpTypeService.getOne(
                    new LambdaQueryWrapper<DryEqpType>()
                            .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid())
            );
            if(eqpType == null){
                log.error("未查询到租户设备类型:{}", realTimeDataVo.getTenantid() );
                return null;
            }
            Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId()));
            if (!equipmentService.save(addEqu)) {
                log.error("新增设备失败:数据库保存异常!equipment={}", addEqu);
                return null;
            }
            equ = addEqu;
            log.info("新增设备成功:equipmentId={}", addEqu.getId());
        }
        // 查询药材
        DryHerbFormula herbFormula =  queryHerbByIndexTenant(realTimeDataVo);
@@ -345,6 +393,7 @@
    /**
     * 查询设备,新设备则添加到设备主数据
     *
     * @param realTimeDataVo
     * @return
     */
@@ -365,7 +414,9 @@
                object.put("tenantId", realTimeDataVo.getTenantid());
                mqttMessage.setPayload(object.toJSONString().getBytes());
                try {
                   if(mqttEnable){
                    mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX ,mqttMessage);
                   }
                }catch (MqttException e) {
                    e.printStackTrace();
                }
@@ -378,6 +429,7 @@
    /**
     * 查询药材,新药材添加到数据库
     *
     * @param realTimeDataVo
     * @return
     */
@@ -389,7 +441,10 @@
        if (one == null) {
            one = new DryHerbFormula(realTimeDataVo);
            DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid());
            if (dryEquipment!=null&&dryEquipment.getType()!=null) {
            one.setEqpType(dryEquipment.getType());
            }
            dryHerbFormulaService.save(one);
        }
        return one;
@@ -397,6 +452,7 @@
    /**
     * 保存含水率变化记录
     *
     * @param trendVo
     * @param orderVo
     */
@@ -422,6 +478,7 @@
    /**
     * 查询机台实时数据
     *
     * @param realTimeDataVo
     * @return
     */
@@ -565,7 +622,7 @@
                //处理结束后,将redis中实时数据发送至云服务器
                    Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
                    if(mqttEnable && !toCloudFaultMap.isEmpty()){
                        MqMessage< Map<Object, Object>> message = new MqMessage<>();
                        message.setData(toCloudFaultMap);
@@ -587,8 +644,6 @@
                    mqttMessage.setPayload((JSON.toJSONString(message).getBytes()));
                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage);
                }
            } catch (Exception e) {
@@ -605,37 +660,43 @@
        ThreadUtil.execute(() -> {
            try {
                //解析存储报警数据
                List<DryFaultRecord> faultRecords1 = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
                List<DryFaultRecord> faultRecords2 = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
                faultRecords1.addAll(faultRecords2);
                //处理结束后,将redis中实时数据发送至云服务器
                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
                if(mqttEnable && !toCloudFaultMap.isEmpty()){
                    MqMessage< Map<Object, Object>> message = new MqMessage<>();
                    message.setData(toCloudFaultMap);
                    message.setTentId(vo.getTenantid()+"");
                    MqttMessage mqttMessage = new MqttMessage();
                    mqttMessage.setQos(0);
                    mqttMessage.setPayload(JSON.toJSONString(message).getBytes());
                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage);
                List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
                List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
                if(!errorList.isEmpty()){
                   log.error("保存故障:{}", errorList.toString());
                }
                if(!warnList.isEmpty()){
                    log.error("保存告警:{}", warnList.toString());
                }
                //以下为云服务处理故障,厂内本地服务无需处理
                if(!mqttEnable)return;
                //要保存的历史故障
                if(!faultRecords1.isEmpty()){
                    MqMessage<List<DryFaultRecord>> message = new MqMessage<>();
                    message.setData(faultRecords1);
                    message.setTentId(vo.getTenantid()+"");
                    MqttMessage mqttMessage = new MqttMessage();
                    mqttMessage.setQos(0);
                    mqttMessage.setPayload((JSON.toJSONString(message).getBytes()));
                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage);
                //处理结束后,将redis中实时数据发送至云服务器  key = tenantId + machineId + eqpFault
                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
                Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().toString(),
                                entry -> (DryFaultRecordVo)entry.getValue()
                        ));
                String tenantId = vo.getTenantid() +"";
                //广播发送给各租户下移动设备
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                //发送广播
                log.error("广播给:{}" , recTopic);
                mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
            } catch (Exception e) {
                e.printStackTrace();
@@ -648,6 +709,7 @@
    /**
     * 解析存储故障数据
     * TODO 保证原子性
     *
     * @param fault 故障数据
     * @param orderId 工单
     * @param tenantId 租户
@@ -661,7 +723,7 @@
        //数据样本:"eqp_fault": "滚筒降超时-报警,风机过流报警,滚筒升超时-报警,风箱升报警",
        System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") +  DateUtils.formatDateTime()+"--"+fault);
        //redis中的故障
        Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
        Map<Object, Object> rFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
        Map<String, Object> redFauMap = rFauMap.entrySet().stream()
                .collect(Collectors.toMap(
                        entry -> entry.getKey().toString(),  // 键转换为字符串
@@ -689,7 +751,7 @@
            realFauMap.put(redisKey, new DryFaultRecord());
            DryFaultRecordVo  rFault = (DryFaultRecordVo) redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,redisKey);
            DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey);
            //1.2如果redis不存在则存入(存故障开始)
            if(rFault ==null){
                //组装缓存数据
@@ -700,10 +762,13 @@
                DryFaultRecordVo vo = new DryFaultRecordVo(orderId,tenantId,eqpFault,faultType,new Date(),null,1,equipmentMap.get(machineId).getName(),tenantName);
                addFauMap.put(redisKey,vo);
            }else {
                //TODO 特殊情况,如果redis的故障和新
                //如果数据已存在,且计数大于1就重置计数(计数3次后判定故障结束,3次之前重新上报故障说明故障还在持续 需要重新计数)
                if(rFault.getECount()!=null && rFault.getECount() > 1){
                    rFault.setECount(1);
                    redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,redisKey,rFault);
                    redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey, rFault);
                    System.err.println("报警次数重置 clear clear ,key-"+redisKey);
                }
@@ -716,21 +781,21 @@
        addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value));
        //没有新故障数据不用覆盖
        if(!addFauMap.isEmpty()){
            redisUtil.hmset(MqttConstant.MQTT_REAL_FAULT,redFauMap);
            redisUtil.hmset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redFauMap);
        }
        //2检测已结束的故障
        //2.1如果实时数据不存在redis存在则代表故障结束,存入数据库
        Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
        Map<Object, Object> curFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
        curFauMap.keySet().stream()
                //特别注意,多个报警类型共用方法需要区分类型
                .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo)curFauMap.get(key)).getFaultType() == faultType)
                .forEach(key -> {
                    DryFaultRecordVo vo = (DryFaultRecordVo)redFauMap.get(key);
                    vo.setECount(vo.getECount()+1);
                    if(redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,key.toString())!=null){
                    if (redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString()) != null) {
                        //更新次数
                        redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,key.toString(),vo);
                        redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString(), vo);
                        System.err.println("报警次数更新,key-"+key);
                    }
@@ -738,17 +803,15 @@
                        vo.setEndTime(new Date());
                        //TODO 结束超过某个时间区间判定为错误数据
                        faultRecordService.save(vo);
                        redisUtil.hdel(MqttConstant.MQTT_REAL_FAULT,key);
                        redisUtil.hdel(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key);
                        result.add(vo);
                        System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") +  DateUtils.formatDateTime()+"存入数据库");
                    }
                });
        return result;
    }
}