From 5dd889b470543bed7564054cdfcd750b1d9316cb Mon Sep 17 00:00:00 2001 From: zhuguifei <zhuguifei@zhuguifeideiMac.local> Date: 星期三, 11 十二月 2024 10:19:52 +0800 Subject: [PATCH] 添加移动端实时故障和历史故障接口 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 85 ++++++++++--- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java | 216 ++++++++++++++++++++++++++--------- jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java | 9 + jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java | 7 + 4 files changed, 235 insertions(+), 82 deletions(-) diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java index f7867b5..f4c85dd 100755 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java @@ -32,6 +32,9 @@ //绉诲姩绔繙绋嬭姹傛寚浠� String MOBILE_REQ_EQU_CMD = MOBILE_UP_PREFIX + "/req/equ/cmd"; + //绉诲姩绔姹傛煡璇竴娆¤澶囧疄鏃舵晠闅滃憡璀� + String MOBILE_REQ_EQU_REAL_FAULT = MOBILE_UP_PREFIX + "/req/real/fault"; + /**************************绉诲姩绔悜鏈嶅姟绔姹傛寚浠nd*******************************/ @@ -57,7 +60,9 @@ String SERVICE_BROADCAST_PREFIX = "service/broadcast"; //鏈嶅姟绔悜鍚勭鎴峰鎴风鍙戦�佸疄鏃舵晠闅滃箍鎾� - String SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s" ; + String SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s"; + //鏈嶅姟绔悜绉诲姩绔洖澶嶄竴娆¤澶囧疄鏃舵晠闅滃憡璀� + String SERVICE_ONECE_TENANT_REAL_FAULT = "service/onece" + "/real/fault/%s"; diff --git a/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java b/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java index 8e54c18..b74fdf1 100755 --- a/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java +++ b/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java @@ -13,16 +13,21 @@ @Data public class DryFaultRecordVo extends DryFaultRecord implements Serializable { private static final long serialVersionUID = 1L; - //redis鏁呴殰缁撴潫鎶�鏈� + //redis鏁呴殰缁撴潫璁版暟 private Integer eCount; //璁惧鍚嶇О private String equName; //绉熸埛鍚嶇О private String tenantName; + //鏁呴殰鏃堕棿 + private String faultTimeStr; public DryFaultRecordVo() { } - + public DryFaultRecordVo(DryFaultRecord record, Integer count) { + super(record.getOrderId(),record.getTenantId(),record.getFaultName(),record.getFaultType(),record.getStartTime(),record.getEndTime()); + this.eCount = count; + } public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount, String equName, String tenantName) { super(orderId, tenantId, faultName, faultType, startTime, endTime); this.eCount = eCount; diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java index feea7b1..4fcdc30 100755 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java @@ -8,6 +8,7 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import lombok.extern.slf4j.Slf4j; +import org.checkerframework.checker.units.qual.A; import org.jeecg.common.api.vo.Result; import org.jeecg.common.config.TenantContext; import org.jeecg.common.constant.MqttConstant; @@ -19,8 +20,11 @@ import org.jeecg.modules.dry.api.EmqxApi; import org.jeecg.modules.dry.entity.DryEquipment; import org.jeecg.modules.dry.entity.DryFaultRecord; +import org.jeecg.modules.dry.entity.DryOrder; import org.jeecg.modules.dry.service.IDryEquipmentService; import org.jeecg.modules.dry.service.IDryFaultRecordService; +import org.jeecg.modules.dry.service.IDryOrderService; +import org.jeecg.modules.dry.vo.DryFaultRecordVo; import org.jeecg.modules.dry.vo.MoEquVo; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -30,6 +34,8 @@ import org.springframework.web.bind.annotation.RestController; import javax.servlet.http.HttpServletRequest; +import java.time.Duration; +import java.time.LocalDateTime; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.*; @@ -40,71 +46,169 @@ @RequestMapping("/mobile") @Slf4j public class MobileController { - @Autowired - private IDryEquipmentService dryEquipmentService; - @Autowired - private IDryFaultRecordService faultRecordService; - @Autowired - private RedisUtil redisUtil; + @Autowired + private IDryEquipmentService dryEquipmentService; + @Autowired + private IDryFaultRecordService faultRecordService; + @Autowired + private RedisUtil redisUtil; + @Autowired + private IDryOrderService orderService; - @ApiOperation(value = "璁惧鍒楄〃鏌ヨ", notes = "璁惧鍒楄〃鏌ヨ") - @GetMapping(value = "/equ/list") - public Result<IPage<MoEquVo>> queryPageList(DryEquipment dryEquipment, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req) { - //------------------------------------------------------------------------------------------------ - //鏄惁寮�鍚郴缁熺鐞嗘ā鍧楃殑澶氱鎴锋暟鎹殧绂汇�怱AAS澶氱鎴锋ā寮忋�� - if (MybatisPlusSaasConfig.OPEN_SYSTEM_TENANT_CONTROL) { - dryEquipment.setTenantId(oConvertUtils.getInt(TenantContext.getTenant(), 0)); + @ApiOperation(value = "璁惧鍒楄〃鏌ヨ", notes = "璁惧鍒楄〃鏌ヨ") + @GetMapping(value = "/equ/list") + public Result<IPage<MoEquVo>> queryPageList(DryEquipment dryEquipment, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req) { + //------------------------------------------------------------------------------------------------ + //鏄惁寮�鍚郴缁熺鐞嗘ā鍧楃殑澶氱鎴锋暟鎹殧绂汇�怱AAS澶氱鎴锋ā寮忋�� + if (MybatisPlusSaasConfig.OPEN_SYSTEM_TENANT_CONTROL) { + dryEquipment.setTenantId(oConvertUtils.getInt(TenantContext.getTenant(), 0)); + } + //------------------------------------------------------------------------------------------------ + QueryWrapper<DryEquipment> queryWrapper = QueryGenerator.initQueryWrapper(dryEquipment, req.getParameterMap()); + Page<DryEquipment> page = new Page<DryEquipment>(pageNo, pageSize); + Page<MoEquVo> voPage = new Page<MoEquVo>(pageNo, pageSize); + IPage<DryEquipment> pageList = dryEquipmentService.page(page, queryWrapper); + + + compEqu(pageList, voPage); + + + return Result.OK(voPage); } - //------------------------------------------------------------------------------------------------ - QueryWrapper<DryEquipment> queryWrapper = QueryGenerator.initQueryWrapper(dryEquipment, req.getParameterMap()); - Page<DryEquipment> page = new Page<DryEquipment>(pageNo, pageSize); - Page<MoEquVo> voPage = new Page<MoEquVo>(pageNo, pageSize); - IPage<DryEquipment> pageList = dryEquipmentService.page(page, queryWrapper); + + @ApiOperation(value = "璁惧鎶ヨ鏁版嵁", notes = "璁惧鎶ヨ鏁版嵁鍒楄〃鏌ヨ") + @GetMapping(value = "/fault/list") + public Result<IPage<DryFaultRecordVo>> queryFaultList(DryFaultRecord faultRecord, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req) { + int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0); + //鏄惁寮�鍚郴缁熺鐞嗘ā鍧楃殑澶氱鎴锋暟鎹殧绂汇�怱AAS澶氱鎴锋ā寮忋�� + if (MybatisPlusSaasConfig.OPEN_SYSTEM_TENANT_CONTROL) { + faultRecord.setTenantId(tenantId); + } + QueryWrapper<DryFaultRecord> queryWrapper = QueryGenerator.initQueryWrapper(faultRecord, req.getParameterMap()); + Page<DryFaultRecord> page = new Page<DryFaultRecord>(pageNo, pageSize); + IPage<DryFaultRecord> pageList = faultRecordService.page(page, queryWrapper); + Page<DryFaultRecordVo> voPage = new Page<DryFaultRecordVo>(pageNo, pageSize); + compFault(pageList, voPage); + return Result.OK(voPage); + } + + private void compFault(IPage<DryFaultRecord> pageList, Page<DryFaultRecordVo> page) { + List<DryFaultRecordVo> collect = pageList.getRecords().stream().filter(item -> item.getOrderId() != null).map(item -> { + DryFaultRecordVo vo = new DryFaultRecordVo(); + BeanUtils.copyProperties(item, vo); + String orderCode = item.getOrderId(); + QueryWrapper<DryOrder> orderQueryWrapper = new QueryWrapper<>(); + orderQueryWrapper.lambda().eq(DryOrder::getCode, orderCode); + DryOrder order = orderService.getOne(orderQueryWrapper); + if (vo.getStartTime() != null && vo.getEndTime() != null) { + String faultTimeStr = calculateTimeDifference(DateUtils.date2Str(vo.getStartTime(), DateUtils.datetimeFormat.get()), DateUtils.date2Str(vo.getEndTime(), DateUtils.datetimeFormat.get())); + vo.setFaultTimeStr(faultTimeStr); + } + + if (order == null) return vo; + String equId = order.getEquId(); + if (equId == null) return vo; + DryEquipment equipment = dryEquipmentService.getById(equId); + if (equipment == null || equipment.getName() == null) return vo; + vo.setEquName(equipment.getName().substring(0, 2)); + return vo; + }).collect(Collectors.toList()); + BeanUtils.copyProperties(pageList, page); + page.setRecords(collect); + } - comp(pageList, voPage); + @ApiOperation(value = "鎶ヨ鍥炬爣鏁版嵁", notes = "鎶ヨ鍥炬爣鏁版嵁鏌ヨ") + @GetMapping(value = "/fault/chart") + public List<DryFaultRecordVo> queryFaultChartData(DryFaultRecord faultRecord, HttpServletRequest req) { + int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0); + //鏄惁寮�鍚郴缁熺鐞嗘ā鍧楃殑澶氱鎴锋暟鎹殧绂汇�怱AAS澶氱鎴锋ā寮忋�� + if (MybatisPlusSaasConfig.OPEN_SYSTEM_TENANT_CONTROL) { + faultRecord.setTenantId(tenantId); + } + QueryWrapper<DryFaultRecord> queryWrapper = QueryGenerator.initQueryWrapper(faultRecord, req.getParameterMap()); + List<DryFaultRecord> faultList = faultRecordService.list(queryWrapper); + List<DryFaultRecordVo> result = new ArrayList<>(faultList.stream() + .collect(Collectors.groupingBy(DryFaultRecord::getFaultName, + Collectors.collectingAndThen(Collectors.toList(), list -> + new DryFaultRecordVo(list.get(0), list.size())))) + .values()) + .stream() + .sorted(Comparator.comparingInt(DryFaultRecordVo::getECount).reversed()) + .collect(Collectors.toList()); + return result; + } - return Result.OK(voPage); - } - - @ApiOperation(value = "璁惧鎶ヨ鏁版嵁", notes = "璁惧鎶ヨ鏁版嵁鍒楄〃鏌ヨ") - @GetMapping(value = "/fault/list") - public Result<IPage<DryFaultRecord>> queryFaultList(DryFaultRecord faultRecord, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req){ - int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0); - QueryWrapper<DryFaultRecord> queryWrapper = QueryGenerator.initQueryWrapper(faultRecord, req.getParameterMap()); - Page<DryFaultRecord> page = new Page<DryFaultRecord>(pageNo, pageSize); - IPage<DryFaultRecord> pageList = faultRecordService.page(page, queryWrapper); - return Result.OK(pageList); - } + //姝ゆ帴鍙d粎鐢ㄤ綔uniapp涓嬫媺杩斿洖锛屾棤瀹為檯鎰忎箟 + @GetMapping(value = "/sample/list") + public Result<IPage<Object>> querySampleList(DryFaultRecord faultRecord, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, HttpServletRequest req) { + Page<Object> page = new Page<Object>(pageNo, pageSize); + page.setRecords(Arrays.asList("")); + page.setSize(1); + page.setCurrent(1); + return Result.OK(page); + } + private void compEqu(IPage<DryEquipment> pageList, Page<MoEquVo> page) { + //褰撳墠绉熸埛id + int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0); + List<MoEquVo> collect = pageList.getRecords().stream().map(item -> { + MoEquVo vo = new MoEquVo(); + BeanUtils.copyProperties(item, vo); + String clientid = "client-" + tenantId + "-" + item.getCode(); + //JSONObject client = (JSONObject) redisUtil.hget(MqttConstant.MQTT_ONLINE_CLIENT ,tenantId); + JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, tenantId), clientid); + //缁勮鐘舵�佹暟鎹� + if (client != null) { + vo.setOnline(true); + //杩炴帴鏃堕棿 + String st = client.getString("connectedAt"); + vo.setUpTime(st); + vo.setClientId(clientid); + } + return vo; + }).collect(Collectors.toList()); + //鎺掑簭 + collect.sort(Comparator.comparing(obj -> obj.getCode(), Comparator.nullsLast(Comparator.naturalOrder()))); + collect.sort(Comparator.comparing(obj -> obj.getOnline(), Comparator.nullsLast(Comparator.naturalOrder()))); + BeanUtils.copyProperties(pageList, page); + page.setRecords(collect); + } - private void comp(IPage<DryEquipment> pageList, Page<MoEquVo> page) { - //褰撳墠绉熸埛id - int tenantId = oConvertUtils.getInt(TenantContext.getTenant(), 0); - List<MoEquVo> collect = pageList.getRecords().stream().map(item -> { - MoEquVo vo = new MoEquVo(); - BeanUtils.copyProperties(item, vo); - String clientid = "client-" + tenantId + "-" + item.getCode(); - //JSONObject client = (JSONObject) redisUtil.hget(MqttConstant.MQTT_ONLINE_CLIENT ,tenantId); - JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,tenantId),clientid); - //缁勮鐘舵�佹暟鎹� - if (client != null) { - vo.setOnline(true); - //杩炴帴鏃堕棿 - String st = client.getString("connectedAt"); - vo.setUpTime(st); - vo.setClientId(clientid); - } - return vo; - }).collect(Collectors.toList()); - //鎺掑簭 - collect.sort(Comparator.comparing(obj -> obj.getCode(), Comparator.nullsLast(Comparator.naturalOrder()))); - collect.sort(Comparator.comparing(obj -> obj.getOnline(), Comparator.nullsLast(Comparator.naturalOrder()))); - BeanUtils.copyProperties(pageList, page); - page.setRecords(collect); - } + /** + * @param startTimeStr + * @param endTimeStr + * @return + */ + private String calculateTimeDifference(String startTimeStr, String endTimeStr) { + // 瀹氫箟鏃堕棿鏍煎紡 + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + // 灏嗗瓧绗︿覆瑙f瀽涓� LocalDateTime 瀵硅薄 + LocalDateTime startTime = LocalDateTime.parse(startTimeStr, formatter); + LocalDateTime endTime = LocalDateTime.parse(endTimeStr, formatter); + // 璁$畻涓や釜鏃堕棿鐐逛箣闂寸殑鎸佺画鏃堕棿 + Duration duration = Duration.between(startTime, endTime); + // 鑾峰彇灏忔椂銆佸垎閽熷拰绉掓暟 + long hours = duration.toHours(); + long minutes = duration.toMinutes() % 60; + long seconds = duration.getSeconds() % 60; + + StringBuilder result = new StringBuilder(); + if (hours > 0) { + result.append(hours).append("鏃�"); + } + if (minutes > 0) { + result.append(minutes).append("鍒�"); + } + if (seconds > 0) { + result.append(seconds).append("绉�"); + } + + return result.toString(); + + } } diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java index 068d415..e835376 100755 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java @@ -85,7 +85,7 @@ JSONObject messageJson = JSONObject.parseObject(message); if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { - JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); + JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); if (client == null) { JSONObject item = new JSONObject(); //username @@ -108,7 +108,7 @@ } catch (Exception e) { e.printStackTrace(); } - redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item); + redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item); System.err.println(String.format("璁惧: %s涓婄嚎", clientid)); } @@ -116,7 +116,7 @@ if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { try { String clientid = messageJson.getString("clientid"); - redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]), clientid); + redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid); System.err.println(String.format("璁惧: %s涓嬬嚎", clientid)); } catch (Exception e) { e.printStackTrace(); @@ -163,7 +163,7 @@ } // 瀹炴椂鏁版嵁涓婁紶澶绻佷笖鏁版嵁鍐呭瓒呰繃瀛楁澶у皬涓嶈褰曟棩蹇� if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) { - // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); + // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); } switch (topic) { @@ -172,7 +172,7 @@ System.err.println("admin鏀跺埌" + topic); // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵�� String clientId = messageJson.getString("clientId"); - JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId); + JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId); ThreadUtil.execute(() -> { @@ -216,13 +216,14 @@ break; //鍚勭鎴蜂笂浼犵殑瀹炴椂鎶ヨ鏁版嵁 - case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: + case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: { + MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { }); //鏁呴殰鏁版嵁 - Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); + Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); //绉熸埛id - String tentId = realFaultMessage.getTentId(); + String tenantId = realFaultMessage.getTentId(); //鏀跺埌绉熸埛瀹炴椂鎶ヨ鏁版嵁瀛樺叆redis //杞崲涓� Map<String, Object> Map<String, Object> objectMap = dryFaultMap.entrySet().stream() @@ -230,21 +231,51 @@ Map.Entry::getKey, entry -> (Object) entry.getValue() )); - redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap); + redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, realFaultMessage.getTentId()), objectMap); //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧 - if(dryFaultMap.isEmpty()){ + if (dryFaultMap.isEmpty()) { return; } - String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId); + String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); //鏁版嵁杞崲 List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); - MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic); + MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); //鍙戦�佸箍鎾� System.err.println("骞挎挱缁欙細" + recTopic); - sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage); + sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); - break; - // 鎺ユ敹璁惧鎶ヨ鏁版嵁 + + } + break; + //绉诲姩绔富鍔ㄨ姹傝澶囧疄鏃舵晠闅滄暟鎹紙鐢ㄤ簬椤甸潰鍒氭墦寮�鏃舵媺鍙栦竴娆℃暟鎹級 + case MqttConstant.MOBILE_REQ_EQU_REAL_FAULT: { + String tenantId = (String) messageJson.get("tenantId"); + if (req.toString().isEmpty() || tenantId == null) { + return; + } + Map<Object, Object> objFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT, tenantId)); + //杞崲涓� Map<String, DryFaultRecordVo> + Map<String, DryFaultRecordVo> dryFaultMap = objFaultMap.entrySet().stream() + .collect(Collectors.toMap( + entry -> entry.getKey().toString(), + entry -> (DryFaultRecordVo) entry.getValue() + )); + + + if (dryFaultMap.isEmpty()) { + return; + } + String resTopic = String.format(MqttConstant.SERVICE_ONECE_TENANT_REAL_FAULT, req); + //鏁版嵁杞崲 + List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); + MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, resTopic); + //鍙戦�佽姹傝澶� + System.err.println("鍙戦�佺粰锛�" + resTopic); + sendMqttMessage(resTopic, mqMessage, 2); + + } + break; + // 鎺ユ敹璁惧鎶ヨ鍘嗗彶鏁版嵁 case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: ThreadUtil.execute(() -> { try { @@ -372,21 +403,29 @@ } - } /** * 鍙戦�佹秷鎭� - * @param topic 璁㈤槄 - * @param mqMessage 娑堟伅浣� + * + * @param topic 璁㈤槄 + * @param mqMessage 娑堟伅浣� + * @param type 1-鍙戦�佺粰绉熸埛 2-鍙戦�佺粰鍥哄畾id */ - private void sendMqttMessage(String topic, MqMessage mqMessage){ + private void sendMqttMessage(String topic, MqMessage mqMessage, Integer type) { ThreadUtil.execute(() -> { try { - MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); - sendMessage.setQos(0); - mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage); - }catch (Exception e){ + if (type == 1) { + MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage); + } else if (type == 2) { + MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); + sendMessage.setQos(0); + mqttUtil.getMqttClient().publish(topic, sendMessage); + } + + } catch (Exception e) { e.printStackTrace(); } }); -- Gitblit v1.9.3