干燥机配套车间生产管理系统/云平台服务端
zhuguifei
16 小时以前 b38019aae593a66c16f7e75d6e37d14eb8d2c42e
修改接收实时数据接口-故障处理
已修改6个文件
648 ■■■■■ 文件已修改
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java 135 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java 91 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java 371 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -97,16 +97,15 @@
  /**************************start*******************************/
  /**************************end*******************************/
  //redis缓存
  //client
  String MQTT_REAL_FAULT = "mqtt:real:fault";
  //所有租户的实时报警(%s:租户id)
  String MQTT_REAL_FAULT = "mqtt:real:fault:%s";
  //service(cloud)
  //在线客户端
  String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s";
  //所有租户的实时报警(%s:租户id)
  String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s";
//  String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s";
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java
@@ -70,25 +70,24 @@
    private IDryEquipmentService dryEquipmentService;
    @ApiOperation(value="测试", notes="返回Hello")
    @ApiOperation(value = "测试", notes = "返回Hello")
    @GetMapping("/hello")
    public Result<?> sayHello() {
        return Result.ok("Hello");
    }
    @ApiOperation(value="接收实时数据Json", notes="设备实时数据上传")
    @ApiOperation(value = "接收实时数据Json", notes = "设备实时数据上传")
    @PostMapping("/sendRealTimeDataJson")
    public Result<?> realTimeDataJson(@RequestBody RealTimeDataVo realTimeDataVo)  {
    public Result<?> realTimeDataJson(@RequestBody RealTimeDataVo realTimeDataVo) {
        try {
            if (mqttConfig.isEnable() && "user".equals(mqttConfig.getRole())){
            if (mqttConfig.isEnable() && "user".equals(mqttConfig.getRole())) {
                MqttMessage mqttMessage = new MqttMessage();
                mqttMessage.setQos(0);
                mqttMessage.setPayload(JSONObject.toJSONString(realTimeDataVo).getBytes());
                mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA,mqttMessage);
                mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA, mqttMessage);
            }
            if ("user".equals(mqttConfig.getRole())){
            if ("user".equals(mqttConfig.getRole())) {
                //处理故障信息
                dryRealTimeDataService.fitFaultRecord(realTimeDataVo);
            }
@@ -100,51 +99,32 @@
        return dryRealTimeDataService.realTimeDataHandle(realTimeDataVo);
    }
    @ApiOperation(value="接收实时数据Json", notes="设备实时数据上传")
    @ApiOperation(value = "接收实时数据Json", notes = "设备实时数据上传")
    @PostMapping("/sendRealTimeDataJson2")
    public Result<?> realTimeDataJson2(@RequestBody RealTimeDataParentVo realTimeDataParentVo)  {
        try {
            if (mqttConfig.isEnable() && "user".equals(mqttConfig.getRole())){
                MqttMessage mqttMessage = new MqttMessage();
                mqttMessage.setQos(0);
                mqttMessage.setPayload(JSONObject.toJSONString(realTimeDataParentVo).getBytes());
                mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA,mqttMessage);
                //处理故障信息
                dryRealTimeDataService.fitFaultRecord(realTimeDataParentVo);
            }
            if ("user".equals(mqttConfig.getRole()) && realTimeDataParentVo.getFault() != null){
                //处理故障信息
                dryRealTimeDataService.fitFaultRecord(realTimeDataParentVo);
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    public Result<?> realTimeDataJson2(@RequestBody RealTimeDataParentVo realTimeDataParentVo) {
        return dryRealTimeDataService.realTimeDataHandle(realTimeDataParentVo);
    }
    @ApiOperation(value="获取设备实时数据", notes="通过租户ID和设备编码获取实时数据")
    @ApiOperation(value = "获取设备实时数据", notes = "通过租户ID和设备编码获取实时数据")
    @GetMapping("/getRealTimeData")
    public Result<?> queryMachineRealTimeData(RealTimeDataVo realTimeDataVo) {
        return dryRealTimeDataService.queryMachineRealTImeData(realTimeDataVo);
    }
    @ApiOperation(value="获取车间统计数据", notes="通过租户ID获取车间统计数据")
    @ApiOperation(value = "获取车间统计数据", notes = "通过租户ID获取车间统计数据")
    @GetMapping("/workshopStatistics")
    public Result<?> workshopStatistics(RealTimeDataVo realTimeDataVo) {
        return dryRealTimeDataService.queryWorkshopStatistics(realTimeDataVo);
    }
    @ApiOperation(value="获取所有机台", notes="通过租户ID获取所有机台数据")
    @ApiOperation(value = "获取所有机台", notes = "通过租户ID获取所有机台数据")
    @GetMapping("/queryAllEqps")
    public Result<?> queryAllEqps(DryEquipment equipment) {
        List<DryEquipment> dryEquipments = dryEquipmentService.queryEqusByTenantId(equipment);
        return  Result.OK(dryEquipments);
        return Result.OK(dryEquipments);
    }
@@ -157,7 +137,7 @@
     * 1013 热风启动    1014 开门观察
     * 1015 出料按钮
     */
    @ApiOperation(value="发送控制指令", notes="向服务端发送控制指令,由服务端通过socket转发给控制模块")
    @ApiOperation(value = "发送控制指令", notes = "向服务端发送控制指令,由服务端通过socket转发给控制模块")
    @PostMapping("/sendCommand")
    public Result<?> sendCommand(@RequestBody CommandMessageVo msgVo) {
        return dryRealTimeDataService.sendSocketMsg(msgVo);
@@ -204,19 +184,20 @@
    /**
     * 根据设备和租户查询该设备类型的干燥配方,将配方转成xml格式,以字符串方式返回
     *
     * @param tenantId
     * @param eqpCode
     * @return
     * @throws JAXBException
     */
    @ApiOperation(value="干燥配方获取", notes="干燥配方下发")
    @ApiOperation(value = "干燥配方获取", notes = "干燥配方下发")
    @GetMapping(value = "/queryFormula")
    public Result<String> queryFormulaByEqpType(Integer tenantId, String eqpCode) throws JAXBException {
        //获取request
        HttpServletRequest request = SpringContextUtils.getHttpServletRequest();
        // 获取请求主机的IP地址
        String ip = IpUtils.getIpAddr(request);
        DryEquipment dryEquipment = dryEquipmentService.selectByTenantIdEquipmentId(tenantId+ "", eqpCode);
        DryEquipment dryEquipment = dryEquipmentService.selectByTenantIdEquipmentId(tenantId + "", eqpCode);
        if (dryEquipment != null) {
            if (dryEquipment.getIp().equals(ip)) {
            } else {
@@ -225,46 +206,46 @@
        } else {
            return Result.error("设备不存在");
        }
            LambdaQueryWrapper<DryHerbFormula> queryWrapper = new LambdaQueryWrapper<DryHerbFormula>();
            queryWrapper.eq(DryHerbFormula::getEqpType, dryEquipment.getType())
                    .eq(DryHerbFormula::getTenantId, tenantId);
            List<DryHerbFormula> list = dryHerbFormulaService.list(queryWrapper);
            Formulas formulas = new Formulas();
            list.forEach(item -> {
                DryHerbInfo byId = dryHerbInfoService.getById(item.getHerbId());
                if (byId!=null) {
                    item.setPinyin(byId.getPinyin());
                    item.setName(byId.getName());
                }
                Formula formula = new Formula();
                BaseParam baseParam = new BaseParam();
                WaterParam waterParam = new WaterParam();
                TypeParam typeParam = new TypeParam();
                OffsetParam offsetParam = new OffsetParam();
                baseParam.setCode(item.getCode());
                baseParam.setIndex(item.getCode());
                baseParam.setName(item.getName());
                baseParam.setAb(item.getName());
                baseParam.setTyp(item.getCategory());
                waterParam.setDelay(Double.valueOf(item.getDelay()));
                waterParam.setMoisture3(item.getTarget());
                waterParam.setWeight1(Double.valueOf(item.getFeed()));
                waterParam.setTimes(item.getEt());
                waterParam.setTemp1(item.getWindTemp());
                waterParam.setTemp2(item.getEnvTemp());
                waterParam.setTemp3(item.getEnvHum());
                waterParam.setTurntime(item.getTurn());
                typeParam.setMtype(Integer.valueOf(item.getCategory()));
                offsetParam.setMoisoffset(item.getMoisOffset());
                offsetParam.setColdwind(Double.valueOf(item.getCoolingDuration()));
        LambdaQueryWrapper<DryHerbFormula> queryWrapper = new LambdaQueryWrapper<DryHerbFormula>();
        queryWrapper.eq(DryHerbFormula::getEqpType, dryEquipment.getType())
                .eq(DryHerbFormula::getTenantId, tenantId);
        List<DryHerbFormula> list = dryHerbFormulaService.list(queryWrapper);
        Formulas formulas = new Formulas();
        list.forEach(item -> {
            DryHerbInfo byId = dryHerbInfoService.getById(item.getHerbId());
            if (byId != null) {
                item.setPinyin(byId.getPinyin());
                item.setName(byId.getName());
            }
            Formula formula = new Formula();
            BaseParam baseParam = new BaseParam();
            WaterParam waterParam = new WaterParam();
            TypeParam typeParam = new TypeParam();
            OffsetParam offsetParam = new OffsetParam();
            baseParam.setCode(item.getCode());
            baseParam.setIndex(item.getCode());
            baseParam.setName(item.getName());
            baseParam.setAb(item.getName());
            baseParam.setTyp(item.getCategory());
            waterParam.setDelay(Double.valueOf(item.getDelay()));
            waterParam.setMoisture3(item.getTarget());
            waterParam.setWeight1(Double.valueOf(item.getFeed()));
            waterParam.setTimes(item.getEt());
            waterParam.setTemp1(item.getWindTemp());
            waterParam.setTemp2(item.getEnvTemp());
            waterParam.setTemp3(item.getEnvHum());
            waterParam.setTurntime(item.getTurn());
            typeParam.setMtype(Integer.valueOf(item.getCategory()));
            offsetParam.setMoisoffset(item.getMoisOffset());
            offsetParam.setColdwind(Double.valueOf(item.getCoolingDuration()));
                formula.setBaseParam(baseParam);
                formula.setWaterParam(waterParam);
                formula.setTypeParam(typeParam);
                formula.setOffsetParam(offsetParam);
                formulas.getDryFormulaList().add(formula);
            });
            // 把vos转换成xml
            formula.setBaseParam(baseParam);
            formula.setWaterParam(waterParam);
            formula.setTypeParam(typeParam);
            formula.setOffsetParam(offsetParam);
            formulas.getDryFormulaList().add(formula);
        });
        // 把vos转换成xml
        // 创建JAXBContext实例
        JAXBContext jaxbContext = JAXBContext.newInstance(Formulas.class);
@@ -278,12 +259,12 @@
        StringWriter writer = new StringWriter();
        marshaller.marshal(formulas, writer);
            return Result.OK("请求成功",writer.toString());
        return Result.OK("请求成功", writer.toString());
    }
    @ApiOperation(value="干燥配方上报", notes="干燥配方记录上报")
    @ApiOperation(value = "干燥配方上报", notes = "干燥配方记录上报")
    @PostMapping(value = "/sendFormulaHistory")
    public Result<?> sendFormulaHistory(DryHerbFormulaHisVo hisVo) {
        //获取request
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -15,6 +15,7 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.*;
@@ -101,8 +102,8 @@
            mqttClient.subscribe(MqttConstant.MOBILE_UP);
            System.err.println("admin订阅" + MqttConstant.MOBILE_UP);
            // 订阅租户实时数据
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP);
            // 订阅租户报警数据
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
@@ -178,15 +179,17 @@
      for (int i = 0; i < data.size(); i++) {
        JSONObject obj = data.getJSONObject(i);
        JSONObject item = new JSONObject();
        //clientid
        String clientid = obj.getString("clientid");
        item.put("clientid", clientid);
        //TODO 校验租户id是否存在
        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  continue;
        //username
        item.put("username", obj.get("username"));
        //连接时间
        String st = obj.getString("connected_at");
        String upTime = DateUtils.zone2Str(st);
        item.put("connectedAt", upTime);
        //clientid
        String clientid = obj.getString("clientid");
        item.put("clientid", clientid);
        //是否连接
        Boolean connected = obj.getBoolean("connected");
        item.put("connected", connected);
@@ -195,7 +198,7 @@
          String[] info = clientid.split("-");
          item.put("type", info[0]);
          item.put("tenantId", info[1]);
          //item.put("code", info[2]);
          item.put("code", info[2]);
          if (connected) {
            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item);
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -42,6 +42,9 @@
public class MqttSampleCallback implements MqttCallback {
    @Value(value = "${jeecg.mqtt.role}")
    private String role;
    @Autowired
    private MqttUtil mqttUtil;
    @Autowired
@@ -76,8 +79,8 @@
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
                + new String(mqttMessage.getPayload()));
//        System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
//                + new String(mqttMessage.getPayload()));
        switch (role) {
            // 管理员
@@ -98,6 +101,8 @@
                        //clientid
                        String clientid = messageJson.getString("clientid");
                        item.put("clientid", clientid);
                        // 不符合的设备不进行管理
                        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  return;
                        //是否连接
                        item.put("connected", true);
                        //根据clientid解析(注意配置文件中clientid格式  例:client-1000)
@@ -105,12 +110,14 @@
                            String[] info = clientid.split("-");
                            item.put("type", info[0]);
                            item.put("tenantId", info[1]);
                            //item.put("code", info[2]);
                            item.put("code", info[2]);
                            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                            System.err.println(String.format("设备: %s上线", clientid));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                        System.err.println(String.format("设备: %s上线", clientid));
                    }
                }
@@ -204,16 +211,16 @@
                });
                break;
            // 接收设备实时数据
            // 接收设备实时数据 TODO 20250718暂不使用,使用TENANT_UP_PREFIX_REALTIME_DATA_EQP
            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
                ThreadUtil.execute(() -> {
                    try {
                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
                        realTimeDataService.realTimeDataHandle(vo);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
//                ThreadUtil.execute(() -> {
//                    try {
//                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
//                        realTimeDataService.realTimeDataHandle(vo);
//                    } catch (Exception e) {
//                        e.printStackTrace();
//                    }
//                });
                break;
            // 接收设备实时数据-机台
@@ -221,40 +228,42 @@
                ThreadUtil.execute(() -> {
                    try {
                        RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
                        realTimeDataService.realTimeDataHandle(vo);
                        synchronized (realTimeDataService) {
                            realTimeDataService.realTimeDataHandle(vo);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                break;
            //各租户上传的实时报警数据
            //各租户上传的实时报警数据 TODO 20250721暂不使用,统一使用TENANT_UP_PREFIX_REALTIME_DATA_EQP
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
                });
                //故障数据
                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
                //租户id
                String tenantId = realFaultMessage.getTentId();
//                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
//                });
//                //故障数据
//                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
//                //租户id
//                String tenantId = realFaultMessage.getTentId();
                //收到租户实时报警数据存入redis
                //转换为 Map<String, Object>
                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> (Object) entry.getValue()
                        ));
                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
                //广播发送给各租户下移动设备
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                //发送广播
                System.err.println("广播给:" + recTopic);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
//                //转换为 Map<String, Object>
//                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
//                        .collect(Collectors.toMap(
//                                Map.Entry::getKey,
//                                entry -> (Object) entry.getValue()
//                        ));
//                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
//                //广播发送给各租户下移动设备
//                if (dryFaultMap.isEmpty()) {
//                    return;
//                }
//                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
//                //数据转换
//                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
//                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
//                //发送广播
//                System.err.println("广播给:" + recTopic);
//                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
            }
            break;
            //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据)
@@ -263,7 +272,7 @@
                if (req.toString().isEmpty() || tenantId == null) {
                    return;
                }
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId));
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
                //转换为 Map<String, DryFaultRecordVo>
                Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java
@@ -1,7 +1,10 @@
package org.jeecg.modules.dry.mqtt;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
@Component
@@ -9,4 +12,30 @@
public class MqttUtil {
  public   MqttClient mqttClient;
  /**
   * 发送消息
   *
   * @param topic     订阅
   * @param mqMessage 消息体
   * @param type      1-发送给租户   2-发送给固定id
   */
  public void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) {
    ThreadUtil.execute(() -> {
      try {
        if (type == 1) {
          MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
          sendMessage.setQos(0);
          mqttClient.publish(String.format(topic, mqMessage.getTentId()), sendMessage);
        } else if (type == 2) {
          MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
          sendMessage.setQos(0);
          mqttClient.publish(topic, sendMessage);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -6,6 +6,7 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.mina.core.session.IoSession;
@@ -23,6 +24,7 @@
import org.jeecg.modules.dry.common.CacheConstants;
import org.jeecg.modules.dry.entity.*;
import org.jeecg.modules.dry.mqtt.MqMessage;
import org.jeecg.modules.dry.mqtt.MqttConfig;
import org.jeecg.modules.dry.mqtt.MqttUtil;
import org.jeecg.modules.dry.service.*;
import org.jeecg.modules.dry.socket.ServerHandler;
@@ -54,6 +56,9 @@
    private IDryEquipmentService equipmentService;
    @Autowired
    private IDryEqpTypeService dryEqpTypeService;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
@@ -67,6 +72,7 @@
    @Value(value = "${jeecg.mqtt.role}")
    private String role;
    @Autowired
    private MqttUtil mqttUtil;
@@ -90,24 +96,24 @@
    @Override
    @Transactional
    public Result<?> realTimeDataHandle(RealTimeDataVo realTimeDataVo) {
        TenantContext.setTenant(realTimeDataVo.getTenantid()+"");
        log.info("实时数据:"+realTimeDataVo.toString());
        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
        log.info("实时数据:" + realTimeDataVo.toString());
        // 1 查询或创建工单
        // 1.1 从redis取出工单缓存
        DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(),
                realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid());
                realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid());
        // 1.2 如果有缓存记录
        if(orderVo != null && orderVo.getCode().equals(realTimeDataVo.getWorkorder())) {
        if (orderVo != null && orderVo.getCode().equals(realTimeDataVo.getWorkorder())) {
        // 1.3 没有缓存记录再查询数据库
            // 1.3 没有缓存记录再查询数据库
        } else {
            // 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条再返回
            orderVo = getOrSaveDryOrderVoDB(realTimeDataVo);
        }
        if (orderVo == null) {
            log.error("工单不存在,工单号:"+realTimeDataVo.getWorkorder()+",设备:" + realTimeDataVo.getMachineid() +",药材:" + realTimeDataVo.getName());
            log.error("工单不存在,工单号:" + realTimeDataVo.getWorkorder() + ",设备:" + realTimeDataVo.getMachineid() + ",药材:" + realTimeDataVo.getName());
            return Result.error("工单不存在");
        }
@@ -147,27 +153,25 @@
        orderVo.getBellowsTemp().put(realTimeDataVo.getTime3(), realTimeDataVo.getTemp2());
        // 2.3 更新到redis缓存
        redisUtil.hset(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(),
                realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid(),orderVo, 60*60);
                realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid(), orderVo, 60 * 60);
        return Result.ok();
    }
    @Override
    @Transactional
    public Result<?> realTimeDataHandle(RealTimeDataParentVo realTimeDataParentVo) {
        TenantContext.setTenant(realTimeDataParentVo.getTenantid()+"");
        log.info("实时数据:"+realTimeDataParentVo.toString());
        TenantContext.setTenant(realTimeDataParentVo.getTenantid() + "");
        log.info("实时数据:" + realTimeDataParentVo.toString());
        if (realTimeDataParentVo.getRealTime() != null) {
            RealTimeDataVo realTimeDataVo = realTimeDataParentVo.getRealTime();
            // 1 查询或创建工单
            // 1.1 从redis取出工单缓存
            DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(),
                    realTimeDataParentVo.getTenantid()+"_"+realTimeDataParentVo.getMachineid());
                    realTimeDataParentVo.getTenantid() + "_" + realTimeDataParentVo.getMachineid());
            // 1.2 如果有缓存记录
            if(orderVo != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) {
            if (orderVo != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) {
                // 1.3 没有缓存记录再查询数据库
            } else {
@@ -175,7 +179,7 @@
                orderVo = getOrSaveDryOrderVoDB(realTimeDataVo);
            }
            if (orderVo == null) {
                log.error("工单不存在,工单号:"+realTimeDataParentVo.getWorkorder()+",设备:" + realTimeDataParentVo.getMachineid() +",药材:" + realTimeDataVo.getName());
                log.error("工单不存在,工单号:" + realTimeDataParentVo.getWorkorder() + ",设备:" + realTimeDataParentVo.getMachineid() + ",药材:" + realTimeDataVo.getName());
                return Result.error("工单不存在");
            }
@@ -214,18 +218,22 @@
            // 2.3 更新到redis缓存
            redisUtil.hset(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(),
                    realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid(),orderVo, 60*60);
                    realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid(), orderVo, 60 * 60);
        }
        if (realTimeDataParentVo.getReport() != null) {
            saveReport(realTimeDataParentVo);
        }
        if (realTimeDataParentVo.getFault() != null) {
            fitFaultRecord(realTimeDataParentVo);
        }
        return Result.ok();
    }
    private void saveReport(RealTimeDataParentVo realTimeDataParentVo) {
        RealTimeReportVo report = realTimeDataParentVo.getReport();
        if(report.getReport_flag()) {
        if (report.getReport_flag()) {
            DryProdRecord prodRecord = new DryProdRecord();
            prodRecord.setReportHeadName(report.getReport_head_name());
            prodRecord.setReportHeadBatch(report.getReport_head_batch());
@@ -236,24 +244,24 @@
            prodRecord.setReportHeadLeader(report.getReport_head_leader());
            prodRecord.setReportHeadTecher(report.getReport_head_techer());
            prodRecord.setReportCheckField(report.getReport_check_field()?1:0);
            prodRecord.setReportCheckFile(report.getReport_check_file()?1:0);
            prodRecord.setReportCheckTag(report.getReport_check_tag()?1:0);
            prodRecord.setReportCheckTool(report.getReport_check_tool()?1:0);
            prodRecord.setReportCheckField(report.getReport_check_field() ? 1 : 0);
            prodRecord.setReportCheckFile(report.getReport_check_file() ? 1 : 0);
            prodRecord.setReportCheckTag(report.getReport_check_tag() ? 1 : 0);
            prodRecord.setReportCheckTool(report.getReport_check_tool() ? 1 : 0);
            prodRecord.setReportCheckMan(report.getReport_check_man());
            prodRecord.setReportCheckStatus(report.getReport_check_status()?1:0);
            prodRecord.setReportCheckStatus(report.getReport_check_status() ? 1 : 0);
            prodRecord.setReportCheckQa(report.getReport_check_qa());
            prodRecord.setReportCheckRecord(report.getReport_check_record());
            prodRecord.setReportProductView(report.getReport_product_view()?1:0);
            prodRecord.setReportProductWind(report.getReport_product_wind()?1:0);
            prodRecord.setReportProductSun(report.getReport_product_sun()?1:0);
            prodRecord.setReportProductLowDry(report.getReport_product_low_dry()?1:0);
            prodRecord.setReportProductDry(report.getReport_product_dry()?1:0);
            prodRecord.setReportProductView(report.getReport_product_view() ? 1 : 0);
            prodRecord.setReportProductWind(report.getReport_product_wind() ? 1 : 0);
            prodRecord.setReportProductSun(report.getReport_product_sun() ? 1 : 0);
            prodRecord.setReportProductLowDry(report.getReport_product_low_dry() ? 1 : 0);
            prodRecord.setReportProductDry(report.getReport_product_dry() ? 1 : 0);
            prodRecord.setReportProductStart(report.getReport_product_start());
            prodRecord.setReportProductEnd(report.getReport_product_end());
            prodRecord.setReportProductTotal(report.getReport_product_total());
            prodRecord.setReportProductCheck(report.getReport_product_check()?1:0);
            prodRecord.setReportProductCheck(report.getReport_product_check() ? 1 : 0);
            prodRecord.setReportProductMan1(report.getReport_product_man1());
            prodRecord.setReportProductMan2(report.getReport_product_man2());
            prodRecord.setReportProductWeight(report.getReport_product_weight());
@@ -261,15 +269,15 @@
            prodRecord.setReportProductUse(report.getReport_product_use());
            prodRecord.setReportProductQa(report.getReport_product_qa());
            prodRecord.setReportCleanMachine(report.getReport_clean_machine()?1:0);
            prodRecord.setReportCleanWaste(report.getReport_clean_waste()?1:0);
            prodRecord.setReportCleanTool(report.getReport_clean_tool()?1:0);
            prodRecord.setReportCleanDoor(report.getReport_clean_door()?1:0);
            prodRecord.setReportCleanBox(report.getReport_clean_box()?1:0);
            prodRecord.setReportCleanRecord(report.getReport_clean_record()?1:0);
            prodRecord.setReportCleanMachine(report.getReport_clean_machine() ? 1 : 0);
            prodRecord.setReportCleanWaste(report.getReport_clean_waste() ? 1 : 0);
            prodRecord.setReportCleanTool(report.getReport_clean_tool() ? 1 : 0);
            prodRecord.setReportCleanDoor(report.getReport_clean_door() ? 1 : 0);
            prodRecord.setReportCleanBox(report.getReport_clean_box() ? 1 : 0);
            prodRecord.setReportCleanRecord(report.getReport_clean_record() ? 1 : 0);
            prodRecord.setReportCleanDate(report.getReport_clean_date());
            prodRecord.setReportCleanMan(report.getReport_clean_man());
            prodRecord.setReportCleanConfirm(report.getReport_clean_confirm()?1:0);
            prodRecord.setReportCleanConfirm(report.getReport_clean_confirm() ? 1 : 0);
            prodRecord.setReportCleanQa(report.getReport_clean_qa());
            prodRecordService.save(prodRecord);
        }
@@ -278,10 +286,12 @@
    /**
     * 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条
     *
     * @param realTimeDataVo
     * @return
     */
    private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) {
        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
        DryOrderVo orderVo;
        LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder());
@@ -293,7 +303,8 @@
            // 转换为缓存数据结构
            orderVo = BeanUtil.toBean(one, DryOrderVo.class);
            if (one.getTemps() != null) {
                Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer,Double>>(){});
                Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer, Double>>() {
                });
                orderVo.setBellowsTemp(map);
            }
            // 查询称重记录,添加到缓存数据结构
@@ -303,7 +314,7 @@
                orderVo.setTrendVo(oldVo);
                orderVo.setDetailList(trendVos);
            }
        // 3 数据库没有则新增一条数据
            // 3 数据库没有则新增一条数据
        } else {
            orderVo = saveNewOrder(realTimeDataVo);
@@ -313,23 +324,60 @@
    /**
     * 保存新工单
     *
     * @param realTimeDataVo
     * @return
     */
    private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) {
        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
        DryOrderVo orderVo;
        // 查询设备
        DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo);
        if (equ == null) {
            log.error("未找到设备:"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",机台:" + realTimeDataVo.getMachineid());
            return null;
            log.error("未找到设备:" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",机台:" + realTimeDataVo.getMachineid());
            log.error("新增设备:");
            if (realTimeDataVo.getMachineid() == null || realTimeDataVo.getTenantid() == null) {
                log.error("新增设备失败:设备ID或租户ID为空!machineid={}, tenantid={}",
                        realTimeDataVo.getMachineid(), realTimeDataVo.getTenantid());
                return null;
            }
            DryEquipment addEqu = new DryEquipment(realTimeDataVo);
            try {
                String digits = StringUtils.getDigits(realTimeDataVo.getMachineid());
                addEqu.setName(Integer.parseInt(digits) + "#干燥设备");
            } catch (NumberFormatException e) {
                log.error("设备ID格式错误,无法提取数字部分:machineid={}", realTimeDataVo.getMachineid(), e);
                return null;
            }
            DryEqpType eqpType = dryEqpTypeService.getOne(
                    new LambdaQueryWrapper<DryEqpType>()
                            .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid())
            );
            if(eqpType == null){
                log.error("未查询到租户设备类型:{}", realTimeDataVo.getTenantid() );
                return null;
            }
            Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId()));
            if (!equipmentService.save(addEqu)) {
                log.error("新增设备失败:数据库保存异常!equipment={}", addEqu);
                return null;
            }
            equ = addEqu;
            log.info("新增设备成功:equipmentId={}", addEqu.getId());
        }
        // 查询药材
        DryHerbFormula herbFormula =  queryHerbByIndexTenant(realTimeDataVo);
        DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo);
        if (herbFormula == null) {
            log.error("未找到药材:"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",机台:" + realTimeDataVo.getMachineid());
            log.error("未找到药材:" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",机台:" + realTimeDataVo.getMachineid());
            return null;
        }
        // 创建新工单
@@ -345,6 +393,7 @@
    /**
     * 查询设备,新设备则添加到设备主数据
     *
     * @param realTimeDataVo
     * @return
     */
@@ -354,7 +403,7 @@
        queryWrapper.eq(DryEquipment::getCode, realTimeDataVo.getMachineid());
        DryEquipment one = equipmentService.getOne(queryWrapper);
        if (one == null) {
            log.error(role+"保存实时数据,未找到设备:"+realTimeDataVo.getMachineid());
            log.error(role + "保存实时数据,未找到设备:" + realTimeDataVo.getMachineid());
//            one = new DryEquipment(realTimeDataVo);
//            equipmentService.save(one);
            if (MqttConstant.ROLE_ADMIN.equals(role)) {
@@ -365,8 +414,10 @@
                object.put("tenantId", realTimeDataVo.getTenantid());
                mqttMessage.setPayload(object.toJSONString().getBytes());
                try {
                    mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX ,mqttMessage);
                }catch (MqttException e) {
                   if(mqttEnable){
                       mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
                   }
                } catch (MqttException e) {
                    e.printStackTrace();
                }
@@ -378,6 +429,7 @@
    /**
     * 查询药材,新药材添加到数据库
     *
     * @param realTimeDataVo
     * @return
     */
@@ -389,7 +441,10 @@
        if (one == null) {
            one = new DryHerbFormula(realTimeDataVo);
            DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid());
            one.setEqpType(dryEquipment.getType());
            if (dryEquipment!=null&&dryEquipment.getType()!=null) {
                one.setEqpType(dryEquipment.getType());
            }
            dryHerbFormulaService.save(one);
        }
        return one;
@@ -397,14 +452,15 @@
    /**
     * 保存含水率变化记录
     *
     * @param trendVo
     * @param orderVo
     */
    private void saveOrderTrendVo(DryOrderTrendVo trendVo, DryOrderVo orderVo) {
        //判断 实时含水率 或 实时重量有没有变化,有变化则更新
        if(orderVo.getTrendVo() == null && trendVo != null && trendVo.getWeight() > 0
                || orderVo.getTrendVo()!=null &&  trendVo.getWeight() < orderVo.getTrendVo().getWeight()
                ) {
        if (orderVo.getTrendVo() == null && trendVo != null && trendVo.getWeight() > 0
                || orderVo.getTrendVo() != null && trendVo.getWeight() < orderVo.getTrendVo().getWeight()
        ) {
            DryOrder byId = dryOrderService.getById(orderVo.getId());
            // 将最新结果更新到工单
            if (byId != null) {
@@ -422,12 +478,13 @@
    /**
     * 查询机台实时数据
     *
     * @param realTimeDataVo
     * @return
     */
    @Override
    public Result<?> queryMachineRealTImeData(RealTimeDataVo realTimeDataVo) {
        TenantContext.setTenant(realTimeDataVo.getTenantid()+"");
        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
        // 查询所有机台,查询语句组装
        LambdaQueryWrapper<DryEquipment> queryWrapper = new LambdaQueryWrapper<>();
@@ -449,13 +506,13 @@
                dryEquipments.stream().forEach(item -> {
                    // 获取工单
                    DryOrderVo order = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + item.getCode());
                    list.add(item.getName().substring(0, item.getName().indexOf('#')+1));
                    list.add(item.getName().substring(0, item.getName().indexOf('#') + 1));
                    if (order != null) {
                        // 计算干燥效率,用于对比
                        DryOrderTrendVo dryOrderTrendVo = order.getDetailList().get(order.getDetailList().size() - 1);
                        double v = order.getOriginWeight() - dryOrderTrendVo.getWeight();
                        if (v > 0 && dryOrderTrendVo.getTotalTime()>0) {
                        if (v > 0 && dryOrderTrendVo.getTotalTime() > 0) {
                            DecimalFormat df = new DecimalFormat("#.00");
                            dList.add(Double.valueOf(df.format(v / dryOrderTrendVo.getTotalTime() * 60)));
                        } else {
@@ -477,7 +534,7 @@
                // 查询近十次效率和能能耗平均
                dryOrderService.queryRecentOrderAvg(orderVo);
            }
        }catch (Exception e) {
        } catch (Exception e) {
            e.printStackTrace();
        }
        return Result.ok(orderVo);
@@ -491,27 +548,27 @@
        DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(msgVo.getTenantId() + "", msgVo.getMachineId());
        log.info("获取设备:" + dryEquipment.toString());
       // managedSessions.keySet().forEach(addr -> {
           // ObjectOutputStream oos = null;
            try {
        // managedSessions.keySet().forEach(addr -> {
        // ObjectOutputStream oos = null;
        try {
//                Socket socket = SocketServerConfig.clientMap.get(addr);
                IoSession session = ServerHandler.clientSocket.get(dryEquipment.getIp());
                if (session == null) {
                    return Result.error("未获取到session,请检查客户端配置或设备ip配置是否正常");
                }
                SocketMsgVo smv = new SocketMsgVo(msgVo);
                session.write(JSONObject.toJSONString(smv));
            IoSession session = ServerHandler.clientSocket.get(dryEquipment.getIp());
            if (session == null) {
                return Result.error("未获取到session,请检查客户端配置或设备ip配置是否正常");
            }
            SocketMsgVo smv = new SocketMsgVo(msgVo);
            session.write(JSONObject.toJSONString(smv));
//                oos = new ObjectOutputStream(socket.getOutputStream());
//                String s = JSONObject.toJSONString(new SocketMsgVo(msgVo));
//                oos.writeUTF(s);
//                oos.flush();
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            }
     //   });
        }
        //   });
        return Result.OK();
    }
@@ -530,15 +587,15 @@
                    orderVo.setEnvHum(order.getEnvHum());
                    orderVo.setEnvTemp(order.getEnvTemp());
                    double watt = order.getWatt() - order.getDetailList().get(0).getWatt();
                    orderVo.setWatt(orderVo.getWatt()==null? watt : orderVo.getWatt() + watt);
                    orderVo.setWatt(orderVo.getWatt() == null ? watt : orderVo.getWatt() + watt);
                    double steam = order.getSteam() - order.getDetailList().get(0).getSteam();
                    orderVo.setSteam(orderVo.getSteam()==null? steam : orderVo.getSteam() + steam);
                    orderVo.setOriginWeight(orderVo.getOriginWeight()==null? order.getOriginWeight(): orderVo.getOriginWeight() + order.getOriginWeight());
                    orderVo.setSteam(orderVo.getSteam() == null ? steam : orderVo.getSteam() + steam);
                    orderVo.setOriginWeight(orderVo.getOriginWeight() == null ? order.getOriginWeight() : orderVo.getOriginWeight() + order.getOriginWeight());
                    double yield = order.getOriginWeight()*(1-(order.getInitial()/100))/(1-(order.getTarget()/100));
                    orderVo.setYield(orderVo.getYield()==null? yield: orderVo.getYield() + yield);
                    double yield = order.getOriginWeight() * (1 - (order.getInitial() / 100)) / (1 - (order.getTarget() / 100));
                    orderVo.setYield(orderVo.getYield() == null ? yield : orderVo.getYield() + yield);
                    double sub = order.getOriginWeight() - order.getYield();
                    orderVo.setReduce(orderVo.getReduce()==null? sub: orderVo.getReduce() + sub);
                    orderVo.setReduce(orderVo.getReduce() == null ? sub : orderVo.getReduce() + sub);
                }
@@ -555,7 +612,7 @@
    @Override
    public Result<?> fitFaultRecord(RealTimeDataVo vo) {
        TenantContext.setTenant(vo.getTenantid()+"");
        TenantContext.setTenant(vo.getTenantid() + "");
        ThreadUtil.execute(() -> {
            try {
                //解析存储报警数据
@@ -565,30 +622,28 @@
                //处理结束后,将redis中实时数据发送至云服务器
                    Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
                    if(mqttEnable && !toCloudFaultMap.isEmpty()){
                        MqMessage< Map<Object, Object>> message = new MqMessage<>();
                        message.setData(toCloudFaultMap);
                        message.setTentId(vo.getTenantid()+"");
                        MqttMessage mqttMessage = new MqttMessage();
                        mqttMessage.setQos(0);
                        mqttMessage.setPayload(JSON.toJSONString(message).getBytes());
                        mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage);
                    }
                //要保存的历史故障
                if(!faultRecords1.isEmpty()){
                    MqMessage<List<DryFaultRecord>> message = new MqMessage<>();
                    message.setData(faultRecords1);
                    message.setTentId(vo.getTenantid()+"");
                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
                if (mqttEnable && !toCloudFaultMap.isEmpty()) {
                    MqMessage<Map<Object, Object>> message = new MqMessage<>();
                    message.setData(toCloudFaultMap);
                    message.setTentId(vo.getTenantid() + "");
                    MqttMessage mqttMessage = new MqttMessage();
                    mqttMessage.setQos(0);
                    mqttMessage.setPayload((JSON.toJSONString(message).getBytes()));
                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage);
                    mqttMessage.setPayload(JSON.toJSONString(message).getBytes());
                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA, mqttMessage);
                }
                //要保存的历史故障
                if (!faultRecords1.isEmpty()) {
                    MqMessage<List<DryFaultRecord>> message = new MqMessage<>();
                    message.setData(faultRecords1);
                    message.setTentId(vo.getTenantid() + "");
                    MqttMessage mqttMessage = new MqttMessage();
                    mqttMessage.setQos(0);
                    mqttMessage.setPayload((JSON.toJSONString(message).getBytes()));
                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA, mqttMessage);
                }
            } catch (Exception e) {
@@ -601,41 +656,47 @@
    @Override
    public void fitFaultRecord(RealTimeDataParentVo vo) {
        TenantContext.setTenant(vo.getTenantid()+"");
        TenantContext.setTenant(vo.getTenantid() + "");
        ThreadUtil.execute(() -> {
            try {
                //解析存储报警数据
                List<DryFaultRecord> faultRecords1 = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
                List<DryFaultRecord> faultRecords2 = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
                faultRecords1.addAll(faultRecords2);
                //处理结束后,将redis中实时数据发送至云服务器
                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
                if(mqttEnable && !toCloudFaultMap.isEmpty()){
                    MqMessage< Map<Object, Object>> message = new MqMessage<>();
                    message.setData(toCloudFaultMap);
                    message.setTentId(vo.getTenantid()+"");
                    MqttMessage mqttMessage = new MqttMessage();
                    mqttMessage.setQos(0);
                    mqttMessage.setPayload(JSON.toJSONString(message).getBytes());
                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage);
                List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
                List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
                if(!errorList.isEmpty()){
                   log.error("保存故障:{}", errorList.toString());
                }
                if(!warnList.isEmpty()){
                    log.error("保存告警:{}", warnList.toString());
                }
                //以下为云服务处理故障,厂内本地服务无需处理
                if(!mqttEnable)return;
                //要保存的历史故障
                if(!faultRecords1.isEmpty()){
                    MqMessage<List<DryFaultRecord>> message = new MqMessage<>();
                    message.setData(faultRecords1);
                    message.setTentId(vo.getTenantid()+"");
                    MqttMessage mqttMessage = new MqttMessage();
                    mqttMessage.setQos(0);
                    mqttMessage.setPayload((JSON.toJSONString(message).getBytes()));
                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage);
                //处理结束后,将redis中实时数据发送至云服务器  key = tenantId + machineId + eqpFault
                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
                Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().toString(),
                                entry -> (DryFaultRecordVo)entry.getValue()
                        ));
                String tenantId = vo.getTenantid() +"";
                //广播发送给各租户下移动设备
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                //发送广播
                log.error("广播给:{}" , recTopic);
                mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
            } catch (Exception e) {
                e.printStackTrace();
@@ -648,20 +709,21 @@
    /**
     * 解析存储故障数据
     * TODO 保证原子性
     * @param fault 故障数据
     * @param orderId 工单
     * @param tenantId 租户
     *
     * @param fault     故障数据
     * @param orderId   工单
     * @param tenantId  租户
     * @param machineId 设备
     * @param faultType 故障类型
     * @return 组装好故障数据
     */
    private List<DryFaultRecord> fitFault(String fault, String orderId,Integer tenantId,String machineId,Integer faultType){
    private List<DryFaultRecord> fitFault(String fault, String orderId, Integer tenantId, String machineId, Integer faultType) {
        List<DryFaultRecord> result = new ArrayList<>();
        if(StringUtils.isEmpty(fault))return  result;
        if (StringUtils.isEmpty(fault)) return result;
        //数据样本:"eqp_fault": "滚筒降超时-报警,风机过流报警,滚筒升超时-报警,风箱升报警",
        System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") +  DateUtils.formatDateTime()+"--"+fault);
        System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime() + "--" + fault);
        //redis中的故障
        Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
        Map<Object, Object> rFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
        Map<String, Object> redFauMap = rFauMap.entrySet().stream()
                .collect(Collectors.toMap(
                        entry -> entry.getKey().toString(),  // 键转换为字符串
@@ -669,42 +731,45 @@
                ));
        //没有生成工单的故障数据不存储
        if(StringUtils.isEmpty(orderId)){
        if (StringUtils.isEmpty(orderId)) {
            return result;
        }
        if(StringUtils.isEmpty(fault) && rFauMap.isEmpty()){
        if (StringUtils.isEmpty(fault) && rFauMap.isEmpty()) {
            return result;
        }
        //1.解析数据
        String[] eqpFaults = fault.split(",");
        Map<String,DryFaultRecord> addFauMap = new HashMap<>();
        Map<String,DryFaultRecord> realFauMap = new HashMap<>();
        Map<String, DryFaultRecord> addFauMap = new HashMap<>();
        Map<String, DryFaultRecord> realFauMap = new HashMap<>();
        for (int i = 0; i < eqpFaults.length; i++) {
            String eqpFault = eqpFaults[i];
            //避免空字符串
            if(StringUtils.isEmpty(eqpFault.trim())) continue;
            if (StringUtils.isEmpty(eqpFault.trim())) continue;
            //1.1检查mqtt中是否已存在这个故障
            String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault).trim();
            String redisKey = String.format("%s_%s_%s", tenantId, machineId, eqpFault).trim();
            realFauMap.put(redisKey, new DryFaultRecord());
            DryFaultRecordVo  rFault = (DryFaultRecordVo) redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,redisKey);
            DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey);
            //1.2如果redis不存在则存入(存故障开始)
            if(rFault ==null){
            if (rFault == null) {
                //组装缓存数据
//                DryFaultRecord faultRecord = new DryFaultRecord(orderId,tenantId,eqpFault,faultType,new Date(),null);
//                addFauMap.put(redisKey,faultRecord);
                Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId);
                String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + "");
                DryFaultRecordVo vo = new DryFaultRecordVo(orderId,tenantId,eqpFault,faultType,new Date(),null,1,equipmentMap.get(machineId).getName(),tenantName);
                addFauMap.put(redisKey,vo);
            }else {
                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName);
                addFauMap.put(redisKey, vo);
            } else {
                //TODO 特殊情况,如果redis的故障和新
                //如果数据已存在,且计数大于1就重置计数(计数3次后判定故障结束,3次之前重新上报故障说明故障还在持续 需要重新计数)
                if(rFault.getECount()!=null && rFault.getECount() > 1){
                if (rFault.getECount() != null && rFault.getECount() > 1) {
                    rFault.setECount(1);
                    redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,redisKey,rFault);
                    System.err.println("报警次数重置 clear clear ,key-"+redisKey);
                    redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey, rFault);
                    System.err.println("报警次数重置 clear clear ,key-" + redisKey);
                }
            }
@@ -715,40 +780,38 @@
        //合并数据
        addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value));
        //没有新故障数据不用覆盖
        if(!addFauMap.isEmpty()){
            redisUtil.hmset(MqttConstant.MQTT_REAL_FAULT,redFauMap);
        if (!addFauMap.isEmpty()) {
            redisUtil.hmset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redFauMap);
        }
        //2检测已结束的故障
        //2.1如果实时数据不存在redis存在则代表故障结束,存入数据库
        Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
        Map<Object, Object> curFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId));
        curFauMap.keySet().stream()
                //特别注意,多个报警类型共用方法需要区分类型
                .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo)curFauMap.get(key)).getFaultType() == faultType)
                .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo) curFauMap.get(key)).getFaultType() == faultType)
                .forEach(key -> {
                    DryFaultRecordVo vo = (DryFaultRecordVo)redFauMap.get(key);
                    vo.setECount(vo.getECount()+1);
                    if(redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,key.toString())!=null){
                    DryFaultRecordVo vo = (DryFaultRecordVo) redFauMap.get(key);
                    vo.setECount(vo.getECount() + 1);
                    if (redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString()) != null) {
                        //更新次数
                        redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,key.toString(),vo);
                        System.err.println("报警次数更新,key-"+key);
                        redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString(), vo);
                        System.err.println("报警次数更新,key-" + key);
                    }
                    if(vo.getECount()>=3){
                    if (vo.getECount() >= 3) {
                        vo.setEndTime(new Date());
                        //TODO 结束超过某个时间区间判定为错误数据
                        faultRecordService.save(vo);
                        redisUtil.hdel(MqttConstant.MQTT_REAL_FAULT,key);
                        redisUtil.hdel(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key);
                        result.add(vo);
                        System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") +  DateUtils.formatDateTime()+"存入数据库");
                        System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime() + "存入数据库");
                    }
                });
        return result;
    }
}