| | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.alibaba.fastjson.TypeReference; |
| | | import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.commons.lang3.StringUtils; |
| | | import org.apache.mina.core.session.IoSession; |
| | |
| | | import org.jeecg.modules.dry.common.CacheConstants; |
| | | import org.jeecg.modules.dry.entity.*; |
| | | import org.jeecg.modules.dry.mqtt.MqMessage; |
| | | import org.jeecg.modules.dry.mqtt.MqttConfig; |
| | | import org.jeecg.modules.dry.mqtt.MqttUtil; |
| | | import org.jeecg.modules.dry.service.*; |
| | | import org.jeecg.modules.dry.socket.ServerHandler; |
| | |
| | | private IDryEquipmentService equipmentService; |
| | | |
| | | @Autowired |
| | | private IDryEqpTypeService dryEqpTypeService; |
| | | |
| | | @Autowired |
| | | private RedisUtil redisUtil; |
| | | |
| | | @Autowired |
| | |
| | | |
| | | @Value(value = "${jeecg.mqtt.role}") |
| | | private String role; |
| | | |
| | | |
| | | @Autowired |
| | | private MqttUtil mqttUtil; |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | @Override |
| | | @Transactional |
| | | public Result<?> realTimeDataHandle(RealTimeDataParentVo realTimeDataParentVo) { |
| | |
| | | if (realTimeDataParentVo.getReport() != null) { |
| | | saveReport(realTimeDataParentVo); |
| | | } |
| | | |
| | | if (realTimeDataParentVo.getFault() != null) { |
| | | fitFaultRecord(realTimeDataParentVo); |
| | | } |
| | | return Result.ok(); |
| | | } |
| | | |
| | |
| | | |
| | | /** |
| | | * 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条 |
| | | * |
| | | * @param realTimeDataVo |
| | | * @return |
| | | */ |
| | | private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) { |
| | | TenantContext.setTenant(realTimeDataVo.getTenantid() +""); |
| | | DryOrderVo orderVo; |
| | | LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder()); |
| | |
| | | // 转换为缓存数据结构 |
| | | orderVo = BeanUtil.toBean(one, DryOrderVo.class); |
| | | if (one.getTemps() != null) { |
| | | Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer,Double>>(){}); |
| | | Map map = JSONObject.parseObject(one.getTemps(), new TypeReference<Map<Integer, Double>>() { |
| | | }); |
| | | orderVo.setBellowsTemp(map); |
| | | } |
| | | // 查询称重记录,添加到缓存数据结构 |
| | |
| | | |
| | | /** |
| | | * 保存新工单 |
| | | * |
| | | * @param realTimeDataVo |
| | | * @return |
| | | */ |
| | | private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { |
| | | TenantContext.setTenant(realTimeDataVo.getTenantid() +""); |
| | | DryOrderVo orderVo; |
| | | |
| | | // 查询设备 |
| | | DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo); |
| | | if (equ == null) { |
| | | log.error("未找到设备:"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",机台:" + realTimeDataVo.getMachineid()); |
| | | log.error("新增设备:"); |
| | | if (realTimeDataVo.getMachineid() == null || realTimeDataVo.getTenantid() == null) { |
| | | log.error("新增设备失败:设备ID或租户ID为空!machineid={}, tenantid={}", |
| | | realTimeDataVo.getMachineid(), realTimeDataVo.getTenantid()); |
| | | return null; |
| | | } |
| | | |
| | | DryEquipment addEqu = new DryEquipment(realTimeDataVo); |
| | | |
| | | try { |
| | | String digits = StringUtils.getDigits(realTimeDataVo.getMachineid()); |
| | | addEqu.setName(Integer.parseInt(digits) + "#干燥设备"); |
| | | } catch (NumberFormatException e) { |
| | | log.error("设备ID格式错误,无法提取数字部分:machineid={}", realTimeDataVo.getMachineid(), e); |
| | | return null; |
| | | } |
| | | |
| | | DryEqpType eqpType = dryEqpTypeService.getOne( |
| | | new LambdaQueryWrapper<DryEqpType>() |
| | | .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid()) |
| | | ); |
| | | if(eqpType == null){ |
| | | log.error("未查询到租户设备类型:{}", realTimeDataVo.getTenantid() ); |
| | | return null; |
| | | } |
| | | |
| | | Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId())); |
| | | |
| | | if (!equipmentService.save(addEqu)) { |
| | | log.error("新增设备失败:数据库保存异常!equipment={}", addEqu); |
| | | return null; |
| | | } |
| | | equ = addEqu; |
| | | |
| | | log.info("新增设备成功:equipmentId={}", addEqu.getId()); |
| | | |
| | | } |
| | | // 查询药材 |
| | | DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo); |
| | |
| | | |
| | | /** |
| | | * 查询设备,新设备则添加到设备主数据 |
| | | * |
| | | * @param realTimeDataVo |
| | | * @return |
| | | */ |
| | |
| | | object.put("tenantId", realTimeDataVo.getTenantid()); |
| | | mqttMessage.setPayload(object.toJSONString().getBytes()); |
| | | try { |
| | | if(mqttEnable){ |
| | | mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX ,mqttMessage); |
| | | } |
| | | }catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | |
| | | |
| | | /** |
| | | * 查询药材,新药材添加到数据库 |
| | | * |
| | | * @param realTimeDataVo |
| | | * @return |
| | | */ |
| | |
| | | if (one == null) { |
| | | one = new DryHerbFormula(realTimeDataVo); |
| | | DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid()); |
| | | if (dryEquipment!=null&&dryEquipment.getType()!=null) { |
| | | one.setEqpType(dryEquipment.getType()); |
| | | } |
| | | |
| | | dryHerbFormulaService.save(one); |
| | | } |
| | | return one; |
| | |
| | | |
| | | /** |
| | | * 保存含水率变化记录 |
| | | * |
| | | * @param trendVo |
| | | * @param orderVo |
| | | */ |
| | |
| | | |
| | | /** |
| | | * 查询机台实时数据 |
| | | * |
| | | * @param realTimeDataVo |
| | | * @return |
| | | */ |
| | |
| | | |
| | | |
| | | //处理结束后,将redis中实时数据发送至云服务器 |
| | | Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); |
| | | Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); |
| | | if(mqttEnable && !toCloudFaultMap.isEmpty()){ |
| | | MqMessage< Map<Object, Object>> message = new MqMessage<>(); |
| | | message.setData(toCloudFaultMap); |
| | |
| | | mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); |
| | | mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | } catch (Exception e) { |
| | |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | //解析存储报警数据 |
| | | List<DryFaultRecord> faultRecords1 = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); |
| | | List<DryFaultRecord> faultRecords2 = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); |
| | | faultRecords1.addAll(faultRecords2); |
| | | |
| | | |
| | | //处理结束后,将redis中实时数据发送至云服务器 |
| | | Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); |
| | | if(mqttEnable && !toCloudFaultMap.isEmpty()){ |
| | | MqMessage< Map<Object, Object>> message = new MqMessage<>(); |
| | | message.setData(toCloudFaultMap); |
| | | message.setTentId(vo.getTenantid()+""); |
| | | MqttMessage mqttMessage = new MqttMessage(); |
| | | mqttMessage.setQos(0); |
| | | mqttMessage.setPayload(JSON.toJSONString(message).getBytes()); |
| | | mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage); |
| | | List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); |
| | | List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); |
| | | if(!errorList.isEmpty()){ |
| | | log.error("保存故障:{}", errorList.toString()); |
| | | } |
| | | if(!warnList.isEmpty()){ |
| | | log.error("保存告警:{}", warnList.toString()); |
| | | } |
| | | |
| | | //以下为云服务处理故障,厂内本地服务无需处理 |
| | | if(!mqttEnable)return; |
| | | |
| | | //要保存的历史故障 |
| | | if(!faultRecords1.isEmpty()){ |
| | | MqMessage<List<DryFaultRecord>> message = new MqMessage<>(); |
| | | message.setData(faultRecords1); |
| | | message.setTentId(vo.getTenantid()+""); |
| | | MqttMessage mqttMessage = new MqttMessage(); |
| | | mqttMessage.setQos(0); |
| | | mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); |
| | | mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); |
| | | |
| | | |
| | | //处理结束后,将redis中实时数据发送至云服务器 key = tenantId + machineId + eqpFault |
| | | Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); |
| | | |
| | | |
| | | Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream() |
| | | .collect(Collectors.toMap( |
| | | entry -> entry.getKey().toString(), |
| | | entry -> (DryFaultRecordVo)entry.getValue() |
| | | )); |
| | | |
| | | String tenantId = vo.getTenantid() +""; |
| | | |
| | | //广播发送给各租户下移动设备 |
| | | if (dryFaultMap.isEmpty()) { |
| | | return; |
| | | } |
| | | |
| | | |
| | | |
| | | String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); |
| | | //数据转换 |
| | | List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values())); |
| | | MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); |
| | | //发送广播 |
| | | log.error("广播给:{}" , recTopic); |
| | | mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); |
| | | |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | |
| | | /** |
| | | * 解析存储故障数据 |
| | | * TODO 保证原子性 |
| | | * |
| | | * @param fault 故障数据 |
| | | * @param orderId 工单 |
| | | * @param tenantId 租户 |
| | |
| | | //数据样本:"eqp_fault": "滚筒降超时-报警,风机过流报警,滚筒升超时-报警,风箱升报警", |
| | | System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime()+"--"+fault); |
| | | //redis中的故障 |
| | | Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); |
| | | Map<Object, Object> rFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); |
| | | Map<String, Object> redFauMap = rFauMap.entrySet().stream() |
| | | .collect(Collectors.toMap( |
| | | entry -> entry.getKey().toString(), // 键转换为字符串 |
| | |
| | | |
| | | |
| | | realFauMap.put(redisKey, new DryFaultRecord()); |
| | | DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,redisKey); |
| | | DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey); |
| | | //1.2如果redis不存在则存入(存故障开始) |
| | | if(rFault ==null){ |
| | | //组装缓存数据 |
| | |
| | | DryFaultRecordVo vo = new DryFaultRecordVo(orderId,tenantId,eqpFault,faultType,new Date(),null,1,equipmentMap.get(machineId).getName(),tenantName); |
| | | addFauMap.put(redisKey,vo); |
| | | }else { |
| | | //TODO 特殊情况,如果redis的故障和新 |
| | | |
| | | |
| | | //如果数据已存在,且计数大于1就重置计数(计数3次后判定故障结束,3次之前重新上报故障说明故障还在持续 需要重新计数) |
| | | if(rFault.getECount()!=null && rFault.getECount() > 1){ |
| | | rFault.setECount(1); |
| | | redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,redisKey,rFault); |
| | | redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey, rFault); |
| | | System.err.println("报警次数重置 clear clear ,key-"+redisKey); |
| | | } |
| | | |
| | |
| | | addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value)); |
| | | //没有新故障数据不用覆盖 |
| | | if(!addFauMap.isEmpty()){ |
| | | redisUtil.hmset(MqttConstant.MQTT_REAL_FAULT,redFauMap); |
| | | redisUtil.hmset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redFauMap); |
| | | } |
| | | |
| | | //2检测已结束的故障 |
| | | //2.1如果实时数据不存在redis存在则代表故障结束,存入数据库 |
| | | Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); |
| | | Map<Object, Object> curFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); |
| | | curFauMap.keySet().stream() |
| | | //特别注意,多个报警类型共用方法需要区分类型 |
| | | .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo)curFauMap.get(key)).getFaultType() == faultType) |
| | | .forEach(key -> { |
| | | DryFaultRecordVo vo = (DryFaultRecordVo)redFauMap.get(key); |
| | | vo.setECount(vo.getECount()+1); |
| | | if(redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,key.toString())!=null){ |
| | | if (redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString()) != null) { |
| | | //更新次数 |
| | | redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,key.toString(),vo); |
| | | redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString(), vo); |
| | | System.err.println("报警次数更新,key-"+key); |
| | | } |
| | | |
| | |
| | | vo.setEndTime(new Date()); |
| | | //TODO 结束超过某个时间区间判定为错误数据 |
| | | faultRecordService.save(vo); |
| | | redisUtil.hdel(MqttConstant.MQTT_REAL_FAULT,key); |
| | | redisUtil.hdel(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key); |
| | | result.add(vo); |
| | | System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime()+"存入数据库"); |
| | | } |
| | | }); |
| | | |
| | | |
| | | |
| | | return result; |
| | | } |
| | | |
| | | |
| | | |
| | | } |