干燥机配套车间生产管理系统/云平台服务端
bsw215583320
2024-11-22 ca75cf818e434f77ca71d78ac2c883ca41b18713
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -1,21 +1,36 @@
package org.jeecg.modules.dry.mqtt;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jeecg.common.config.TenantContext;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.MqttConstant;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.service.BaseCommonService;
import org.jeecg.modules.dry.api.EmqxApi;
import org.jeecg.modules.dry.entity.DryEqpType;
import org.jeecg.modules.dry.entity.DryEquipment;
import org.jeecg.modules.dry.entity.DryFaultRecord;
import org.jeecg.modules.dry.entity.DryShop;
import org.jeecg.modules.dry.service.*;
import org.jeecg.modules.dry.vo.DryEquipmentVo;
import org.jeecg.modules.dry.vo.RealTimeDataVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
@Scope("prototype")
public class MqttSampleCallback implements MqttCallback {
@@ -30,14 +45,33 @@
  @Autowired
  private RedisUtil redisUtil;
  @Autowired
  private IDryRealTimeDataService realTimeDataService;
  @Autowired
  private IDryEquipmentService equipmentService;
  @Autowired
  private IDryEqpTypeService eqpTypeService;
  @Autowired
  private IDryShopService dryShopService;
  @Autowired
  private IDryFaultRecordService faultRecordService;
  @Override
  public void connectionLost(Throwable throwable) {
    System.err.println("连接断开::掉线");
    System.err.println("连接断开::"+throwable.toString());
  }
  @Override
  public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
  public void messageArrived(String topic, MqttMessage mqttMessage) {
    System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
      + new String(mqttMessage.getPayload()));
@@ -62,21 +96,29 @@
            item.put("clientid", clientid);
            //是否连接
            item.put("connected", true);
            //
            String[] info = clientid.split("-");
            item.put("type", info[0]);
            item.put("tenantId", info[1]);
            item.put("code", info[2]);
            //根据clientid解析(注意配置文件中clientid格式  例:client-1000)
            try {
              String[] info = clientid.split("-");
              item.put("type", info[0]);
              item.put("tenantId", info[1]);
              //item.put("code", info[2]);
            }catch (Exception e){
              e.printStackTrace();
            }
            redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
            System.err.println(String.format("设备: %s上线", clientid));
          }
        }
        if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
          String clientid = messageJson.getString("clientid");
          redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
          System.err.println(String.format("设备: %s下线", clientid));
          try {
            String clientid = messageJson.getString("clientid");
            redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
            System.err.println(String.format("设备: %s下线", clientid));
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
        parseAdminCommand(topic, mqttMessage);
@@ -84,7 +126,12 @@
      // 普通用户
      case "user":
        System.err.println("user");
        parseUserCommand(topic, mqttMessage);
        try {
          parseUserCommand(topic, mqttMessage);
        } catch (Exception e) {
          e.printStackTrace();
        }
        break;
    }
@@ -111,7 +158,11 @@
    if (messageJson.containsKey("timestamp")) {
      messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
    }
    baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
    // 实时数据上传太频繁且数据内容超过字段大小不记录日志
    if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)){
      baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
    }
    switch (topic) {
      // 查询设备在线
      case MqttConstant.MOBILE_QUERY_EQU_STATU:
@@ -148,6 +199,64 @@
          }
        });
        break;
        // 接收设备实时数据
      case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
        ThreadUtil.execute(() -> {
          try {
            RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
            realTimeDataService.realTimeDataHandle(vo);
          } catch (Exception e) {
            e.printStackTrace();
          }
        });
        break;
      // 接收设备报警数据
      case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
        ThreadUtil.execute(() -> {
          try {
            MqMessage<List<DryFaultRecord>> listMqMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() {
            });
         //   List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class);
            System.err.println(listMqMessage.toString());
            faultRecordService.saveBatch(listMqMessage.getData());
          } catch (Exception e) {
            e.printStackTrace();
          }
        });
        break;
      case MqttConstant.TENANT_UP_PREFIX_EQU:
        ThreadUtil.execute(() -> {
          try {
            Object equObj = messageJson.get("equipment");
            DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class);
            TenantContext.setTenant(equipment.getTenantId()+"");
            DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode());
            if (dryEquipment == null) {
              equipmentService.save(equipment);
            }
            Object typeObj = messageJson.get("eqpType");
            DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class);
            DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId());
            if (dryEqpType == null) {
              eqpTypeService.save(eqpType);
            }
            // 获取设备所属车间
            Object shopObj = messageJson.get("shop");
            DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class);
            DryShop dryShop = dryShopService.getById(shop.getId());
            if (dryShop == null) {
              dryShopService.save(shop);
            }
          } catch (Exception e) {
            e.printStackTrace();
          }
        });
    }
@@ -192,6 +301,43 @@
        break;
      case MqttConstant.SERVICE_REQ_PREFIX:
        log.debug("收到设备详细信息查询请求");
        ThreadUtil.execute(() -> {
          String tenantId = messageJson.getString("tenantId");
          String clientId = mqttUtil.getMqttClient().getClientId();
          String tenant = clientId.substring(clientId.lastIndexOf("_")+1);
          if (tenantId!=null && tenantId.equals(tenant)) {
            TenantContext.setTenant(tenantId);
            // 根据设备编码查询设备信息
            String code = messageJson.getString("code");
            DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId,code);
            // 根据设备车间id查询车间信息
            DryShop shop = dryShopService.getById(equipmentVo.getShopId());
            // 根据设备类型ID查询设备类型信息
            DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType());
            JSONObject res = new JSONObject();
            res.put("tenant", tenantId);
            res.put("equipment", equipmentVo);
            res.put("shop", shop);
            res.put("eqpType", eqpType);
            try {
              MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
              sendMessage.setQos(0);
              mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage);
              // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
            } catch (Exception e) {
              e.printStackTrace();
            }
          }
        });
        break;
    }
  }