| | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.alibaba.fastjson.TypeReference; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.poi.ss.formula.functions.T; |
| | | import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; |
| | | import org.eclipse.paho.client.mqttv3.MqttCallback; |
| | | import org.eclipse.paho.client.mqttv3.MqttMessage; |
| | |
| | | import org.jeecg.modules.dry.entity.DryShop; |
| | | import org.jeecg.modules.dry.service.*; |
| | | import org.jeecg.modules.dry.vo.DryEquipmentVo; |
| | | import org.jeecg.modules.dry.vo.DryFaultRecordVo; |
| | | import org.jeecg.modules.dry.vo.RealTimeDataVo; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.context.annotation.Scope; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.stream.Collectors; |
| | | |
| | | @Slf4j |
| | | @Component |
| | | @Scope("prototype") |
| | | public class MqttSampleCallback implements MqttCallback { |
| | | @Value(value = "${jeecg.mqtt.role}") |
| | | private String role; |
| | | @Autowired |
| | | private MqttUtil mqttUtil; |
| | | @Autowired |
| | | private EmqxApi emqxApi; |
| | | @Autowired |
| | | private BaseCommonService baseCommonService; |
| | | @Autowired |
| | | private RedisUtil redisUtil; |
| | | @Value(value = "${jeecg.mqtt.role}") |
| | | private String role; |
| | | @Autowired |
| | | private MqttUtil mqttUtil; |
| | | @Autowired |
| | | private EmqxApi emqxApi; |
| | | @Autowired |
| | | private BaseCommonService baseCommonService; |
| | | @Autowired |
| | | private RedisUtil redisUtil; |
| | | |
| | | @Autowired |
| | | private IDryRealTimeDataService realTimeDataService; |
| | | @Autowired |
| | | private IDryRealTimeDataService realTimeDataService; |
| | | |
| | | |
| | | @Autowired |
| | | private IDryEquipmentService equipmentService; |
| | | @Autowired |
| | | private IDryEquipmentService equipmentService; |
| | | |
| | | @Autowired |
| | | private IDryEqpTypeService eqpTypeService; |
| | | @Autowired |
| | | private IDryEqpTypeService eqpTypeService; |
| | | |
| | | @Autowired |
| | | private IDryShopService dryShopService; |
| | | @Autowired |
| | | private IDryShopService dryShopService; |
| | | |
| | | @Autowired |
| | | private IDryFaultRecordService faultRecordService; |
| | | @Autowired |
| | | private IDryFaultRecordService faultRecordService; |
| | | |
| | | |
| | | @Override |
| | | public void connectionLost(Throwable throwable) { |
| | | System.err.println("连接断开::掉线"); |
| | | System.err.println("连接断开::" + throwable.toString()); |
| | | } |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage mqttMessage) { |
| | | System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:" |
| | | + new String(mqttMessage.getPayload())); |
| | | |
| | | switch (role) { |
| | | // 管理员 |
| | | case "admin": |
| | | String message = new String(mqttMessage.getPayload()); |
| | | JSONObject messageJson = JSONObject.parseObject(message); |
| | | |
| | | if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { |
| | | JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT,messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid")); |
| | | if (client == null) { |
| | | JSONObject item = new JSONObject(); |
| | | //username |
| | | item.put("username", messageJson.get("username")); |
| | | //连接时间 |
| | | Long st = messageJson.getLong("connected_at"); |
| | | String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get()); |
| | | item.put("connectedAt", upTime); |
| | | //clientid |
| | | String clientid = messageJson.getString("clientid"); |
| | | item.put("clientid", clientid); |
| | | //是否连接 |
| | | item.put("connected", true); |
| | | //根据clientid解析(注意配置文件中clientid格式 例:client-1000) |
| | | try { |
| | | String[] info = clientid.split("-"); |
| | | item.put("type", info[0]); |
| | | item.put("tenantId", info[1]); |
| | | //item.put("code", info[2]); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT,item.get("tenantId") ), clientid, item); |
| | | System.err.println(String.format("设备: %s上线", clientid)); |
| | | } |
| | | |
| | | } |
| | | if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { |
| | | try { |
| | | String clientid = messageJson.getString("clientid"); |
| | | redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientid.split("-")[1]), clientid); |
| | | System.err.println(String.format("设备: %s下线", clientid)); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | |
| | | } |
| | | parseAdminCommand(topic, mqttMessage); |
| | | |
| | | break; |
| | | // 普通用户 |
| | | case "user": |
| | | System.err.println("user"); |
| | | try { |
| | | parseUserCommand(topic, mqttMessage); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | |
| | | break; |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | @Override |
| | | public void connectionLost(Throwable throwable) { |
| | | System.err.println("连接断开::掉线"); |
| | | System.err.println("连接断开::"+throwable.toString()); |
| | | } |
| | | @Override |
| | | public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { |
| | | System.err.println("消息传递成功"); |
| | | } |
| | | |
| | | @Override |
| | | public void messageArrived(String topic, MqttMessage mqttMessage) { |
| | | System.out.println("收到消息: \n topic:" + topic + "\n Qos:" + mqttMessage.getQos() + "\n payload:" |
| | | + new String(mqttMessage.getPayload())); |
| | | |
| | | switch (role) { |
| | | // 管理员 |
| | | case "admin": |
| | | // 解析admin角色指令 |
| | | private void parseAdminCommand(String topic, MqttMessage mqttMessage) { |
| | | String message = new String(mqttMessage.getPayload()); |
| | | JSONObject messageJson = JSONObject.parseObject(message); |
| | | |
| | | if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) { |
| | | JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + messageJson.get("clientid")); |
| | | if (client == null) { |
| | | JSONObject item = new JSONObject(); |
| | | //username |
| | | item.put("username", messageJson.get("username")); |
| | | //连接时间 |
| | | Long st = messageJson.getLong("connected_at"); |
| | | String upTime = DateUtils.timestamptoStr(st, DateUtils.datetimeFormat.get()); |
| | | item.put("connectedAt", upTime); |
| | | //clientid |
| | | String clientid = messageJson.getString("clientid"); |
| | | item.put("clientid", clientid); |
| | | //是否连接 |
| | | item.put("connected", true); |
| | | //根据clientid解析(注意配置文件中clientid格式 例:client-1000) |
| | | //请求的客户端(服务端只推送数据到请求的客户端) |
| | | StringBuilder req = new StringBuilder(); |
| | | if (messageJson.containsKey("req")) { |
| | | req.append(messageJson.get("req")); |
| | | } |
| | | //前端传参时间戳转换 |
| | | if (messageJson.containsKey("timestamp")) { |
| | | messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); |
| | | } |
| | | // 实时数据上传太频繁且数据内容超过字段大小不记录日志 |
| | | if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)) { |
| | | // baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); |
| | | } |
| | | |
| | | switch (topic) { |
| | | // 查询设备在线 |
| | | case MqttConstant.MOBILE_QUERY_EQU_STATU: |
| | | System.err.println("admin收到" + topic); |
| | | // 根据设备id查询设备mqtt在线状态 |
| | | String clientId = messageJson.getString("clientId"); |
| | | JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT,clientId.split("-")[1]) , clientId); |
| | | |
| | | ThreadUtil.execute(() -> { |
| | | |
| | | if (client == null || client.isEmpty()) { |
| | | JSONObject res = new JSONObject(); |
| | | res.put("success", false); |
| | | res.put("msg", "查询失败"); |
| | | try { |
| | | MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | return; |
| | | } |
| | | |
| | | client.put("success", true); |
| | | client.put("msg", "查询成功"); |
| | | try { |
| | | MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); |
| | | baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | break; |
| | | |
| | | // 接收设备实时数据 |
| | | case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); |
| | | realTimeDataService.realTimeDataHandle(vo); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | |
| | | break; |
| | | //各租户上传的实时报警数据 |
| | | case MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA: |
| | | MqMessage<Map<String, DryFaultRecordVo>> realFaultMessage = JSON.parseObject(message, new TypeReference<MqMessage<Map<String, DryFaultRecordVo>>>() { |
| | | }); |
| | | //故障数据 |
| | | Map<String, DryFaultRecordVo> dryFaultMap = realFaultMessage.getData(); |
| | | //租户id |
| | | String tentId = realFaultMessage.getTentId(); |
| | | //收到租户实时报警数据存入redis |
| | | //转换为 Map<String, Object> |
| | | Map<String, Object> objectMap = dryFaultMap.entrySet().stream() |
| | | .collect(Collectors.toMap( |
| | | Map.Entry::getKey, |
| | | entry -> (Object) entry.getValue() |
| | | )); |
| | | redisUtil.hmset(String.format(MqttConstant.MQTT_CLOUD_REAL_FAULT,realFaultMessage.getTentId()), objectMap); |
| | | //广播发送给各租户下移动设备 |
| | | if(dryFaultMap.isEmpty()){ |
| | | return; |
| | | } |
| | | String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tentId); |
| | | //数据转换 |
| | | List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); |
| | | MqMessage< List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList,tentId,recTopic); |
| | | //发送广播 |
| | | System.err.println("广播给:" + recTopic); |
| | | sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT,mqMessage); |
| | | |
| | | break; |
| | | // 接收设备报警数据 |
| | | case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | MqMessage<List<DryFaultRecord>> faultMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() { |
| | | }); |
| | | // List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); |
| | | System.err.println(faultMessage.toString()); |
| | | faultRecordService.saveBatch(faultMessage.getData()); |
| | | |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | |
| | | break; |
| | | |
| | | |
| | | case MqttConstant.TENANT_UP_PREFIX_EQU: |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | Object equObj = messageJson.get("equipment"); |
| | | DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class); |
| | | TenantContext.setTenant(equipment.getTenantId() + ""); |
| | | DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); |
| | | if (dryEquipment == null) { |
| | | equipmentService.save(equipment); |
| | | } |
| | | Object typeObj = messageJson.get("eqpType"); |
| | | DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class); |
| | | DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId()); |
| | | if (dryEqpType == null) { |
| | | eqpTypeService.save(eqpType); |
| | | } |
| | | // 获取设备所属车间 |
| | | Object shopObj = messageJson.get("shop"); |
| | | DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class); |
| | | DryShop dryShop = dryShopService.getById(shop.getId()); |
| | | if (dryShop == null) { |
| | | dryShopService.save(shop); |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | // 解析user角色指令 |
| | | private void parseUserCommand(String topic, MqttMessage mqttMessage) { |
| | | |
| | | String message = new String(mqttMessage.getPayload()); |
| | | JSONObject messageJson = JSONObject.parseObject(message); |
| | | |
| | | //请求的客户端(服务端只推送数据到请求的客户端) |
| | | StringBuilder req = new StringBuilder(); |
| | | if (messageJson.containsKey("req")) { |
| | | req.append(messageJson.get("req")); |
| | | } |
| | | //前端传参时间戳转换 |
| | | if (messageJson.containsKey("timestamp")) { |
| | | messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); |
| | | } |
| | | |
| | | switch (topic) { |
| | | case MqttConstant.MOBILE_REQ_EQU_CMD: |
| | | System.err.println("user收到" + topic); |
| | | System.err.println(message); |
| | | ThreadUtil.execute(() -> { |
| | | //TODO 向PLC发送开关机操作,并返回信息 |
| | | JSONObject res = new JSONObject(); |
| | | res.put("success", true); |
| | | res.put("msg", "操作成功"); |
| | | try { |
| | | MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage); |
| | | baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | |
| | | }); |
| | | |
| | | |
| | | break; |
| | | case MqttConstant.SERVICE_REQ_PREFIX: |
| | | log.debug("收到设备详细信息查询请求"); |
| | | ThreadUtil.execute(() -> { |
| | | String tenantId = messageJson.getString("tenantId"); |
| | | String clientId = mqttUtil.getMqttClient().getClientId(); |
| | | String tenant = clientId.substring(clientId.lastIndexOf("_") + 1); |
| | | if (tenantId != null && tenantId.equals(tenant)) { |
| | | TenantContext.setTenant(tenantId); |
| | | // 根据设备编码查询设备信息 |
| | | String code = messageJson.getString("code"); |
| | | DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId, code); |
| | | // 根据设备车间id查询车间信息 |
| | | DryShop shop = dryShopService.getById(equipmentVo.getShopId()); |
| | | // 根据设备类型ID查询设备类型信息 |
| | | DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType()); |
| | | |
| | | JSONObject res = new JSONObject(); |
| | | |
| | | res.put("tenant", tenantId); |
| | | res.put("equipment", equipmentVo); |
| | | res.put("shop", shop); |
| | | res.put("eqpType", eqpType); |
| | | try { |
| | | MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); |
| | | // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | }); |
| | | |
| | | |
| | | break; |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 发送消息 |
| | | * @param topic 订阅 |
| | | * @param mqMessage 消息体 |
| | | */ |
| | | private void sendMqttMessage(String topic, MqMessage mqMessage){ |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | String[] info = clientid.split("-"); |
| | | item.put("type", info[0]); |
| | | item.put("tenantId", info[1]); |
| | | //item.put("code", info[2]); |
| | | MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(mqMessage).getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(String.format(topic, mqMessage.getTentId()), sendMessage); |
| | | }catch (Exception e){ |
| | | e.printStackTrace(); |
| | | e.printStackTrace(); |
| | | } |
| | | redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); |
| | | System.err.println(String.format("设备: %s上线", clientid)); |
| | | } |
| | | |
| | | } |
| | | if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) { |
| | | try { |
| | | String clientid = messageJson.getString("clientid"); |
| | | redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT + clientid); |
| | | System.err.println(String.format("设备: %s下线", clientid)); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | |
| | | } |
| | | parseAdminCommand(topic, mqttMessage); |
| | | |
| | | break; |
| | | // 普通用户 |
| | | case "user": |
| | | System.err.println("user"); |
| | | try { |
| | | parseUserCommand(topic, mqttMessage); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | |
| | | break; |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | @Override |
| | | public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { |
| | | System.err.println("消息传递成功"); |
| | | } |
| | | |
| | | // 解析admin角色指令 |
| | | private void parseAdminCommand(String topic, MqttMessage mqttMessage) { |
| | | String message = new String(mqttMessage.getPayload()); |
| | | JSONObject messageJson = JSONObject.parseObject(message); |
| | | |
| | | //请求的客户端(服务端只推送数据到请求的客户端) |
| | | StringBuilder req = new StringBuilder(); |
| | | if (messageJson.containsKey("req")) { |
| | | req.append(messageJson.get("req")); |
| | | } |
| | | //前端传参时间戳转换 |
| | | if (messageJson.containsKey("timestamp")) { |
| | | messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); |
| | | } |
| | | // 实时数据上传太频繁且数据内容超过字段大小不记录日志 |
| | | if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)){ |
| | | baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); |
| | | } |
| | | |
| | | switch (topic) { |
| | | // 查询设备在线 |
| | | case MqttConstant.MOBILE_QUERY_EQU_STATU: |
| | | System.err.println("admin收到" + topic); |
| | | // 根据设备id查询设备mqtt在线状态 |
| | | String clientId = messageJson.getString("clientId"); |
| | | JSONObject client = (JSONObject) redisUtil.get(MqttConstant.MQTT_ONLINE_CLIENT + clientId); |
| | | |
| | | ThreadUtil.execute(() -> { |
| | | |
| | | if (client == null || client.isEmpty()) { |
| | | JSONObject res = new JSONObject(); |
| | | res.put("success", false); |
| | | res.put("msg", "查询失败"); |
| | | try { |
| | | MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | return; |
| | | } |
| | | |
| | | client.put("success", true); |
| | | client.put("msg", "查询成功"); |
| | | try { |
| | | MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage); |
| | | baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | break; |
| | | |
| | | // 接收设备实时数据 |
| | | case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA: |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | RealTimeDataVo vo = JSON.parseObject(message, RealTimeDataVo.class); |
| | | realTimeDataService.realTimeDataHandle(vo); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | |
| | | break; |
| | | // 接收设备报警数据 |
| | | case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | MqMessage<List<DryFaultRecord>> listMqMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() { |
| | | }); |
| | | // List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); |
| | | System.err.println(listMqMessage.toString()); |
| | | faultRecordService.saveBatch(listMqMessage.getData()); |
| | | |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | |
| | | break; |
| | | |
| | | |
| | | case MqttConstant.TENANT_UP_PREFIX_EQU: |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | Object equObj = messageJson.get("equipment"); |
| | | DryEquipment equipment = JSON.parseObject(equObj.toString(), DryEquipment.class); |
| | | TenantContext.setTenant(equipment.getTenantId()+""); |
| | | DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); |
| | | if (dryEquipment == null) { |
| | | equipmentService.save(equipment); |
| | | } |
| | | Object typeObj = messageJson.get("eqpType"); |
| | | DryEqpType eqpType = JSON.parseObject(typeObj.toString(), DryEqpType.class); |
| | | DryEqpType dryEqpType = eqpTypeService.getById(eqpType.getId()); |
| | | if (dryEqpType == null) { |
| | | eqpTypeService.save(eqpType); |
| | | } |
| | | // 获取设备所属车间 |
| | | Object shopObj = messageJson.get("shop"); |
| | | DryShop shop = JSON.parseObject(shopObj.toString(), DryShop.class); |
| | | DryShop dryShop = dryShopService.getById(shop.getId()); |
| | | if (dryShop == null) { |
| | | dryShopService.save(shop); |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | // 解析user角色指令 |
| | | private void parseUserCommand(String topic, MqttMessage mqttMessage) { |
| | | |
| | | String message = new String(mqttMessage.getPayload()); |
| | | JSONObject messageJson = JSONObject.parseObject(message); |
| | | |
| | | //请求的客户端(服务端只推送数据到请求的客户端) |
| | | StringBuilder req = new StringBuilder(); |
| | | if (messageJson.containsKey("req")) { |
| | | req.append(messageJson.get("req")); |
| | | } |
| | | //前端传参时间戳转换 |
| | | if (messageJson.containsKey("timestamp")) { |
| | | messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); |
| | | } |
| | | |
| | | switch (topic) { |
| | | case MqttConstant.MOBILE_REQ_EQU_CMD: |
| | | System.err.println("user收到" + topic); |
| | | System.err.println(message); |
| | | ThreadUtil.execute(() -> { |
| | | //TODO 向PLC发送开关机操作,并返回信息 |
| | | JSONObject res = new JSONObject(); |
| | | res.put("success", true); |
| | | res.put("msg", "操作成功"); |
| | | try { |
| | | MqttMessage sendMessage = new MqttMessage(JSONObject.toJSONString(res).getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_CMD, req), sendMessage); |
| | | baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | |
| | | }); |
| | | |
| | | |
| | | break; |
| | | case MqttConstant.SERVICE_REQ_PREFIX: |
| | | log.debug("收到设备详细信息查询请求"); |
| | | ThreadUtil.execute(() -> { |
| | | String tenantId = messageJson.getString("tenantId"); |
| | | String clientId = mqttUtil.getMqttClient().getClientId(); |
| | | String tenant = clientId.substring(clientId.lastIndexOf("_")+1); |
| | | if (tenantId!=null && tenantId.equals(tenant)) { |
| | | TenantContext.setTenant(tenantId); |
| | | // 根据设备编码查询设备信息 |
| | | String code = messageJson.getString("code"); |
| | | DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(tenantId,code); |
| | | // 根据设备车间id查询车间信息 |
| | | DryShop shop = dryShopService.getById(equipmentVo.getShopId()); |
| | | // 根据设备类型ID查询设备类型信息 |
| | | DryEqpType eqpType = eqpTypeService.getById(equipmentVo.getType()); |
| | | |
| | | JSONObject res = new JSONObject(); |
| | | |
| | | res.put("tenant", tenantId); |
| | | res.put("equipment", equipmentVo); |
| | | res.put("shop", shop); |
| | | res.put("eqpType", eqpType); |
| | | try { |
| | | MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); |
| | | sendMessage.setQos(0); |
| | | mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); |
| | | // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | }); |
| | | |
| | | |
| | | |
| | | break; |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | } |