干燥机配套车间生产管理系统/云平台服务端
zhuguifei
2024-12-11 5dd889b470543bed7564054cdfcd750b1d9316cb
添加移动端实时故障和历史故障接口
已修改4个文件
317 ■■■■ 文件已修改
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java 216 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java 85 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -32,6 +32,9 @@
  //移动端远程请求指令
  String MOBILE_REQ_EQU_CMD = MOBILE_UP_PREFIX + "/req/equ/cmd";
  //移动端请求查询一次设备实时故障告警
  String MOBILE_REQ_EQU_REAL_FAULT = MOBILE_UP_PREFIX + "/req/real/fault";
  /**************************移动端向服务端请求指令end*******************************/
@@ -57,7 +60,9 @@
  String SERVICE_BROADCAST_PREFIX = "service/broadcast";
  //服务端向各租户客户端发送实时故障广播
  String  SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s"  ;
  String  SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s";
  //服务端向移动端回复一次设备实时故障告警
  String  SERVICE_ONECE_TENANT_REAL_FAULT = "service/onece" + "/real/fault/%s";
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java
@@ -13,16 +13,21 @@
@Data
public class DryFaultRecordVo extends DryFaultRecord implements Serializable {
    private static final long serialVersionUID = 1L;
    //redis故障结束技术
    //redis故障结束记数
    private Integer eCount;
    //设备名称
    private String equName;
    //租户名称
    private String tenantName;
    //故障时间
    private String faultTimeStr;
    public DryFaultRecordVo() {
    }
    public DryFaultRecordVo(DryFaultRecord record, Integer count) {
        super(record.getOrderId(),record.getTenantId(),record.getFaultName(),record.getFaultType(),record.getStartTime(),record.getEndTime());
        this.eCount = count;
    }
    public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount, String equName, String tenantName) {
        super(orderId, tenantId, faultName, faultType, startTime, endTime);
        this.eCount = eCount;
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
@@ -8,6 +8,7 @@
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.A;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.config.TenantContext;
import org.jeecg.common.constant.MqttConstant;
@@ -19,8 +20,11 @@
import org.jeecg.modules.dry.api.EmqxApi;
import org.jeecg.modules.dry.entity.DryEquipment;
import org.jeecg.modules.dry.entity.DryFaultRecord;
import org.jeecg.modules.dry.entity.DryOrder;
import org.jeecg.modules.dry.service.IDryEquipmentService;
import org.jeecg.modules.dry.service.IDryFaultRecordService;
import org.jeecg.modules.dry.service.IDryOrderService;
import org.jeecg.modules.dry.vo.DryFaultRecordVo;
import org.jeecg.modules.dry.vo.MoEquVo;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -30,6 +34,8 @@
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
@@ -40,71 +46,169 @@
@RequestMapping("/mobile")
@Slf4j
public class MobileController {
  @Autowired
  private IDryEquipmentService dryEquipmentService;
  @Autowired
  private IDryFaultRecordService faultRecordService;
  @Autowired
  private RedisUtil redisUtil;
    @Autowired
    private IDryEquipmentService dryEquipmentService;
    @Autowired
    private IDryFaultRecordService faultRecordService;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private IDryOrderService orderService;
  @ApiOperation(value = "设备列表查询", notes = "设备列表查询")
  @GetMapping(value = "/equ/list")
  public Result<IPage<MoEquVo>> queryPageList(DryEquipment dryEquipment, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req) {
    //------------------------------------------------------------------------------------------------
    //是否开启系统管理模块的多租户数据隔离【SAAS多租户模式】
    if (MybatisPlusSaasConfig.OPEN_SYSTEM_TENANT_CONTROL) {
      dryEquipment.setTenantId(oConvertUtils.getInt(TenantContext.getTenant(), 0));
    @ApiOperation(value = "设备列表查询", notes = "设备列表查询")
    @GetMapping(value = "/equ/list")
    public Result<IPage<MoEquVo>> queryPageList(DryEquipment dryEquipment, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req) {
        //------------------------------------------------------------------------------------------------
        //是否开启系统管理模块的多租户数据隔离【SAAS多租户模式】
        if (MybatisPlusSaasConfig.OPEN_SYSTEM_TENANT_CONTROL) {
            dryEquipment.setTenantId(oConvertUtils.getInt(TenantContext.getTenant(), 0));
        }
        //------------------------------------------------------------------------------------------------
        QueryWrapper<DryEquipment> queryWrapper = QueryGenerator.initQueryWrapper(dryEquipment, req.getParameterMap());
        Page<DryEquipment> page = new Page<DryEquipment>(pageNo, pageSize);
        Page<MoEquVo> voPage = new Page<MoEquVo>(pageNo, pageSize);
        IPage<DryEquipment> pageList = dryEquipmentService.page(page, queryWrapper);
        compEqu(pageList, voPage);
        return Result.OK(voPage);
    }
    //------------------------------------------------------------------------------------------------
    QueryWrapper<DryEquipment> queryWrapper = QueryGenerator.initQueryWrapper(dryEquipment, req.getParameterMap());
    Page<DryEquipment> page = new Page<DryEquipment>(pageNo, pageSize);
    Page<MoEquVo> voPage = new Page<MoEquVo>(pageNo, pageSize);
    IPage<DryEquipment> pageList = dryEquipmentService.page(page, queryWrapper);
    @ApiOperation(value = "设备报警数据", notes = "设备报警数据列表查询")
    @GetMapping(value = "/fault/list")
    public Result<IPage<DryFaultRecordVo>> queryFaultList(DryFaultRecord faultRecord, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req) {
        int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
        //是否开启系统管理模块的多租户数据隔离【SAAS多租户模式】
        if (MybatisPlusSaasConfig.OPEN_SYSTEM_TENANT_CONTROL) {
            faultRecord.setTenantId(tenantId);
        }
        QueryWrapper<DryFaultRecord> queryWrapper = QueryGenerator.initQueryWrapper(faultRecord, req.getParameterMap());
        Page<DryFaultRecord> page = new Page<DryFaultRecord>(pageNo, pageSize);
        IPage<DryFaultRecord> pageList = faultRecordService.page(page, queryWrapper);
        Page<DryFaultRecordVo> voPage = new Page<DryFaultRecordVo>(pageNo, pageSize);
        compFault(pageList, voPage);
        return Result.OK(voPage);
    }
    private void compFault(IPage<DryFaultRecord> pageList, Page<DryFaultRecordVo> page) {
        List<DryFaultRecordVo> collect = pageList.getRecords().stream().filter(item -> item.getOrderId() != null).map(item -> {
            DryFaultRecordVo vo = new DryFaultRecordVo();
            BeanUtils.copyProperties(item, vo);
            String orderCode = item.getOrderId();
            QueryWrapper<DryOrder> orderQueryWrapper = new QueryWrapper<>();
            orderQueryWrapper.lambda().eq(DryOrder::getCode, orderCode);
            DryOrder order = orderService.getOne(orderQueryWrapper);
            if (vo.getStartTime() != null && vo.getEndTime() != null) {
                String faultTimeStr = calculateTimeDifference(DateUtils.date2Str(vo.getStartTime(), DateUtils.datetimeFormat.get()), DateUtils.date2Str(vo.getEndTime(), DateUtils.datetimeFormat.get()));
                vo.setFaultTimeStr(faultTimeStr);
            }
            if (order == null) return vo;
            String equId = order.getEquId();
            if (equId == null) return vo;
            DryEquipment equipment = dryEquipmentService.getById(equId);
            if (equipment == null || equipment.getName() == null) return vo;
            vo.setEquName(equipment.getName().substring(0, 2));
            return vo;
        }).collect(Collectors.toList());
        BeanUtils.copyProperties(pageList, page);
        page.setRecords(collect);
    }
    comp(pageList, voPage);
    @ApiOperation(value = "报警图标数据", notes = "报警图标数据查询")
    @GetMapping(value = "/fault/chart")
    public List<DryFaultRecordVo> queryFaultChartData(DryFaultRecord faultRecord, HttpServletRequest req) {
        int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
        //是否开启系统管理模块的多租户数据隔离【SAAS多租户模式】
        if (MybatisPlusSaasConfig.OPEN_SYSTEM_TENANT_CONTROL) {
            faultRecord.setTenantId(tenantId);
        }
        QueryWrapper<DryFaultRecord> queryWrapper = QueryGenerator.initQueryWrapper(faultRecord, req.getParameterMap());
        List<DryFaultRecord> faultList = faultRecordService.list(queryWrapper);
        List<DryFaultRecordVo> result = new ArrayList<>(faultList.stream()
                .collect(Collectors.groupingBy(DryFaultRecord::getFaultName,
                        Collectors.collectingAndThen(Collectors.toList(), list ->
                                new DryFaultRecordVo(list.get(0), list.size()))))
                .values())
                .stream()
                .sorted(Comparator.comparingInt(DryFaultRecordVo::getECount).reversed())
                .collect(Collectors.toList());
        return result;
    }
    return Result.OK(voPage);
  }
  @ApiOperation(value = "设备报警数据", notes = "设备报警数据列表查询")
  @GetMapping(value = "/fault/list")
  public Result<IPage<DryFaultRecord>> queryFaultList(DryFaultRecord faultRecord, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req){
    int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
    QueryWrapper<DryFaultRecord> queryWrapper = QueryGenerator.initQueryWrapper(faultRecord, req.getParameterMap());
    Page<DryFaultRecord> page = new Page<DryFaultRecord>(pageNo, pageSize);
    IPage<DryFaultRecord> pageList = faultRecordService.page(page, queryWrapper);
    return Result.OK(pageList);
  }
    //此接口仅用作uniapp下拉返回,无实际意义
    @GetMapping(value = "/sample/list")
    public Result<IPage<Object>> querySampleList(DryFaultRecord faultRecord, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req) {
        Page<Object> page = new Page<Object>(pageNo, pageSize);
        page.setRecords(Arrays.asList(""));
        page.setSize(1);
        page.setCurrent(1);
        return Result.OK(page);
    }
    private void compEqu(IPage<DryEquipment> pageList, Page<MoEquVo> page) {
        //当前租户id
        int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
        List<MoEquVo> collect = pageList.getRecords().stream().map(item -> {
            MoEquVo vo = new MoEquVo();
            BeanUtils.copyProperties(item, vo);
            String clientid = "client-" + tenantId + "-" + item.getCode();
            //JSONObject client = (JSONObject) redisUtil.hget(MqttConstant.MQTT_ONLINE_CLIENT ,tenantId);
            JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, tenantId), clientid);
            //组装状态数据
            if (client != null) {
                vo.setOnline(true);
                //连接时间
                String st = client.getString("connectedAt");
                vo.setUpTime(st);
                vo.setClientId(clientid);
            }
            return vo;
        }).collect(Collectors.toList());
        //排序
        collect.sort(Comparator.comparing(obj -> obj.getCode(), Comparator.nullsLast(Comparator.naturalOrder())));
        collect.sort(Comparator.comparing(obj -> obj.getOnline(), Comparator.nullsLast(Comparator.naturalOrder())));
        BeanUtils.copyProperties(pageList, page);
        page.setRecords(collect);
    }
  private void comp(IPage<DryEquipment> pageList, Page<MoEquVo> page) {
    //当前租户id
    int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
    List<MoEquVo> collect = pageList.getRecords().stream().map(item -> {
      MoEquVo vo = new MoEquVo();
      BeanUtils.copyProperties(item, vo);
      String clientid = "client-" + tenantId + "-" + item.getCode();
      //JSONObject client = (JSONObject) redisUtil.hget(MqttConstant.MQTT_ONLINE_CLIENT ,tenantId);
      JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,tenantId),clientid);
      //组装状态数据
      if (client != null) {
        vo.setOnline(true);
        //连接时间
        String st = client.getString("connectedAt");
        vo.setUpTime(st);
        vo.setClientId(clientid);
      }
      return vo;
    }).collect(Collectors.toList());
    //排序
    collect.sort(Comparator.comparing(obj -> obj.getCode(), Comparator.nullsLast(Comparator.naturalOrder())));
    collect.sort(Comparator.comparing(obj -> obj.getOnline(), Comparator.nullsLast(Comparator.naturalOrder())));
    BeanUtils.copyProperties(pageList, page);
    page.setRecords(collect);
  }
    /**
     * @param startTimeStr
     * @param endTimeStr
     * @return
     */
    private String calculateTimeDifference(String startTimeStr, String endTimeStr) {
        // 定义时间格式
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        // 将字符串解析为 LocalDateTime 对象
        LocalDateTime startTime = LocalDateTime.parse(startTimeStr, formatter);
        LocalDateTime endTime = LocalDateTime.parse(endTimeStr, formatter);
        // 计算两个时间点之间的持续时间
        Duration duration = Duration.between(startTime, endTime);
        // 获取小时、分钟和秒数
        long hours = duration.toHours();
        long minutes = duration.toMinutes() % 60;
        long seconds = duration.getSeconds() % 60;
        StringBuilder result = new StringBuilder();
        if (hours > 0) {
            result.append(hours).append("时");
        }
        if (minutes > 0) {
            result.append(minutes).append("分");
        }
        if (seconds > 0) {
            result.append(seconds).append("秒");
        }
        return result.toString();
    }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -85,7 +85,7 @@
                JSONObject messageJson = JSONObject.parseObject(message);
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
                    if (client == null) {
                        JSONObject item = new JSONObject();
                        //username
@@ -108,7 +108,7 @@
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item);
                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
                        System.err.println(String.format("设备: %s上线", clientid));
                    }
@@ -116,7 +116,7 @@
                if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
                    try {
                        String clientid = messageJson.getString("clientid");
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]),  clientid);
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid);
                        System.err.println(String.format("设备: %s下线", clientid));
                    } catch (Exception e) {
                        e.printStackTrace();
@@ -163,7 +163,7 @@
        }
        // 实时数据上传太频繁且数据内容超过字段大小不记录日志
        if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) {
           // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
            // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
        }
        switch (topic) {
@@ -172,7 +172,7 @@
                System.err.println("admin收到" + topic);
                // 根据设备id查询设备mqtt在线状态
                String clientId = messageJson.getString("clientId");
                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId);
                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId);
                ThreadUtil.execute(() -> {
@@ -216,13 +216,14 @@
                break;
            //各租户上传的实时报警数据
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA:
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: {
                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
                });
                //故障数据
                Map<String, DryFaultRecordVo> dryFaultMap =  realFaultMessage.getData();
                Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData();
                //租户id
                String tentId = realFaultMessage.getTentId();
                String tenantId = realFaultMessage.getTentId();
                //收到租户实时报警数据存入redis
                //转换为 Map<String, Object>
                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
@@ -230,21 +231,51 @@
                                Map.Entry::getKey,
                                entry -> (Object) entry.getValue()
                        ));
                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap);
                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap);
                //广播发送给各租户下移动设备
                if(dryFaultMap.isEmpty()){
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId);
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic);
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                //发送广播
                System.err.println("广播给:" + recTopic);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
                break;
            // 接收设备报警数据
            }
            break;
            //移动端主动请求设备实时故障数据(用于页面刚打开时拉取一次数据)
            case MqttConstant.MOBILE_REQ_EQU_REAL_FAULT: {
                String tenantId = (String) messageJson.get("tenantId");
                if (req.toString().isEmpty() || tenantId == null) {
                    return;
                }
                Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId));
                //转换为 Map<String, DryFaultRecordVo>
                Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().toString(),
                                entry -> (DryFaultRecordVo) entry.getValue()
                        ));
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String resTopic = String.format(MqttConstant.SERVICE_ONECE_TENANT_REAL_FAULT, req);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, resTopic);
                //发送请求设备
                System.err.println("发送给:" + resTopic);
                sendMqttMessage(resTopic, mqMessage, 2);
            }
            break;
            // 接收设备报警历史数据
            case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
                ThreadUtil.execute(() -> {
                    try {
@@ -372,21 +403,29 @@
        }
    }
    /**
     * 发送消息
     * @param topic       订阅
     * @param mqMessage   消息体
     *
     * @param topic     订阅
     * @param mqMessage 消息体
     * @param type      1-发送给租户   2-发送给固定id
     */
    private void sendMqttMessage(String topic, MqMessage mqMessage){
    private void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) {
        ThreadUtil.execute(() -> {
            try {
                MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                sendMessage.setQos(0);
                mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
            }catch (Exception e){
                if (type == 1) {
                    MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                    sendMessage.setQos(0);
                    mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
                } else if (type == 2) {
                    MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                    sendMessage.setQos(0);
                    mqttUtil.getMqttClient().publish(topic, sendMessage);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });