干燥机配套车间生产管理系统/云平台服务端
bsw215583320
2025-02-17 ff34638b445619d83740223514aa4de4a8e9a65f
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -2,224 +2,442 @@
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.T;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jeecg.common.config.TenantContext;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.MqttConstant;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.service.BaseCommonService;
import org.jeecg.modules.dry.api.EmqxApi;
import org.jeecg.modules.dry.service.IDryRealTimeDataService;
import org.jeecg.modules.dry.entity.DryEqpType;
import org.jeecg.modules.dry.entity.DryEquipment;
import org.jeecg.modules.dry.entity.DryFaultRecord;
import org.jeecg.modules.dry.entity.DryShop;
import org.jeecg.modules.dry.service.*;
import org.jeecg.modules.dry.vo.DryEquipmentVo;
import org.jeecg.modules.dry.vo.DryFaultRecordVo;
import org.jeecg.modules.dry.vo.RealTimeDataParentVo;
import org.jeecg.modules.dry.vo.RealTimeDataVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Slf4j
@Component
@Scope("prototype")
public class MqttSampleCallback implements MqttCallback {
  @Value(value = "${jeecg.mqtt.role}")
  private String role;
  @Autowired
  private MqttUtil mqttUtil;
  @Autowired
  private EmqxApi emqxApi;
  @Autowired
  private BaseCommonService baseCommonService;
  @Autowired
  private RedisUtil redisUtil;
    @Value(value = "${jeecg.mqtt.role}")
    private String role;
    @Autowired
    private MqttUtil mqttUtil;
    @Autowired
    private EmqxApi emqxApi;
    @Autowired
    private BaseCommonService baseCommonService;
    @Autowired
    private RedisUtil redisUtil;
  @Autowired
  private IDryRealTimeDataService realTimeDataService;
    @Autowired
    private IDryRealTimeDataService realTimeDataService;
  @Override
  public void connectionLost(Throwable throwable) {
    System.err.println("连接断开::掉线");
  }
    @Autowired
    private IDryEquipmentService equipmentService;
  @Override
  public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
    System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
      + new String(mqttMessage.getPayload()));
    @Autowired
    private IDryEqpTypeService eqpTypeService;
    switch (role) {
      // 管理员
      case "admin":
    @Autowired
    private IDryShopService dryShopService;
    @Autowired
    private IDryFaultRecordService faultRecordService;
    @Override
    public void connectionLost(Throwable throwable) {
        System.err.println("连接断开::掉线");
        System.err.println("连接断开::" + throwable.toString());
    }
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
                + new String(mqttMessage.getPayload()));
        switch (role) {
            // 管理员
            case "admin":
                String message = new String(mqttMessage.getPayload());
                JSONObject messageJson = JSONObject.parseObject(message);
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                    if (client == null) {
                        JSONObject item = new JSONObject();
                        //username
                        item.put("username", messageJson.get("username"));
                        //连接时间
                        Long st = messageJson.getLong("connected_at");
                        String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get());
                        item.put("connectedAt", upTime);
                        //clientid
                        String clientid = messageJson.getString("clientid");
                        item.put("clientid", clientid);
                        //是否连接
                        item.put("connected", true);
                        //根据clientid解析(注意配置文件中clientid格式  例:client-1000)
                        try {
                            String[] info = clientid.split("-");
                            item.put("type", info[0]);
                            item.put("tenantId", info[1]);
                            //item.put("code", info[2]);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                        System.err.println(String.format("设备: %s上线", clientid));
                    }
                }
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
                    try {
                        String clientid = messageJson.getString("clientid");
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid);
                        System.err.println(String.format("设备: %s下线", clientid));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                parseAdminCommand(topic, mqttMessage);
                break;
            // 普通用户
            case "user":
                System.err.println("user");
                try {
                    parseUserCommand(topic, mqttMessage);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                break;
        }
    }
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.err.println("消息传递成功");
    }
    // 解析admin角色指令
    private void parseAdminCommand(String topic, MqttMessage mqttMessage) {
        String message = new String(mqttMessage.getPayload());
        JSONObject messageJson = JSONObject.parseObject(message);
        if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
          JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid"));
          if (client == null) {
            JSONObject item = new JSONObject();
            //username
            item.put("username", messageJson.get("username"));
            //连接时间
            Long st = messageJson.getLong("connected_at");
            String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get());
            item.put("connectedAt", upTime);
            //clientid
            String clientid = messageJson.getString("clientid");
            item.put("clientid", clientid);
            //是否连接
            item.put("connected", true);
            //
            String[] info = clientid.split("-");
            item.put("type", info[0]);
            item.put("tenantId", info[1]);
            item.put("code", info[2]);
            redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
            System.err.println(String.format("设备: %s上线", clientid));
          }
        //请求的客户端(服务端只推送数据到请求的客户端)
        StringBuilder req = new StringBuilder();
        if (messageJson.containsKey("req")) {
            req.append(messageJson.get("req"));
        }
        if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
          String clientid = messageJson.getString("clientid");
          redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
          System.err.println(String.format("设备: %s下线", clientid));
        //前端传参时间戳转换
        if (messageJson.containsKey("timestamp")) {
            messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
        }
        parseAdminCommand(topic, mqttMessage);
        break;
      // 普通用户
      case "user":
        System.err.println("user");
        try {
          parseUserCommand(topic, mqttMessage);
        } catch (Exception e) {
          e.printStackTrace();
        // 实时数据上传太频繁且数据内容超过字段大小不记录日志
        if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) {
            // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
        }
        break;
        switch (topic) {
            // 查询设备在线
            case MqttConstant.MOBILE_QUERY_EQU_STATU:
                System.err.println("admin收到" + topic);
                // 根据设备id查询设备mqtt在线状态
                String clientId = messageJson.getString("clientId");
                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId);
    }
                ThreadUtil.execute(() -> {
  }
                    if (client == null || client.isEmpty()) {
                        JSONObject res = new JSONObject();
                        res.put("success", false);
                        res.put("msg", "查询失败");
                        try {
                            MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
                            sendMessage.setQos(0);
                            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return;
                    }
                    client.put("success", true);
                    client.put("msg", "查询成功");
                    try {
                        MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
                        sendMessage.setQos(0);
                        mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
                        baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                break;
  @Override
  public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    System.err.println("消息传递成功");
  }
            // 接收设备实时数据
            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
                ThreadUtil.execute(() -> {
                    try {
                        RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
                        realTimeDataService.realTimeDataHandle(vo);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
  // 解析admin角色指令
  private void parseAdminCommand(String topic, MqttMessage mqttMessage) {
    String message = new String(mqttMessage.getPayload());
    JSONObject messageJson = JSONObject.parseObject(message);
                break;
            // 接收设备实时数据-机台
            case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP:
                ThreadUtil.execute(() -> {
                    try {
                        RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
                        realTimeDataService.realTimeDataHandle(vo);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                break;
            //各租户上传的实时报警数据
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
    //请求的客户端(服务端只推送数据到请求的客户端)
    StringBuilder req = new StringBuilder();
    if (messageJson.containsKey("req")) {
      req.append(messageJson.get("req"));
    }
    //前端传参时间戳转换
    if (messageJson.containsKey("timestamp")) {
      messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
    }
    // 实时数据上传太频繁且数据内容超过字段大小不记录日志
    if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA)){
      baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
    }
    switch (topic) {
      // 查询设备在线
      case MqttConstant.MOBILE_QUERY_EQU_STATU:
        System.err.println("admin收到" + topic);
        // 根据设备id查询设备mqtt在线状态
        String clientId = messageJson.getString("clientId");
        JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId);
        ThreadUtil.execute(() -> {
          if (client == null || client.isEmpty()) {
            JSONObject res = new JSONObject();
            res.put("success", false);
            res.put("msg", "查询失败");
            try {
              MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
              sendMessage.setQos(0);
              mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
            } catch (Exception e) {
              e.printStackTrace();
                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
                });
                //故障数据
                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
                //租户id
                String tenantId = realFaultMessage.getTentId();
                //收到租户实时报警数据存入redis
                //转换为 Map<String, Object>
                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> (Object) entry.getValue()
                        ));
                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
                //广播发送给各租户下移动设备
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                //发送广播
                System.err.println("广播给:" + recTopic);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
            }
            return;
          }
            break;
            //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据)
            case MqttConstant.MOBILE_REQ_EQU_REAL_FAULT: {
                String tenantId = (String) messageJson.get("tenantId");
                if (req.toString().isEmpty() || tenantId == null) {
                    return;
                }
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId));
                //转换为 Map<String, DryFaultRecordVo>
                Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().toString(),
                                entry -> (DryFaultRecordVo) entry.getValue()
                        ));
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String resTopic = String.format(MqttConstant.SERVICE_ONECE_TENANT_REAL_FAULT, req);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, resTopic);
                //发送请求设备
                System.err.println("发送给:" + resTopic);
                sendMqttMessage(resTopic, mqMessage, 2);
          client.put("success", true);
          client.put("msg", "查询成功");
          try {
            MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
            sendMessage.setQos(0);
            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
            baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
          } catch (Exception e) {
            e.printStackTrace();
          }
        });
        break;
            }
            break;
            // 接收设备报警历史数据
            case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
                ThreadUtil.execute(() -> {
                    try {
                        MqMessage<List<DryFaultRecord>> faultMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() {
                        });
                        //   List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class);
                        System.err.println(faultMessage.toString());
                        faultRecordService.saveBatch(faultMessage.getData());
        // 接收设备实时数据
      case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA:
        try {
          RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class);
          realTimeDataService.realTimeDataHandle(vo);
        } catch (Exception e) {
          e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                break;
            case MqttConstant.TENANT_UP_PREFIX_EQU:
                ThreadUtil.execute(() -> {
                    try {
                        Object equObj = messageJson.get("equipment");
                        DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class);
                        TenantContext.setTenant(equipment.getTenantId() + "");
                        DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode());
                        if (dryEquipment == null) {
                            equipmentService.save(equipment);
                        }
                        Object typeObj = messageJson.get("eqpType");
                        DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class);
                        DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId());
                        if (dryEqpType == null) {
                            eqpTypeService.save(eqpType);
                        }
                        // 获取设备所属车间
                        Object shopObj = messageJson.get("shop");
                        DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class);
                        DryShop dryShop = dryShopService.getById(shop.getId());
                        if (dryShop == null) {
                            dryShopService.save(shop);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
        }
        break;
    }
    // 解析user角色指令
    private void parseUserCommand(String topic, MqttMessage mqttMessage) {
        String message = new String(mqttMessage.getPayload());
        JSONObject messageJson = JSONObject.parseObject(message);
        //请求的客户端(服务端只推送数据到请求的客户端)
        StringBuilder req = new StringBuilder();
        if (messageJson.containsKey("req")) {
            req.append(messageJson.get("req"));
        }
        //前端传参时间戳转换
        if (messageJson.containsKey("timestamp")) {
            messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
        }
        switch (topic) {
            case MqttConstant.MOBILE_REQ_EQU_CMD:
                System.err.println("user收到" + topic);
                System.err.println(message);
                ThreadUtil.execute(() -> {
                    //TODO 向PLC发送开关机操作,并返回信息
                    JSONObject res = new JSONObject();
                    res.put("success", true);
                    res.put("msg", "操作成功");
                    try {
                        MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes());
                        sendMessage.setQos(0);
                        mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage);
                        baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                break;
            case MqttConstant.SERVICE_REQ_PREFIX:
                log.debug("收到设备详细信息查询请求");
                ThreadUtil.execute(() -> {
                    String tenantId = messageJson.getString("tenantId");
                    String clientId = mqttUtil.getMqttClient().getClientId();
                    String tenant = clientId.substring(clientId.lastIndexOf("_") + 1);
                    if (tenantId != null && tenantId.equals(tenant)) {
                        TenantContext.setTenant(tenantId);
                        // 根据设备编码查询设备信息
                        String code = messageJson.getString("code");
                        DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId, code);
                        // 根据设备车间id查询车间信息
                        DryShop shop = dryShopService.getById(equipmentVo.getShopId());
                        // 根据设备类型ID查询设备类型信息
                        DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType());
                        JSONObject res = new JSONObject();
                        res.put("tenant", tenantId);
                        res.put("equipment", equipmentVo);
                        res.put("shop", shop);
                        res.put("eqpType", eqpType);
                        try {
                            MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
                            sendMessage.setQos(0);
                            mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage);
                            // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                break;
        }
    }
  }
  // 解析user角色指令
  private void parseUserCommand(String topic, MqttMessage mqttMessage) {
    String message = new String(mqttMessage.getPayload());
    JSONObject messageJson = JSONObject.parseObject(message);
    //请求的客户端(服务端只推送数据到请求的客户端)
    StringBuilder req = new StringBuilder();
    if (messageJson.containsKey("req")) {
      req.append(messageJson.get("req"));
    }
    //前端传参时间戳转换
    if (messageJson.containsKey("timestamp")) {
      messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
    }
    switch (topic) {
      case MqttConstant.MOBILE_REQ_EQU_CMD:
        System.err.println("user收到" + topic);
        System.err.println(message);
    /**
     * 发送消息
     *
     * @param topic     订阅
     * @param mqMessage 消息体
     * @param type      1-发送给租户   2-发送给固定id
     */
    private void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) {
        ThreadUtil.execute(() -> {
          //TODO 向PLC发送开关机操作,并返回信息
          JSONObject res = new JSONObject();
          res.put("success", true);
          res.put("msg", "操作成功");
          try {
            MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes());
            sendMessage.setQos(0);
            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage);
            baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
          } catch (Exception e) {
            e.printStackTrace();
          }
            try {
                if (type == 1) {
                    MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                    sendMessage.setQos(0);
                    mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
                } else if (type == 2) {
                    MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                    sendMessage.setQos(0);
                    mqttUtil.getMqttClient().publish(topic, sendMessage);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        break;
    }
  }
}