干燥机配套车间生产管理系统/云平台服务端
guifei zhu
2024-08-21 2297b10a86a233155006297d451b142d3f7ce3db
mqtt初步实现
已添加8个文件
已修改6个文件
889 ■■■■■ 文件已修改
.gitignore 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/pom.xml 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonConstant.java 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DateUtils.java 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/service/BaseCommonService.java 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/service/impl/BaseCommonServiceImpl.java 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/api/EmqxApi.java 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/api/EmqxApi.txt 180 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/MoEquVo.java 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java 97 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java 182 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java 199 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -32,3 +32,8 @@
/.vscode/
/.history/
/svn clear.bat
/target/
**/target
**/logs
jeecg-boot-base-core/pom.xml
@@ -252,6 +252,13 @@
            <groupId>commons-fileupload</groupId>
            <artifactId>commons-fileupload</artifactId>
        </dependency>
    <!--MQTT client -->
    <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.5</version>
    </dependency>
    </dependencies>
</project>
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonConstant.java
@@ -35,6 +35,10 @@
     * 系统日志类型: 操作
     */
    int LOG_TYPE_2 = 2;
  /**
   * MQTT日志
   */
  int LOG_TYPE_MQTT = 100;
    /**
     * 操作日志类型: 查询
@@ -67,6 +71,15 @@
    int OPERATE_TYPE_6 = 6;
    
    
  /**
   * MQTT日志
   * 100-订阅
   * 200-发布
   */
  int OPERATE_MQTT_1 = 100;
  int OPERATE_MQTT_2 = 200;
    /** {@code 500 Server Error} (HTTP/1.0 - RFC 1945) */
    Integer SC_INTERNAL_SERVER_ERROR_500 = 500;
    /** {@code 200 OK} (HTTP/1.0 - RFC 1945) */
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
对比新文件
@@ -0,0 +1,45 @@
package org.jeecg.common.constant;
/**
 * MQTT topic and cache-key constants shared by the server and the mobile clients.
 */
public interface MqttConstant {
  // ---------------- broker system topics: start ----------------
  // Broker system topic filter for "client connected" events ('+' is a single-level wildcard)
  String MQTT_TOPIC_ONLINE = "$SYS/brokers/+/clients/+/connected";
  // Broker system topic filter for "client disconnected" events
  String MQTT_TOPIC_OFFLINE = "$SYS/brokers/+/clients/+/disconnected";
  // ---------------- broker system topics: end ----------------
  // ---------------- requests from mobile clients to the server: start ----------------
  // Topic filter matching every mobile uplink topic
  String MOBILE_UP = "mobile/up/#";
  // Prefix of mobile uplink (request) topics
  String MOBILE_UP_PREFIX = "mobile/up";
  // Request: query equipment status
  String MOBILE_QUERY_EQU_STATU = MOBILE_UP_PREFIX + "/query/equ/statu";
  // Request: remote command issued from a mobile client
  String MOBILE_REQ_EQU_CMD = MOBILE_UP_PREFIX + "/req/equ/cmd";
  // ---------------- requests from mobile clients to the server: end ----------------
  // ---------------- responses from the server to mobile clients: start ----------------
  // Responses are addressed only to the requesting client; %s is the id of the client that sent the request
  // Prefix of server downlink (response) topics
  String SERVICE_DOWN_PREFIX = "service/down/res";
  // Response to an equipment-status query
  String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/%s/statu";
  // Response to a remote command request
  String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd";
  // ---------------- responses from the server to mobile clients: end ----------------
  // Redis key prefix for cached online clients; the MQTT client id is appended to it
  String MQTT_ONLINE_CLIENT = "mqtt:online:client::";
}
jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DateUtils.java
@@ -8,6 +8,8 @@
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
@@ -74,6 +76,7 @@
    /**
     * 指定模式的时间格式
   *
     * @param pattern
     * @return
     */
@@ -140,6 +143,20 @@
        }
        return date2Str(date_sdf.get());
    }
  /**
   * æ—¶é—´æˆ³è½¬æ¢ä¸ºå­—符串
   *
   * @param time
   * @return
   */
  public static String timestamptoStr(Long time, SimpleDateFormat sdf) {
    Date date = null;
    if (null != time) {
      date = new Date(time);
    }
    return date2Str(date, sdf);
  }
    /**
     * 字符串转换时间戳
@@ -673,6 +690,7 @@
    /**
     * 将字符串转成时间
   *
     * @param str
     * @return
     */
@@ -743,6 +761,24 @@
    }
    /**
   * ç‰¹æ®Šæ—¶é—´str转正常str
   *
   * @param st 2024-08-14T01:07:36.761+00:00
   * @return
   */
  public static String zone2Str(String st) {
    // è§£æžå­—符串为ZonedDateTime对象
    ZonedDateTime zonedDateTime = ZonedDateTime.parse(st);
    // è½¬æ¢ä¸ºä¸­å›½æ ‡å‡†æ—¶é—´ï¼ˆUTC+8)
    ZonedDateTime cstDateTime = zonedDateTime.withZoneSameInstant(java.time.ZoneOffset.ofHours(8));
    // å®šä¹‰æƒ³è¦çš„输出格式
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // æ ¼å¼åŒ–为想要的字符串
    String str = cstDateTime.format(formatter);
    return str;
  }
  /**
     * 判断两个时间是否是同一年
     *
     * @param date1
jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/service/BaseCommonService.java
@@ -32,4 +32,12 @@
     */
    void addLog(String logContent, Integer logType, Integer operateType);
  /**
   * Saves a log entry together with the tenant and the method it belongs to.
   *
   * @param logContent  text of the log entry
   * @param logType     log type code
   * @param operateType operation type code
   * @param tenantId    tenant id stored on the entry
   * @param methods     value stored in the entry's method field
   */
  void addLog(String logContent, Integer logType, Integer operateType,Integer tenantId,String methods);
}
jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/service/impl/BaseCommonServiceImpl.java
@@ -85,6 +85,34 @@
        addLog(logContent, logType, operateType, null);
    }
  @Override
  public void addLog(String logContent, Integer logType, Integer operatetype, Integer tenantId, String methods) {
    LogDTO sysLog = new LogDTO();
    sysLog.setId(String.valueOf(IdWorker.getId()));
    //注解上的描述,操作日志内容
    sysLog.setLogContent(logContent);
    sysLog.setLogType(logType);
    sysLog.setOperateType(operatetype);
    sysLog.setMethod(methods);
    sysLog.setTenantId(tenantId);
    try {
      //获取request
      HttpServletRequest request = SpringContextUtils.getHttpServletRequest();
      //设置IP地址
      sysLog.setIp(IpUtils.getIpAddr(request));
    } catch (Exception e) {
      sysLog.setIp("127.0.0.1");
    }
    sysLog.setCreateTime(new Date());
    //保存日志(异常捕获处理,防止数据太大存储失败,导致业务失败)JT-238
    try {
      baseCommonMapper.saveLog(sysLog);
    } catch (Exception e) {
      log.warn(" LogContent length : " + sysLog.getLogContent().length());
      log.warn(e.getMessage());
    }
  }
}
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/api/EmqxApi.java
对比新文件
@@ -0,0 +1,53 @@
package org.jeecg.modules.dry.api;
import com.alibaba.fastjson.JSONObject;
import okhttp3.Credentials;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * emqx rest api
 * å…¥å‚出参参考:https://docs.emqx.com/zh/emqx/v5.7/admin/api-docs.html
 */
@Component
public class EmqxApi {
  @Value(value = "${jeecg.mqtt.emqx_api_key}")
  private String emqxApiKey;
  @Value(value = "${jeecg.mqtt.emqx_secret_key}")
  private String emqxSecretKey;
  @Value(value = "${jeecg.mqtt.emqx_base}")
  private String emqxBaseUrl;
  //所有客户端
  public static final String CMD_CLIENTS = "/clients";
  //查询一个客户端状态
  public static final String CMD_CLIENTS_CLIENT = "/clients/%s";
  /**
   * EMQX接口
   *
   * @return
   */
  public JSONObject queryEmqx(String cmd) {
    try {
      OkHttpClient client = new OkHttpClient();
      Request request = new Request.Builder()
        .url(emqxBaseUrl + cmd)
        .header("Content-Type", "application/json")
        .header("Authorization", Credentials.basic(emqxApiKey, emqxSecretKey))
        .build();
      Response response = client.newCall(request).execute();
      String res = response.body().string();
      return JSONObject.parseObject(res);
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }
}
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/api/EmqxApi.txt
对比新文件
@@ -0,0 +1,180 @@
//接口返回数据示例
1.clients:
{
    "data": [
        {
            "recv_msg.qos0": 0,
            "recv_oct": 603,
            "recv_msg.qos2": 0,
            "clean_start": true,
            "enable_authn": true,
            "send_msg.dropped.queue_full": 0,
            "proto_name": "MQTT",
            "listener": "tcp:default",
            "send_msg.qos0": 28,
            "user_property": {},
            "keepalive": 60,
            "username": "tongjitang",
            "proto_ver": 5,
            "node": "emqx@172.17.0.2",
            "mqueue_dropped": 0,
            "send_cnt": 243,
            "ip_address": "172.17.0.1",
            "created_at": "2024-08-07T01:23:29.226+00:00",
            "auth_expire_at": null,
            "recv_msg.qos1": 0,
            "durable": false,
            "is_bridge": false,
            "recv_msg.dropped": 0,
            "subscriptions_cnt": 5,
            "send_oct": 5886,
            "expiry_interval": 0,
            "heap_size": 610,
            "awaiting_rel_cnt": 0,
            "connected_at": "2024-08-07T01:23:29.226+00:00",
            "clientid": "1000_GM003",
            "inflight_cnt": 0,
            "port": 38422,
            "send_msg.dropped.expired": 0,
            "send_msg": 28,
            "mailbox_len": 0,
            "mountpoint": null,
            "send_msg.qos2": 0,
            "recv_msg.dropped.await_pubrel_timeout": 0,
            "recv_msg": 0,
            "send_msg.qos1": 0,
            "send_msg.dropped.too_large": 0,
            "send_msg.dropped": 0,
            "subscriptions_max": "infinity",
            "send_pkt": 243,
            "inflight_max": 32,
            "recv_pkt": 215,
            "client_attrs": {},
            "connected": true,
            "reductions": 204416,
            "awaiting_rel_max": 100,
            "is_persistent": false,
            "mqueue_max": 1000,
            "mqueue_len": 0,
            "recv_cnt": 215,
            "is_expired": false
        },
        {
            "recv_msg.qos0": 0,
            "recv_oct": 468,
            "recv_msg.qos2": 0,
            "clean_start": true,
            "enable_authn": true,
            "send_msg.dropped.queue_full": 0,
            "proto_name": "MQTT",
            "listener": "tcp:default",
            "send_msg.qos0": 0,
            "user_property": {},
            "keepalive": 60,
            "username": "tongjitang",
            "proto_ver": 5,
            "node": "emqx@172.17.0.2",
            "mqueue_dropped": 0,
            "send_cnt": 210,
            "ip_address": "172.17.0.1",
            "created_at": "2024-08-07T01:23:50.589+00:00",
            "auth_expire_at": null,
            "recv_msg.qos1": 0,
            "durable": false,
            "is_bridge": false,
            "recv_msg.dropped": 0,
            "subscriptions_cnt": 0,
            "send_oct": 442,
            "expiry_interval": 0,
            "heap_size": 610,
            "awaiting_rel_cnt": 0,
            "connected_at": "2024-08-07T01:23:50.589+00:00",
            "clientid": "1000_GM002",
            "inflight_cnt": 0,
            "port": 38424,
            "send_msg.dropped.expired": 0,
            "send_msg": 0,
            "mailbox_len": 0,
            "mountpoint": null,
            "send_msg.qos2": 0,
            "recv_msg.dropped.await_pubrel_timeout": 0,
            "recv_msg": 0,
            "send_msg.qos1": 0,
            "send_msg.dropped.too_large": 0,
            "send_msg.dropped": 0,
            "subscriptions_max": "infinity",
            "send_pkt": 210,
            "inflight_max": 32,
            "recv_pkt": 210,
            "client_attrs": {},
            "connected": true,
            "reductions": 171935,
            "awaiting_rel_max": 100,
            "is_persistent": false,
            "mqueue_max": 1000,
            "mqueue_len": 0,
            "recv_cnt": 210,
            "is_expired": false
        },
        {
            "recv_msg.qos0": 1,
            "recv_msg.qos2": 0,
            "clean_start": false,
            "enable_authn": true,
            "send_msg.dropped.queue_full": 0,
            "proto_name": "MQTT",
            "listener": "tcp:default",
            "send_msg.qos0": 0,
            "user_property": {},
            "keepalive": 10,
            "username": "tongjitang",
            "proto_ver": 4,
            "node": "emqx@172.17.0.2",
            "mqueue_dropped": 0,
            "ip_address": "172.17.0.1",
            "created_at": "2024-08-07T01:23:13.724+00:00",
            "auth_expire_at": null,
            "recv_msg.qos1": 0,
            "durable": false,
            "is_bridge": false,
            "recv_msg.dropped": 0,
            "subscriptions_cnt": 1,
            "disconnected_at": "2024-08-07T04:52:44.267+00:00",
            "expiry_interval": 7200,
            "heap_size": 610,
            "awaiting_rel_cnt": 0,
            "connected_at": "2024-08-07T02:41:51.865+00:00",
            "clientid": "1000_GM001",
            "inflight_cnt": 0,
            "port": 38454,
            "send_msg.dropped.expired": 0,
            "send_msg": 0,
            "mailbox_len": 0,
            "mountpoint": null,
            "send_msg.qos2": 0,
            "recv_msg.dropped.await_pubrel_timeout": 0,
            "recv_msg": 1,
            "send_msg.qos1": 0,
            "send_msg.dropped.too_large": 0,
            "send_msg.dropped": 0,
            "subscriptions_max": "infinity",
            "send_pkt": 786,
            "inflight_max": 32,
            "recv_pkt": 787,
            "client_attrs": {},
            "connected": false,
            "reductions": 386694,
            "awaiting_rel_max": 100,
            "is_persistent": true,
            "mqueue_max": 1000,
            "mqueue_len": 0,
            "is_expired": false
        }
    ],
    "meta": {
        "count": 3,
        "limit": 100,
        "page": 1,
        "hasnext": false
    }
}
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/MoEquVo.java
对比新文件
@@ -0,0 +1,24 @@
package org.jeecg.modules.dry.vo;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.jeecg.modules.dry.entity.DryEquipment;
import org.springframework.beans.BeanUtils;
/**
 * Mobile-side view of a dryer: a {@link DryEquipment} extended with MQTT status fields.
 */
@EqualsAndHashCode(callSuper = true)
@Data
public class MoEquVo extends DryEquipment {
  // Whether the device is online
  private Boolean online;
  // Working status of the device
  private Integer statu;
  // Power-on time
  private String upTime;
  // MQTT client id
  private String clientId;
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
对比新文件
@@ -0,0 +1,97 @@
package org.jeecg.modules.dry.controller;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.config.TenantContext;
import org.jeecg.common.constant.MqttConstant;
import org.jeecg.common.system.query.QueryGenerator;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.config.mybatis.MybatisPlusSaasConfig;
import org.jeecg.modules.dry.api.EmqxApi;
import org.jeecg.modules.dry.entity.DryEquipment;
import org.jeecg.modules.dry.service.IDryEquipmentService;
import org.jeecg.modules.dry.vo.MoEquVo;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@Api(tags = "移动端")
@RestController
@RequestMapping("/mobile")
@Slf4j
public class MobileController {
  @Autowired
  private IDryEquipmentService dryEquipmentService;
  @Autowired
  private RedisUtil redisUtil;
  @ApiOperation(value = "设备列表查询", notes = "设备列表查询")
  @GetMapping(value = "/equ/list")
  public Result<IPage<MoEquVo>> queryPageList(DryEquipment dryEquipment, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req) {
    //------------------------------------------------------------------------------------------------
    //是否开启系统管理模块的多租户数据隔离【SAAS多租户模式】
    if (MybatisPlusSaasConfig.OPEN_SYSTEM_TENANT_CONTROL) {
      dryEquipment.setTenantId(oConvertUtils.getInt(TenantContext.getTenant(), 0));
    }
    //------------------------------------------------------------------------------------------------
    QueryWrapper<DryEquipment> queryWrapper = QueryGenerator.initQueryWrapper(dryEquipment, req.getParameterMap());
    Page<DryEquipment> page = new Page<DryEquipment>(pageNo, pageSize);
    Page<MoEquVo> voPage = new Page<MoEquVo>(pageNo, pageSize);
    IPage<DryEquipment> pageList = dryEquipmentService.page(page, queryWrapper);
    comp(pageList, voPage);
    return Result.OK(voPage);
  }
  private void comp(IPage<DryEquipment> pageList, Page<MoEquVo> page) {
    //当前租户id
    int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0);
    List<MoEquVo> collect = pageList.getRecords().stream().map(item -> {
      MoEquVo vo = new MoEquVo();
      BeanUtils.copyProperties(item, vo);
      String clientid = "client-" + tenantId + "-" + item.getCode();
      JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
      //组装状态数据
      if (client != null) {
        vo.setOnline(true);
        //连接时间
        String st = client.getString("connectedAt");
        vo.setUpTime(st);
        vo.setClientId(clientid);
      }
      return vo;
    }).collect(Collectors.toList());
    //排序
    collect.sort(Comparator.comparing(obj -> obj.getCode(), Comparator.nullsLast(Comparator.naturalOrder())));
    collect.sort(Comparator.comparing(obj -> obj.getOnline(), Comparator.nullsLast(Comparator.naturalOrder())));
    BeanUtils.copyProperties(pageList, page);
    page.setRecords(collect);
  }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
对比新文件
@@ -0,0 +1,182 @@
package org.jeecg.modules.dry.mqtt;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.jeecg.common.constant.MqttConstant;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.dry.api.EmqxApi;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.*;
/**
 * mqtt
 */
@Slf4j
@Configuration
public class MqttConfig {
  @Value(value = "${jeecg.mqtt.broker}")
  private String broker;
  @Value(value = "${jeecg.mqtt.mqtt_name}")
  private String mqttName;
  @Value(value = "${jeecg.mqtt.mqtt_pass}")
  private String mqttPass;
  @Value(value = "${jeecg.mqtt.mqtt_client_id}")
  private String mqttClientId;
  @Value(value = "${jeecg.mqtt.role}")
  private String role;
  @Autowired
  private MqttSampleCallback mqttSampleCallback;
  @Autowired
  private MqttUtil mqttUtil;
  @Autowired
  private RedisUtil redisUtil;
  @Autowired
  private EmqxApi emqxApi;
  @Bean
  public void initMqtt() {
    conn();
    reconn();
  }
  /**
   * mqtt连接配置
   */
  private void conn() {
    MemoryPersistence persistence = new MemoryPersistence();
    MqttConnectOptions mqttConnOpt = new MqttConnectOptions();
    mqttConnOpt.setUserName(mqttName);
    mqttConnOpt.setPassword(mqttPass.toCharArray());
    mqttConnOpt.setKeepAliveInterval(10);//设置心跳间隔
    mqttConnOpt.setConnectionTimeout(10);//设置连接超时时间
    mqttConnOpt.setCleanSession(false);//设置是否清除会话
    mqttConnOpt.setAutomaticReconnect(false);//设置是否自动重连
    //遗嘱消息 TODO qos2需要在设备上线时做清除消息操作
    mqttConnOpt.setWill("downline", ("我是" + mqttName + "_" + mqttClientId + ",我下线了").getBytes(), 2, false);
    try {
      MqttClient mqttClient = new MqttClient(broker, mqttClientId, persistence);
      mqttClient.connect(mqttConnOpt);
      if (mqttClient.isConnected()) {
        System.err.println("连接成功");
        // åˆ›å»ºæ¶ˆæ¯å¹¶è®¾ç½® QoS
        String content = "我是" + mqttName + "_" + mqttClientId + ",我上线了";
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(0);
        // æµ‹è¯•发布消息
        mqttClient.publish("topline", message);
        mqttClient.subscribe("message", 0);
        mqttClient.setCallback(mqttSampleCallback);
        mqttUtil.setMqttClient(mqttClient);
        //不同的角色需要不同的订阅
        switch (role) {
          //管理员
          case "admin":
            //订阅系统级设备上下线通知
            mqttClient.subscribe(MqttConstant.MQTT_TOPIC_ONLINE);
            mqttClient.subscribe(MqttConstant.MQTT_TOPIC_OFFLINE);
            //订阅移动端上行指令
            mqttClient.subscribe(MqttConstant.MOBILE_UP);
            System.err.println("admin订阅" + MqttConstant.MOBILE_UP);
            initClients();
            break;
          //普通用户
          case "user":
            //普通客户端只需订阅自身相关消息
            mqttClient.subscribe(MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#");
            System.err.println("user订阅" + MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#");
            break;
        }
      } else {
        System.err.println("连接失败");
      }
    } catch (MqttException e) {
      e.printStackTrace();
    }
  }
  //重连
  private void reconn() {
    Timer timer = new Timer();
    TimerTask task = new TimerTask() {
      @Override
      public void run() {
        // åœ¨è¿™é‡Œç¼–写定时执行的任务逻辑
        System.out.println("定时任务执行:" + new java.util.Date());
        if (mqttUtil.getMqttClient() == null || !mqttUtil.getMqttClient().isConnected()) {
          try {
            conn();
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }
    };
    // è®¾å®šå®šæ—¶ä»»åŠ¡ï¼Œå»¶è¿Ÿ0毫秒后开始执行,每隔1000毫秒(1秒)执行一次
    timer.schedule(task, 0, 10000);
  }
  /**
   * æœåŠ¡ç«¯ï¼ˆadmin角色)启动时查询所有设备并缓存到redis
   */
  private void initClients() {
    redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT);
    JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS);
    //TODO æ ¹æ®emqx返回编写实体类
    if (clients != null && clients.containsKey("data")) {
      JSONArray data = clients.getJSONArray("data");
      for (int i = 0; i < data.size(); i++) {
        JSONObject obj = data.getJSONObject(i);
        JSONObject item = new JSONObject();
        //username
        item.put("username", obj.get("username"));
        //连接时间
        String st = obj.getString("connected_at");
        String upTime = DateUtils.zone2Str(st);
        item.put("connectedAt", upTime);
        //clientid
        String clientid = obj.getString("clientid");
        item.put("clientid", clientid);
        //是否连接
        Boolean connected = obj.getBoolean("connected");
        item.put("connected", connected);
        //
        String[] info = clientid.split("-");
        item.put("type", info[0]);
        item.put("tenantId", info[1]);
        item.put("code", info[2]);
        if (connected) {
          redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
        }
      }
    }
  }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
对比新文件
@@ -0,0 +1,199 @@
package org.jeecg.modules.dry.mqtt;
import cn.hutool.core.thread.ThreadUtil;
import com.alibaba.fastjson.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.MqttConstant;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.service.BaseCommonService;
import org.jeecg.modules.dry.api.EmqxApi;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@Component
@Scope("prototype")
public class MqttSampleCallback implements MqttCallback {
  @Value(value = "${jeecg.mqtt.role}")
  private String role;
  @Autowired
  private MqttUtil mqttUtil;
  @Autowired
  private EmqxApi emqxApi;
  @Autowired
  private BaseCommonService baseCommonService;
  @Autowired
  private RedisUtil redisUtil;
  @Override
  public void connectionLost(Throwable throwable) {
    System.err.println("连接断开::掉线");
  }
  @Override
  public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
    System.out.println("收到消息: \n  topic:" + topic + "\n  Qos:" + mqttMessage.getQos() + "\n  payload:"
      + new String(mqttMessage.getPayload()));
    switch (role) {
      // ç®¡ç†å‘˜
      case "admin":
        String message = new String(mqttMessage.getPayload());
        JSONObject messageJson = JSONObject.parseObject(message);
        if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
          JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid"));
          if (client == null) {
            JSONObject item = new JSONObject();
            //username
            item.put("username", messageJson.get("username"));
            //连接时间
            Long st = messageJson.getLong("connected_at");
            String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get());
            item.put("connectedAt", upTime);
            //clientid
            String clientid = messageJson.getString("clientid");
            item.put("clientid", clientid);
            //是否连接
            item.put("connected", true);
            //
            String[] info = clientid.split("-");
            item.put("type", info[0]);
            item.put("tenantId", info[1]);
            item.put("code", info[2]);
            redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item);
            System.err.println(String.format("设备: %s上线", clientid));
          }
        }
        if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
          String clientid = messageJson.getString("clientid");
          redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid);
          System.err.println(String.format("设备: %s下线", clientid));
        }
        parseAdminCommand(topic, mqttMessage);
        break;
      // æ™®é€šç”¨æˆ·
      case "user":
        System.err.println("user");
        parseUserCommand(topic, mqttMessage);
        break;
    }
  }
  @Override
  public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    System.err.println("消息传递成功");
  }
  // è§£æžadmin角色指令
  private void parseAdminCommand(String topic, MqttMessage mqttMessage) {
    String message = new String(mqttMessage.getPayload());
    JSONObject messageJson = JSONObject.parseObject(message);
    //请求的客户端(服务端只推送数据到请求的客户端)
    StringBuilder req = new StringBuilder();
    if (messageJson.containsKey("req")) {
      req.append(messageJson.get("req"));
    }
    //前端传参时间戳转换
    if (messageJson.containsKey("timestamp")) {
      messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
    }
    baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1);
    switch (topic) {
      // æŸ¥è¯¢è®¾å¤‡åœ¨çº¿
      case MqttConstant.MOBILE_QUERY_EQU_STATU:
        System.err.println("admin收到" + topic);
        // æ ¹æ®è®¾å¤‡id查询设备mqtt在线状态
        String clientId = messageJson.getString("clientId");
        JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId);
        ThreadUtil.execute(() -> {
          if (client == null || client.isEmpty()) {
            JSONObject res = new JSONObject();
            res.put("success", false);
            res.put("msg", "查询失败");
            try {
              MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
              sendMessage.setQos(0);
              mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
            } catch (Exception e) {
              e.printStackTrace();
            }
            return;
          }
          client.put("success", true);
          client.put("msg", "查询成功");
          try {
            MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
            sendMessage.setQos(0);
            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
            baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
          } catch (Exception e) {
            e.printStackTrace();
          }
        });
        break;
    }
  }
  // è§£æžuser角色指令
  private void parseUserCommand(String topic, MqttMessage mqttMessage) {
    String message = new String(mqttMessage.getPayload());
    JSONObject messageJson = JSONObject.parseObject(message);
    //请求的客户端(服务端只推送数据到请求的客户端)
    StringBuilder req = new StringBuilder();
    if (messageJson.containsKey("req")) {
      req.append(messageJson.get("req"));
    }
    //前端传参时间戳转换
    if (messageJson.containsKey("timestamp")) {
      messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString()));
    }
    switch (topic) {
      case MqttConstant.MOBILE_REQ_EQU_CMD:
        System.err.println("user收到" + topic);
        System.err.println(message);
        ThreadUtil.execute(() -> {
          //TODO å‘PLC发送开关机操作,并返回信息
          JSONObject res = new JSONObject();
          res.put("success", true);
          res.put("msg", "操作成功");
          try {
            MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes());
            sendMessage.setQos(0);
            mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage);
            baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
          } catch (Exception e) {
            e.printStackTrace();
          }
        });
        break;
    }
  }
}
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttUtil.java
对比新文件
@@ -0,0 +1,12 @@
package org.jeecg.modules.dry.mqtt;
import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.stereotype.Component;
@Component
@Data
public class MqttUtil {
  public   MqttClient mqttClient;
}