干燥机配套车间生产管理系统/云平台服务端
zhuguifei
2024-11-27 320c0c10a90140627b10a6fcf498e79d09785da6
添加mqtt数据接口
已修改5个文件
161 ■■■■ 文件已修改
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java 73 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -31,10 +31,9 @@
  String MOBILE_QUERY_EQU_STATU = MOBILE_UP_PREFIX + "/query/equ/statu";
  //移动端远程请求指令
  String MOBILE_REQ_EQU_CMD = MOBILE_UP_PREFIX + "/req/equ/cmd";
  /**************************移动端向服务端请求指令end*******************************/
@@ -47,7 +46,26 @@
  String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/%s/statu";
  //返回移动端远程请求指令
  String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd";
  /**************************服务端向移动端响应指令end*******************************/
  /**************************服务端向移动端发送广播start*******************************/
  //广播类型推送无关移动端设备id,向所有在线移动端发送
  String SERVICE_BROADCAST_PREFIX = "service/broadcast";
  //服务端向各租户客户端发送实时故障广播
  String  SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s"  ;
  /**************************服务端向移动端发送广播end*******************************/
  /**************************租户端向服务端发送数据start*******************************/
@@ -80,9 +98,9 @@
  //service(cloud)
  //在线客户端
  String MQTT_ONLINE_CLIENT = "mqtt:online:client::";
  String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s";
  //所有租户的实时报警(%s:租户id)
  String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s:";
  String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s";
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
@@ -18,7 +18,9 @@
import org.jeecg.config.mybatis.MybatisPlusSaasConfig;
import org.jeecg.modules.dry.api.EmqxApi;
import org.jeecg.modules.dry.entity.DryEquipment;
import org.jeecg.modules.dry.entity.DryFaultRecord;
import org.jeecg.modules.dry.service.IDryEquipmentService;
import org.jeecg.modules.dry.service.IDryFaultRecordService;
import org.jeecg.modules.dry.vo.MoEquVo;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -40,6 +42,8 @@
public class MobileController {
  @Autowired
  private IDryEquipmentService dryEquipmentService;
  @Autowired
  private IDryFaultRecordService faultRecordService;
  @Autowired
  private RedisUtil redisUtil;
@@ -64,18 +68,27 @@
    return Result.OK(voPage);
  }
  @ApiOperation(value = "设备报警数据", notes = "设备报警数据列表查询")
  @GetMapping(value = "/fault/list")
  public Result<IPage<DryFaultRecord>> queryFaultList(DryFaultRecord faultRecord, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req){
    int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
    QueryWrapper<DryFaultRecord> queryWrapper = QueryGenerator.initQueryWrapper(faultRecord, req.getParameterMap());
    Page<DryFaultRecord> page = new Page<DryFaultRecord>(pageNo, pageSize);
    IPage<DryFaultRecord> pageList = faultRecordService.page(page, queryWrapper);
    return Result.OK(pageList);
  }
  private void comp(IPage<DryEquipment> pageList, Page<MoEquVo> page) {
    //当前租户id
    int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
    List<MoEquVo> collect = pageList.getRecords().stream().map(item -> {
      MoEquVo vo = new MoEquVo();
      BeanUtils.copyProperties(item, vo);
      String clientid = "client-" + tenantId + "-" + item.getCode();
      JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
      //JSONObject client = (JSONObject) redisUtil.hget(MqttConstant.MQTT_ONLINE_CLIENT ,tenantId);
      JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,tenantId),clientid);
      //组装状态数据
      if (client != null) {
        vo.setOnline(true);
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java
@@ -10,6 +10,7 @@
public class MqMessage<T> {
    private T data;
    private String tentId;
    private String topic;
    public MqMessage() {
@@ -19,4 +20,18 @@
        this.data = data;
        this.tentId = tentId;
    }
    public MqMessage(T data, String tentId,String topic) {
        this.data = data;
        this.tentId = tentId;
        this.topic = topic;
    }
    @Override
    public String toString() {
        return "MqMessage{" +
                "data=" + data +
                ", tentId='" + tentId + '\'' +
                ", topic='" + topic + '\'' +
                '}';
    }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -15,6 +15,7 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.*;
@@ -44,6 +45,8 @@
  private RedisUtil redisUtil;
  @Autowired
  private EmqxApi emqxApi;
  @Autowired
  private RedisTemplate redisTemplate;
  @Bean
@@ -98,6 +101,8 @@
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
            // 订阅租户报警数据
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU);
@@ -153,7 +158,14 @@
   * 服务端(admin角色)启动时查询所有设备并缓存到redis
   */
  private void initClients() {
    redisUtil.removeAll(MqttConstant.MQTT_ONLINE_CLIENT);
    //初始化时先删除所有在线设备
    Set keys = redisTemplate.keys( String.format(MqttConstant.MQTT_ONLINE_CLIENT,"*"));
    if (keys != null && !keys.isEmpty()) {
      keys.forEach(key -> System.out.println("初始化删除在线设备: " + key));
      redisTemplate.delete(keys);
    } else {
      System.out.println("初始化无在线设备: " + MqttConstant.MQTT_ONLINE_CLIENT);
    }
    JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS);
    //TODO 根据emqx返回编写实体类
@@ -180,13 +192,15 @@
          item.put("type", info[0]);
          item.put("tenantId", info[1]);
          //item.put("code", info[2]);
          if (connected) {
            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item);
          }
        }catch (Exception e){
          e.printStackTrace();
        }
        if (connected) {
          redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
        }
      }
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -6,6 +6,7 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.T;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -22,13 +23,17 @@
import org.jeecg.modules.dry.entity.DryShop;
import org.jeecg.modules.dry.service.*;
import org.jeecg.modules.dry.vo.DryEquipmentVo;
import org.jeecg.modules.dry.vo.DryFaultRecordVo;
import org.jeecg.modules.dry.vo.RealTimeDataVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Slf4j
@Component
@@ -62,8 +67,6 @@
  private IDryFaultRecordService faultRecordService;
  @Override
  public void connectionLost(Throwable throwable) {
    System.err.println("连接断开::掉线");
@@ -82,7 +85,7 @@
        JSONObject messageJson = JSONObject.parseObject(message);
        if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
          JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid"));
                    JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
          if (client == null) {
            JSONObject item = new JSONObject();
            //username
@@ -105,7 +108,7 @@
            }catch (Exception e){
              e.printStackTrace();
            }
            redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
                        redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item);
            System.err.println(String.format("设备: %s上线", clientid));
          }
@@ -113,7 +116,7 @@
        if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
          try {
            String clientid = messageJson.getString("clientid");
            redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
                        redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]),  clientid);
            System.err.println(String.format("设备: %s下线", clientid));
          } catch (Exception e) {
            e.printStackTrace();
@@ -160,7 +163,7 @@
    }
    // 实时数据上传太频繁且数据内容超过字段大小不记录日志
    if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)){
      baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
           // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
    }
    switch (topic) {
@@ -169,7 +172,7 @@
        System.err.println("admin收到" + topic);
        // 根据设备id查询设备mqtt在线状态
        String clientId = messageJson.getString("clientId");
        JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId);
                JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId);
        ThreadUtil.execute(() -> {
@@ -212,15 +215,44 @@
        });
        break;
            //各租户上传的实时报警数据
            case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA:
                MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() {
                });
                //故障数据
                Map<String, DryFaultRecordVo> dryFaultMap =  realFaultMessage.getData();
                //租户id
                String tentId = realFaultMessage.getTentId();
                //收到租户实时报警数据存入redis
                //转换为 Map<String, Object>
                Map<String, Object> objectMap = dryFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> (Object) entry.getValue()
                        ));
                redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap);
                //广播发送给各租户下移动设备
                if(dryFaultMap.isEmpty()){
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic);
                //发送广播
                System.err.println("广播给:" + recTopic);
                sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage);
                break;
      // 接收设备报警数据
      case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA:
        ThreadUtil.execute(() -> {
          try {
            MqMessage<List<DryFaultRecord>> listMqMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() {
                        MqMessage<List<DryFaultRecord>> faultMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() {
            });
         //   List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class);
            System.err.println(listMqMessage.toString());
            faultRecordService.saveBatch(listMqMessage.getData());
                        System.err.println(faultMessage.toString());
                        faultRecordService.saveBatch(faultMessage.getData());
          } catch (Exception e) {
            e.printStackTrace();
@@ -335,11 +367,30 @@
        });
        break;
    }
  }
    /**
     * 发送消息
     * @param topic       订阅
     * @param mqMessage   消息体
     */
    private void sendMqttMessage(String topic, MqMessage mqMessage){
        ThreadUtil.execute(() -> {
            try {
                MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes());
                sendMessage.setQos(0);
                mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage);
            }catch (Exception e){
                e.printStackTrace();
            }
        });
    }
}