干燥机配套车间生产管理系统/云平台服务端
zhuguifei
3 天以前 b38019aae593a66c16f7e75d6e37d14eb8d2c42e
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
old mode 100644 new mode 100755
@@ -15,6 +15,8 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.*;
@@ -35,6 +37,8 @@
  private String mqttClientId;
  @Value(value = "${jeecg.mqtt.role}")
  private String role;
  @Value(value = "${jeecg.mqtt.enable}")
  private boolean enable;
  @Autowired
  private MqttSampleCallback mqttSampleCallback;
@@ -44,6 +48,8 @@
  private RedisUtil redisUtil;
  @Autowired
  private EmqxApi emqxApi;
  @Autowired
  private RedisTemplate redisTemplate;
  @Bean
@@ -56,6 +62,7 @@
   * mqtt连接配置
   */
  private void conn() {
    if (!isEnable()) return;
    MemoryPersistence persistence = new MemoryPersistence();
    MqttConnectOptions mqttConnOpt = new MqttConnectOptions();
    mqttConnOpt.setUserName(mqttName);
@@ -95,8 +102,15 @@
            mqttClient.subscribe(MqttConstant.MOBILE_UP);
            System.err.println("admin订阅" + MqttConstant.MOBILE_UP);
            // 订阅租户实时数据
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA);
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP);
            // 订阅租户报警数据
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA);
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA);
            mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU);
            System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_EQU);
            initClients();
            break;
@@ -104,7 +118,7 @@
          case "user":
            //普通客户端只需订阅自身相关消息
            mqttClient.subscribe(MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#");
            mqttClient.subscribe(MqttConstant.SERVICE_REQ_PREFIX + "/" + mqttClientId.substring(mqttClientId.lastIndexOf("_")+1) + "/#");
            mqttClient.subscribe(MqttConstant.SERVICE_REQ_PREFIX);
            System.err.println("user订阅" + MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#");
            break;
@@ -122,13 +136,14 @@
  //重连
  private void reconn() {
    if (!isEnable()) return;
    Timer timer = new Timer();
    TimerTask task = new TimerTask() {
      @Override
      public void run() {
        // 在这里编写定时执行的任务逻辑
        System.out.println("定时任务执行:" + new java.util.Date());
        //System.out.println("定时任务执行:" + new java.util.Date());
        if (mqttUtil.getMqttClient() == null || !mqttUtil.getMqttClient().isConnected()) {
          try {
            conn();
@@ -148,7 +163,14 @@
   * 服务端(admin角色)启动时查询所有设备并缓存到redis
   */
  private void initClients() {
    redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT);
    //初始化时先删除所有在线设备
    Set keys = redisTemplate.keys( String.format(MqttConstant.MQTT_ONLINE_CLIENT,"*"));
    if (keys != null && !keys.isEmpty()) {
      keys.forEach(key -> System.out.println("初始化删除在线设备: " + key));
      redisTemplate.delete(keys);
    } else {
      System.out.println("初始化无在线设备: " + MqttConstant.MQTT_ONLINE_CLIENT);
    }
    JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS);
    //TODO 根据emqx返回编写实体类
@@ -157,29 +179,37 @@
      for (int i = 0; i < data.size(); i++) {
        JSONObject obj = data.getJSONObject(i);
        JSONObject item = new JSONObject();
        //clientid
        String clientid = obj.getString("clientid");
        item.put("clientid", clientid);
        //TODO 校验租户id是否存在
        if(!clientid.matches("^[^-]+-[^-]+-[^-]+$"))  continue;
        //username
        item.put("username", obj.get("username"));
        //连接时间
        String st = obj.getString("connected_at");
        String upTime = DateUtils.zone2Str(st);
        item.put("connectedAt", upTime);
        //clientid
        String clientid = obj.getString("clientid");
        item.put("clientid", clientid);
        //是否连接
        Boolean connected = obj.getBoolean("connected");
        item.put("connected", connected);
        //
        String[] info = clientid.split("-");
        item.put("type", info[0]);
        item.put("tenantId", info[1]);
        item.put("code", info[2]);
        //根据clientid解析(注意配置文件中clientid格式  例:client-1000)
        try {
          String[] info = clientid.split("-");
          item.put("type", info[0]);
          item.put("tenantId", info[1]);
          item.put("code", info[2]);
        if (connected) {
          redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
          if (connected) {
            redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item);
          }
        }catch (Exception e){
          e.printStackTrace();
        }
      }
    }
  }