干燥机配套车间生产管理系统/云平台服务端
bsw215583320
2024-11-18 765f87b17ff1709d5307a6c01bbc40bcfd1c4d48
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -1,10 +1,14 @@
package org.jeecg.modules.dry.service.impl;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IoSession;
@@ -20,6 +24,7 @@
import org.jeecg.common.constant.MqttConstant;
import org.jeecg.common.system.util.JwtUtil;
import org.jeecg.common.system.vo.LoginUser;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.dry.common.CacheConstants;
@@ -41,10 +46,8 @@
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
@Slf4j
@Service
@@ -68,6 +71,9 @@
    @Autowired
    private IDryProdRecordService prodRecordService;
    @Autowired
    private IDryFaultRecordService faultRecordService;
    private String token;
@@ -295,9 +301,10 @@
                mqttMessage.setQos(0);
                JSONObject object = new JSONObject();
                object.put("code", realTimeDataVo.getMachineid());
                object.put("tenantId", realTimeDataVo.getTenantid());
                mqttMessage.setPayload(object.toJSONString().getBytes());
                try {
                    mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_REQ_EQUIPMENT, TenantContext.getTenant()) ,mqttMessage);
                    mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX ,mqttMessage);
                }catch (MqttException e) {
                    e.printStackTrace();
                }
@@ -335,7 +342,7 @@
    private void saveOrderTrendVo(DryOrderTrendVo trendVo, DryOrderVo orderVo) {
        //判断 实时含水率 或 实时重量有没有变化,有变化则更新
        if(orderVo.getTrendVo() == null && trendVo != null && trendVo.getWeight() > 0
                || trendVo.getWeight() < orderVo.getTrendVo().getWeight()
                || orderVo.getTrendVo()!=null &&  trendVo.getWeight() < orderVo.getTrendVo().getWeight()
                ) {
            DryOrder byId = dryOrderService.getById(orderVo.getId());
            // 将最新结果更新到工单
@@ -484,4 +491,112 @@
    public Result<?> statisticsDataHandle(StatisticsDataVo statsDataVo) {
        return null;
    }
    @Override
    public Result<?> fitFultRecord(RealTimeDataVo vo) {
        ThreadUtil.execute(() -> {
            try {
                //解析存储报警数据
                List<DryFaultRecord> faultRecords1 = fitFault(vo.getEqp_fault(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
                List<DryFaultRecord> faultRecords2 = fitFault(vo.getEqp_warning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
                faultRecords1.addAll(faultRecords2);
                JSONObject json = new JSONObject();
                json.put("data",JSON.toJSONString(faultRecords1));
                if(faultRecords1.isEmpty())  return;
                MqttMessage mqttMessage = new MqttMessage();
                mqttMessage.setQos(0);
                mqttMessage.setPayload((JSON.toJSONString(json).getBytes()));
                mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        return null;
    }
    /**
     * 解析存储故障数据
     * TODO 保证原子性
     * @param fault 故障数据
     * @param orderId 工单
     * @param tenantId 租户
     * @param machineId 设备
     * @param faultType 故障类型
     * @return 组装好故障数据
     */
    private List<DryFaultRecord> fitFault(String fault, String orderId,Integer tenantId,String machineId,Integer faultType){
        List<DryFaultRecord> result = new ArrayList<>();
        //数据样本:"eqp_fault": "滚筒降超时-报警,风机过流报警,滚筒升超时-报警,风箱升报警",
        System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") +  DateUtils.formatDateTime()+"--"+fault);
        //redis中的故障
        Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_EQP_FAULT);
        Map<String, Object> redFauMap = rFauMap.entrySet().stream()
                .collect(Collectors.toMap(
                        entry -> entry.getKey().toString(),  // 键转换为字符串
                        entry -> entry.getValue()
                ));
        //没有生成工单的故障数据不存储
        if(StringUtils.isEmpty(orderId)){
            return result;
        }
        if(StringUtils.isEmpty(fault) && rFauMap.isEmpty()){
            return result;
        }
        //1.解析数据
        String[] eqpFaults = fault.split(",");
        Map<String,DryFaultRecord> realFauMap = new HashMap<>();
        for (int i = 0; i < eqpFaults.length; i++) {
            String eqpFault = eqpFaults[i];
            //避免空字符串
            if(StringUtils.isEmpty(eqpFault)) continue;
            //1.1检查mqtt中是否已存在这个故障
            String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault);
            String  rFault = (String) redisUtil.get(redisKey);
            //1.2如果redis不存在则存入(存故障开始)
            if(rFault ==null){
                //组装缓存数据
                DryFaultRecord faultRecord = new DryFaultRecord(orderId,eqpFault,faultType,new Date(),null);
                realFauMap.put(redisKey,faultRecord);
            }
        }
        //1.3缓存至redis
        //合并数据
        realFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value));
        //没有新故障数据不用覆盖
        if(!realFauMap.isEmpty()){
            redisUtil.hmset(MqttConstant.MQTT_EQP_FAULT,redFauMap);
        }
        //2检测已结束的故障
        //2.1如果实时数据不存在redis存在则代表故障结束,存入数据库
        Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_EQP_FAULT);
        curFauMap.keySet().stream()
                //特别注意,多个报警类型共用方法需要区分类型
                .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecord)curFauMap.get(key)).getFaultType() == faultType)
                .forEach(key -> {
                    System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") +  DateUtils.formatDateTime()+"存入数据库");
                    DryFaultRecord record = (DryFaultRecord)redFauMap.get(key);
                    record.setEndTime(new Date());
                    faultRecordService.save(record);
                    redisUtil.hdel(MqttConstant.MQTT_EQP_FAULT,key);
                    result.add(record);
                });
        return result;
    }
}