| | |
| | | import cn.hutool.core.thread.ThreadUtil; |
| | | import com.alibaba.fastjson.JSONArray; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import lombok.Data; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.eclipse.paho.client.mqttv3.*; |
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
| | |
| | | /** |
| | | * mqtt |
| | | */ |
| | | @Data |
| | | @Slf4j |
| | | @Configuration |
| | | public class MqttConfig { |
| | |
| | | //订阅移动端上行指令 |
| | | mqttClient.subscribe(MqttConstant.MOBILE_UP); |
| | | System.err.println("admin订阅" + MqttConstant.MOBILE_UP); |
| | | // 订阅租户实时数据 |
| | | mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); |
| | | System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); |
| | | // 订阅租户报警数据 |
| | | mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); |
| | | System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); |
| | | mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU); |
| | | System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_EQU); |
| | | initClients(); |
| | | |
| | | break; |
| | |
| | | case "user": |
| | | //普通客户端只需订阅自身相关消息 |
| | | mqttClient.subscribe(MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#"); |
| | | mqttClient.subscribe(MqttConstant.SERVICE_REQ_PREFIX); |
| | | System.err.println("user订阅" + MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#"); |
| | | break; |
| | | |
| | |
| | | * 服务端(admin角色)启动时查询所有设备并缓存到redis |
| | | */ |
| | | private void initClients() { |
| | | redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT); |
| | | redisUtil.removeAll(MqttConstant.MQTT_ONLINE_CLIENT); |
| | | |
| | | JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS); |
| | | //TODO 根据emqx返回编写实体类 |
| | |
| | | //是否连接 |
| | | Boolean connected = obj.getBoolean("connected"); |
| | | item.put("connected", connected); |
| | | // |
| | | String[] info = clientid.split("-"); |
| | | item.put("type", info[0]); |
| | | item.put("tenantId", info[1]); |
| | | item.put("code", info[2]); |
| | | //根据clientid解析(注意配置文件中clientid格式 例:client-1000) |
| | | try { |
| | | String[] info = clientid.split("-"); |
| | | item.put("type", info[0]); |
| | | item.put("tenantId", info[1]); |
| | | //item.put("code", info[2]); |
| | | }catch (Exception e){ |
| | | e.printStackTrace(); |
| | | } |
| | | |
| | | if (connected) { |
| | | redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); |