jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -31,10 +31,9 @@ String MOBILE_QUERY_EQU_STATU = MOBILE_UP_PREFIX + "/query/equ/statu"; //移动端远程请求指令 String MOBILE_REQ_EQU_CMD = MOBILE_UP_PREFIX + "/req/equ/cmd"; /**************************移动端向服务端请求指令end*******************************/ @@ -47,7 +46,26 @@ String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/%s/statu"; //返回移动端远程请求指令 String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd"; /**************************服务端向移动端响应指令end*******************************/ /**************************服务端向移动端发送广播start*******************************/ //广播类型推送无关移动端设备id,向所有在线移动端发送 String SERVICE_BROADCAST_PREFIX = "service/broadcast"; //服务端向各租户客户端发送实时故障广播 String SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s" ; /**************************服务端向移动端发送广播end*******************************/ /**************************租户端向服务端发送数据start*******************************/ @@ -80,9 +98,9 @@ //service(cloud) //在线客户端 String MQTT_ONLINE_CLIENT = "mqtt:online:client::"; String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s"; //所有租户的实时报警(%s:租户id) String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s:"; String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s"; jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
@@ -18,7 +18,9 @@ import org.jeecg.config.mybatis.MybatisPlusSaasConfig; import org.jeecg.modules.dry.api.EmqxApi; import org.jeecg.modules.dry.entity.DryEquipment; import org.jeecg.modules.dry.entity.DryFaultRecord; import org.jeecg.modules.dry.service.IDryEquipmentService; import org.jeecg.modules.dry.service.IDryFaultRecordService; import org.jeecg.modules.dry.vo.MoEquVo; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -40,6 +42,8 @@ public class MobileController { @Autowired private IDryEquipmentService dryEquipmentService; @Autowired private IDryFaultRecordService faultRecordService; @Autowired private RedisUtil redisUtil; @@ -64,18 +68,27 @@ return Result.OK(voPage); } @ApiOperation(value = "设备报警数据", notes = "设备报警数据列表查询") @GetMapping(value = "/fault/list") public Result<IPage<DryFaultRecord>> queryFaultList(DryFaultRecord faultRecord, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req){ int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0); QueryWrapper<DryFaultRecord> queryWrapper = QueryGenerator.initQueryWrapper(faultRecord, req.getParameterMap()); Page<DryFaultRecord> page = new Page<DryFaultRecord>(pageNo, pageSize); IPage<DryFaultRecord> pageList = faultRecordService.page(page, queryWrapper); return Result.OK(pageList); } private void comp(IPage<DryEquipment> pageList, Page<MoEquVo> page) { //当前租户id int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0); List<MoEquVo> collect = pageList.getRecords().stream().map(item -> { MoEquVo vo = new MoEquVo(); BeanUtils.copyProperties(item, vo); String clientid = "client-" + tenantId + "-" + item.getCode(); JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientid); //JSONObject client = (JSONObject) redisUtil.hget(MqttConstant.MQTT_ONLINE_CLIENT ,tenantId); JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,tenantId),clientid); //组装状态数据 if (client != null) { vo.setOnline(true); jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java
@@ -10,6 +10,7 @@ public class MqMessage<T> { private T data; private String tentId; private String topic; public MqMessage() { @@ -19,4 +20,18 @@ this.data = data; this.tentId = tentId; } public MqMessage(T data, String tentId,String topic) { this.data = data; this.tentId = tentId; this.topic = topic; } @Override public String toString() { return "MqMessage{" + "data=" + data + ", tentId='" + tentId + '\'' + ", topic='" + topic + '\'' + '}'; } } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -15,6 +15,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.core.RedisTemplate; import java.util.*; @@ -44,6 +45,8 @@ private RedisUtil redisUtil; @Autowired private EmqxApi emqxApi; @Autowired private RedisTemplate redisTemplate; @Bean @@ -98,6 +101,8 @@ mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); // 订阅租户报警数据 mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA); mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); System.out.println("admin订阅" + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU); @@ -153,7 +158,14 @@ * 服务端(admin角色)启动时查询所有设备并缓存到redis */ private void initClients() { redisUtil.removeAll(MqttConstant.MQTT_ONLINE_CLIENT); //初始化时先删除所有在线设备 Set keys = redisTemplate.keys( String.format(MqttConstant.MQTT_ONLINE_CLIENT,"*")); if (keys != null && !keys.isEmpty()) { keys.forEach(key -> System.out.println("初始化删除在线设备: " + key)); redisTemplate.delete(keys); } else { System.out.println("初始化无在线设备: " + MqttConstant.MQTT_ONLINE_CLIENT); } JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS); //TODO 根据emqx返回编写实体类 @@ -180,13 +192,15 @@ item.put("type", info[0]); item.put("tenantId", info[1]); //item.put("code", info[2]); if (connected) { redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId").toString()) , clientid, item); } }catch (Exception e){ e.printStackTrace(); } if (connected) { redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); } } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; import org.apache.poi.ss.formula.functions.T; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -22,13 +23,17 @@ import org.jeecg.modules.dry.entity.DryShop; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.vo.DryEquipmentVo; import org.jeecg.modules.dry.vo.DryFaultRecordVo; import org.jeecg.modules.dry.vo.RealTimeDataVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @Slf4j @Component @@ -62,8 +67,6 @@ private IDryFaultRecordService faultRecordService; @Override public void connectionLost(Throwable throwable) { System.err.println("连接断开::掉线"); @@ -82,7 +85,7 @@ JSONObject messageJson = JSONObject.parseObject(message); if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid")); JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); if (client == null) { JSONObject item = new JSONObject(); //username @@ -105,7 +108,7 @@ }catch (Exception e){ e.printStackTrace(); } redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item); System.err.println(String.format("设备: %s上线", clientid)); } @@ -113,7 +116,7 @@ if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { try { String clientid = messageJson.getString("clientid"); redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid); redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]), clientid); System.err.println(String.format("设备: %s下线", clientid)); } catch (Exception e) { e.printStackTrace(); @@ -160,7 +163,7 @@ } // 实时数据上传太频繁且数据内容超过字段大小不记录日志 if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)){ baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); } switch (topic) { @@ -169,7 +172,7 @@ System.err.println("admin收到" + topic); // 根据设备id查询设备mqtt在线状态 String clientId = messageJson.getString("clientId"); JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId); JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId); ThreadUtil.execute(() -> { @@ -212,15 +215,44 @@ }); break; //各租户上传的实时报警数据 case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { }); //故障数据 Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); //租户id String tentId = realFaultMessage.getTentId(); //收到租户实时报警数据存入redis //转换为 Map<String, Object> Map<String, Object> objectMap = dryFaultMap.entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, entry -> (Object) entry.getValue() )); redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap); //广播发送给各租户下移动设备 if(dryFaultMap.isEmpty()){ return; } String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId); //数据转换 List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic); //发送广播 System.err.println("广播给:" + recTopic); sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage); break; // 接收设备报警数据 case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: ThreadUtil.execute(() -> { try { MqMessage<List<DryFaultRecord>> listMqMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() { MqMessage<List<DryFaultRecord>> faultMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() { }); // List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); System.err.println(listMqMessage.toString()); faultRecordService.saveBatch(listMqMessage.getData()); System.err.println(faultMessage.toString()); faultRecordService.saveBatch(faultMessage.getData()); } catch (Exception e) { e.printStackTrace(); @@ -335,11 +367,30 @@ }); break; } } /** * 发送消息 * @param topic 订阅 * @param mqMessage 消息体 */ private void sendMqttMessage(String topic, MqMessage mqMessage){ ThreadUtil.execute(() -> { try { MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage); }catch (Exception e){ e.printStackTrace(); } }); } }