干燥机配套车间生产管理系统/云平台服务端
bsw215583320
2024-11-22 ca75cf818e434f77ca71d78ac2c883ca41b18713
Merge remote-tracking branch 'origin/herb' into herb
已添加4个文件
已修改6个文件
274 ■■■■ 文件已修改
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonCacheConstant.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/entity/DryFaultRecord.java 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/IDryEquipmentService.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryEquipmentServiceImpl.java 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java 116 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/config/init/RedisInitListener.java 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonCacheConstant.java
对比新文件
@@ -0,0 +1,8 @@
package org.jeecg.common.constant;
/**
 * Redis cache key constants shared across modules.
 */
public interface CommonCacheConstant {

    /** Redis cache key for tenant data. */
    String SYS_CACHE_TENANT = "sys:cache:tenant";

    /** Redis cache key for the equipment information of each tenant. */
    String DRY_CACHE_TENANT_EQUS = "dry:cache:tenant::equs";
}
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -55,6 +55,7 @@
  String TENANT_UP_PREFIX = "tenant/up";
  String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data";
  String TENANT_UP_PREFIX_FAULT_DATA = TENANT_UP_PREFIX + "/fault/data";
  String TENANT_UP_PREFIX_REAL_FAULT_DATA = TENANT_UP_PREFIX + "/real/fault/data";
  String TENANT_UP_PREFIX_EQU = TENANT_UP_PREFIX + "/equipment";
@@ -71,10 +72,19 @@
  /**************************start*******************************/
  /**************************end*******************************/
  //redis缓存
  //client
  String MQTT_REAL_FAULT = "mqtt:real:fault";
  //service(cloud)
  //在线客户端
  String MQTT_ONLINE_CLIENT = "mqtt:online:client::";
  String MQTT_EQP_FAULT = "mqtt:eqp:fault";
  //所有租户的实时报警(%s:租户id)
  String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s:";
}
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/entity/DryFaultRecord.java
@@ -40,6 +40,9 @@
    @Excel(name = "工单id", width = 15)
    @ApiModelProperty(value = "工单id")
    private String orderId;
    @Excel(name = "租户id", width = 15)
    @ApiModelProperty(value = "租户id")
    private Integer tenantId;
    /**故障名称*/
    @Excel(name = "故障名称", width = 15)
    @ApiModelProperty(value = "故障名称")
@@ -92,8 +95,9 @@
    public DryFaultRecord() {
    }
    public DryFaultRecord(String orderId, String faultName,Integer faultType, Date startTime, Date endTime) {
    public DryFaultRecord(String orderId,Integer tenantId, String faultName,Integer faultType, Date startTime, Date endTime) {
        this.orderId = orderId;
        this.tenantId = tenantId;
        this.faultName = faultName;
        this.startTime = startTime;
        this.endTime = endTime;
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java
对比新文件
@@ -0,0 +1,32 @@
package org.jeecg.modules.dry.vo;
import lombok.Data;
import org.jeecg.modules.dry.entity.DryFaultRecord;
import java.io.Serializable;
import java.util.Date;
/**
 */
@Data
public class DryFaultRecordVo extends DryFaultRecord implements Serializable {
    private static final long serialVersionUID = 1L;
    //redis故障结束技术
    private Integer eCount;
    //设备名称
    private String equName;
    //租户名称
    private String tenantName;
    public DryFaultRecordVo() {
    }
    public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount, String equName, String tenantName) {
        super(orderId, tenantId, faultName, faultType, startTime, endTime);
        this.eCount = eCount;
        this.equName = equName;
        this.tenantName = tenantName;
    }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java
对比新文件
@@ -0,0 +1,22 @@
package org.jeecg.modules.dry.mqtt;
import lombok.Data;
/**
 * Mqtt消息载体
 * @param <T>
 */
@Data
public class MqMessage<T> {
    private T data;
    private String tentId;
    public MqMessage() {
    }
    public MqMessage(T data, String tentId) {
        this.data = data;
        this.tentId = tentId;
    }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -216,11 +216,11 @@
      case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
        ThreadUtil.execute(() -> {
          try {
            JSONObject jsonObject = JSON.parseObject(message);
            List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class);
            System.err.println(faultRecords.toString());
            faultRecordService.saveBatch(faultRecords);
            MqMessage<List<DryFaultRecord>> listMqMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() {
            });
         //   List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class);
            System.err.println(listMqMessage.toString());
            faultRecordService.saveBatch(listMqMessage.getData());
          } catch (Exception e) {
            e.printStackTrace();
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/IDryEquipmentService.java
@@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @Description: 干燥机
@@ -16,4 +17,11 @@
    DryEquipment selectByTenantIdEquipmentId(String tenantId, String equipmentId);
    /**
     * 查询租户下所有设备
     * @param tenantId
     * @return
     */
    Map<String,DryEquipment>  queryEquByTenantId(Integer tenantId);
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryEquipmentServiceImpl.java
@@ -1,17 +1,23 @@
package org.jeecg.modules.dry.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import org.jeecg.common.config.TenantContext;
import org.jeecg.common.constant.CommonCacheConstant;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.dry.common.CacheConstants;
import org.jeecg.modules.dry.entity.DryEquipment;
import org.jeecg.modules.dry.mapper.DryEquipmentMapper;
import org.jeecg.modules.dry.service.IDryEquipmentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * @Description: 干燥机
@@ -24,6 +30,8 @@
    @Autowired
    private RedisUtil redisUtil;
    @Override
    public DryEquipment selectByTenantIdEquipmentId(String tenantId, String equipmentId) {
        DryEquipment dryEquipment = (DryEquipment) redisUtil.hget(CacheConstants.RedisKeyEnum.EQP_MAP.getCode(), tenantId + equipmentId);
@@ -39,4 +47,20 @@
        }
        return dryEquipment;
    }
    @Override
    @Cacheable(cacheNames = CommonCacheConstant.DRY_CACHE_TENANT_EQUS, key = "#tenantId" , unless = "#result == null " )
    public Map<String,DryEquipment> queryEquByTenantId(Integer tenantId) {
        TenantContext.setTenant(tenantId +"");
        QueryWrapper<DryEquipment> queryWrapper  = new QueryWrapper<>();
        queryWrapper.lambda().eq(DryEquipment::getTenantId,tenantId);
        List<DryEquipment> equipmentList = this.list(queryWrapper);
        Map<String, DryEquipment> userMap = equipmentList.stream()
                .collect(Collectors.toMap(
                        DryEquipment::getCode,
                        Function.identity(),
                        (existingValue, newValue) -> existingValue // å¦‚果键冲突,保留旧值
                ));
        return userMap;
    }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -3,48 +3,35 @@
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.service.IoHandler;
import org.apache.mina.core.session.IoSession;
import org.apache.shiro.SecurityUtils;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jeecg.common.api.CommonAPI;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.config.TenantContext;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.constant.CommonCacheConstant;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.MqttConstant;
import org.jeecg.common.system.util.JwtUtil;
import org.jeecg.common.system.vo.LoginUser;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.dry.common.CacheConstants;
import org.jeecg.modules.dry.entity.*;
import org.jeecg.modules.dry.mqtt.MqMessage;
import org.jeecg.modules.dry.mqtt.MqttUtil;
import org.jeecg.modules.dry.service.*;
import org.jeecg.modules.dry.socket.ServerHandler;
import org.jeecg.modules.dry.socket.SocketServerConfig;
import org.jeecg.modules.dry.util.DryUtil;
import org.jeecg.modules.dry.vo.*;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.security.auth.login.LoginContext;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.text.DecimalFormat;
import java.util.*;
import java.util.stream.Collectors;
@@ -494,20 +481,41 @@
    @Override
    public Result<?> fitFultRecord(RealTimeDataVo vo) {
        TenantContext.setTenant(vo.getTenantid()+"");
        ThreadUtil.execute(() -> {
            try {
                //解析存储报警数据
                List<DryFaultRecord> faultRecords1 = fitFault(vo.getEqp_fault(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
                List<DryFaultRecord> faultRecords2 = fitFault(vo.getEqp_warning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
                faultRecords1.addAll(faultRecords2);
                JSONObject json = new JSONObject();
                json.put("data",JSON.toJSONString(faultRecords1));
                if(faultRecords1.isEmpty())  return;
                MqttMessage mqttMessage = new MqttMessage();
                mqttMessage.setQos(0);
                mqttMessage.setPayload((JSON.toJSONString(json).getBytes()));
                mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage);
                //处理结束后,将redis中实时数据发送至云服务器
                    Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
                    if(!toCloudFaultMap.isEmpty()){
                        MqMessage< Map<Object, Object>> message = new MqMessage<>();
                        message.setData(toCloudFaultMap);
                        message.setTentId(vo.getTenantid()+"");
                        MqttMessage mqttMessage = new MqttMessage();
                        mqttMessage.setQos(0);
                        mqttMessage.setPayload(JSON.toJSONString(message).getBytes());
                        mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage);
                    }
                //要保存的历史故障
                if(!faultRecords1.isEmpty()){
                    MqMessage<List<DryFaultRecord>> message = new MqMessage<>();
                    message.setData(faultRecords1);
                    message.setTentId(vo.getTenantid()+"");
                    MqttMessage mqttMessage = new MqttMessage();
                    mqttMessage.setQos(0);
                    mqttMessage.setPayload((JSON.toJSONString(message).getBytes()));
                    mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage);
                }
            } catch (Exception e) {
                e.printStackTrace();
@@ -533,7 +541,7 @@
        //数据样本:"eqp_fault": "滚筒降超时-报警,风机过流报警,滚筒升超时-报警,风箱升报警",
        System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") +  DateUtils.formatDateTime()+"--"+fault);
        //redis中的故障
        Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_EQP_FAULT);
        Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
        Map<String, Object> redFauMap = rFauMap.entrySet().stream()
                .collect(Collectors.toMap(
                        entry -> entry.getKey().toString(),  // 键转换为字符串
@@ -550,50 +558,74 @@
        }
        //1.解析数据
        String[] eqpFaults = fault.split(",");
        Map<String,DryFaultRecord> addFauMap = new HashMap<>();
        Map<String,DryFaultRecord> realFauMap = new HashMap<>();
        for (int i = 0; i < eqpFaults.length; i++) {
            String eqpFault = eqpFaults[i];
            //避免空字符串
            if(StringUtils.isEmpty(eqpFault)) continue;
            if(StringUtils.isEmpty(eqpFault.trim())) continue;
            //1.1检查mqtt中是否已存在这个故障
            String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault);
            String  rFault = (String) redisUtil.get(redisKey);
            String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault).trim();
            realFauMap.put(redisKey, new DryFaultRecord());
            DryFaultRecordVo  rFault = (DryFaultRecordVo) redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,redisKey);
            //1.2如果redis不存在则存入(存故障开始)
            if(rFault ==null){
                //组装缓存数据
                DryFaultRecord faultRecord = new DryFaultRecord(orderId,eqpFault,faultType,new Date(),null);
                realFauMap.put(redisKey,faultRecord);
//                DryFaultRecord faultRecord = new DryFaultRecord(orderId,tenantId,eqpFault,faultType,new Date(),null);
//                addFauMap.put(redisKey,faultRecord);
                Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId);
                String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + "");
                DryFaultRecordVo vo = new DryFaultRecordVo(orderId,tenantId,eqpFault,faultType,new Date(),null,1,equipmentMap.get(machineId).getName(),tenantName);
                addFauMap.put(redisKey,vo);
            }else {
                //如果数据已存在,且计数大于1就重置计数(计数3次后判定故障结束,3次之前重新上报故障说明故障还在持续 需要重新计数)
                if(rFault.getECount()!=null && rFault.getECount() > 1){
                    rFault.setECount(1);
                    redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,redisKey,rFault);
                    System.err.println("报警次数重置 clear clear ï¼Œkey-"+redisKey);
                }
            }
        }
        //1.3缓存至redis
        //合并数据
        realFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value));
        addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value));
        //没有新故障数据不用覆盖
        if(!realFauMap.isEmpty()){
            redisUtil.hmset(MqttConstant.MQTT_EQP_FAULT,redFauMap);
        if(!addFauMap.isEmpty()){
            redisUtil.hmset(MqttConstant.MQTT_REAL_FAULT,redFauMap);
        }
        //2检测已结束的故障
        //2.1如果实时数据不存在redis存在则代表故障结束,存入数据库
        Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_EQP_FAULT);
        Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT);
        curFauMap.keySet().stream()
                //特别注意,多个报警类型共用方法需要区分类型
                .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecord)curFauMap.get(key)).getFaultType() == faultType)
                .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo)curFauMap.get(key)).getFaultType() == faultType)
                .forEach(key -> {
                    System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") +  DateUtils.formatDateTime()+"存入数据库");
                    DryFaultRecord record = (DryFaultRecord)redFauMap.get(key);
                    record.setEndTime(new Date());
                    faultRecordService.save(record);
                    redisUtil.hdel(MqttConstant.MQTT_EQP_FAULT,key);
                    DryFaultRecordVo vo = (DryFaultRecordVo)redFauMap.get(key);
                    vo.setECount(vo.getECount()+1);
                    if(redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,key.toString())!=null){
                        //更新次数
                        redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,key.toString(),vo);
                        System.err.println("报警次数更新,key-"+key);
                    }
                    result.add(record);
                    if(vo.getECount()>=3){
                        vo.setEndTime(new Date());
                        //TODO 结束超过某个时间区间判定为错误数据
                        faultRecordService.save(vo);
                        redisUtil.hdel(MqttConstant.MQTT_REAL_FAULT,key);
                        result.add(vo);
                        System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") +  DateUtils.formatDateTime()+"存入数据库");
                    }
                });
        return result;
    }
jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/config/init/RedisInitListener.java
对比新文件
@@ -0,0 +1,32 @@
package org.jeecg.config.init;
import org.jeecg.common.constant.CommonCacheConstant;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.system.entity.SysTenant;
import org.jeecg.modules.system.service.ISysTenantService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Component
public class RedisInitListener  implements ApplicationListener<ApplicationStartedEvent> {
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private ISysTenantService sysTenantService;
    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        //查询所有租户信息并缓存至redis
        List<SysTenant> tenantList = sysTenantService.list();
        //list转map
        Map<String, Object> tenantMap = tenantList.stream()
                .collect(Collectors.toMap(t -> String.valueOf(t.getId()),  t -> (Object) t.getName(), (existingValue, newValue) -> existingValue));
        redisUtil.hmset(CommonCacheConstant.SYS_CACHE_TENANT,tenantMap);
    }
}