package org.jeecg.modules.dry.service.impl; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.mina.core.session.IoSession; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecg.common.api.vo.Result; import org.jeecg.common.config.TenantContext; import org.jeecg.common.constant.CommonCacheConstant; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.system.util.JwtUtil; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.common.util.SpringContextUtils; import org.jeecg.modules.dry.common.CacheConstants; import org.jeecg.modules.dry.entity.*; import org.jeecg.modules.dry.mqtt.MqMessage; import org.jeecg.modules.dry.mqtt.MqttUtil; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.socket.ServerHandler; import org.jeecg.modules.dry.vo.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.text.DecimalFormat; import java.util.*; import java.util.stream.Collectors; @Slf4j @Service public class DryRealTimeDataServiceImpl implements IDryRealTimeDataService { @Autowired private IDryOrderService dryOrderService; @Autowired private IDryOrderTrendService dryOrderTrendService; @Autowired private IDryHerbService herbService; @Autowired private IDryHerbFormulaService dryHerbFormulaService; @Autowired private IDryEquipmentService equipmentService; @Autowired private RedisUtil redisUtil; @Autowired private IDryProdRecordService prodRecordService; @Autowired private IDryFaultRecordService faultRecordService; private String token; @Value(value = "${jeecg.mqtt.role}") private String role; @Autowired private MqttUtil mqttUtil; @Value(value = "${jeecg.mqtt.enable}") private boolean mqttEnable; public String getTemporaryToken() { if (token == null) { RedisUtil redisUtil = SpringContextUtils.getBean(RedisUtil.class); // 模拟登录生成Token token = JwtUtil.sign("admin", "b668043e3ea4bc2d"); // 设置Token缓存有效时间为 5 分钟 redisUtil.set(CommonConstant.PREFIX_USER_TOKEN + token, token); redisUtil.expire(CommonConstant.PREFIX_USER_TOKEN + token, 5 * 60 * 1000); } return token; } @Override @Transactional public Result realTimeDataHandle(RealTimeDataVo realTimeDataVo) { TenantContext.setTenant(realTimeDataVo.getTenantid()+""); log.info("实时数据:"+realTimeDataVo.toString()); // 1 查询或创建工单 // 1.1 从redis取出工单缓存 DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid()); // 1.2 如果有缓存记录 if(orderVo != null && orderVo.getCode().equals(realTimeDataVo.getWorkorder())) { // 1.3 没有缓存记录再查询数据库 } else { // 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条再返回 orderVo = getOrSaveDryOrderVoDB(realTimeDataVo); } if (orderVo == null) { log.error("工单不存在,工单号:"+realTimeDataVo.getWorkorder()+",设备:" + realTimeDataVo.getMachineid() +",药材:" + realTimeDataVo.getName()); return Result.error("工单不存在"); } // 2 更新工单实时数据 // 2.1 将工单中的数据替换为最新数据 realTimeDataVo.setOrderId(orderVo.getId()); orderVo.setInitial(realTimeDataVo.getMoisture1()); orderVo.setDryTime(realTimeDataVo.getTime3()); orderVo.setDelay(realTimeDataVo.getDelay()); orderVo.setTurn(realTimeDataVo.getTurntime()); orderVo.setYield(realTimeDataVo.getWeight3()); orderVo.setStart(realTimeDataVo.getStart()); orderVo.setAuto(realTimeDataVo.getAuto()); orderVo.setPlcdisable(realTimeDataVo.getPlcdisable()); orderVo.setLowalarm(realTimeDataVo.getLowalarm()); orderVo.setWind(realTimeDataVo.getWind()); orderVo.setOriginWeight(realTimeDataVo.getWeight2()); orderVo.setWatt(realTimeDataVo.getWatt()); orderVo.setSteam(realTimeDataVo.getSteam()); orderVo.setEnvHum(realTimeDataVo.getEnvhum()); orderVo.setEnvTemp(realTimeDataVo.getEnvtemp()); orderVo.setRemain(realTimeDataVo.getAi_total_time()); orderVo.setCurRemain(realTimeDataVo.getAi_time()); orderVo.setOrderStatus(realTimeDataVo.getWorkorder_status()); orderVo.setEqp_status(realTimeDataVo.getEqp_status()); // orderVo.setEqp_state(realTimeDataVo.getEqp_state()); orderVo.setWarning(realTimeDataVo.getEqp_warning()); orderVo.setFault(realTimeDataVo.getEqp_fault()); orderVo.setLevel(realTimeDataVo.getLevel()); DryOrderTrendVo trendVo = new DryOrderTrendVo(realTimeDataVo); // 2.2 保存工单含水率变化 或 重量变化 saveOrderTrendVo(trendVo, orderVo); orderVo.setTrendVo(trendVo); orderVo.getBellowsTemp().put(realTimeDataVo.getTime3(), realTimeDataVo.getTemp2()); // 2.3 更新到redis缓存 redisUtil.hset(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid(),orderVo, 60*60); return Result.ok(); } @Override @Transactional public Result realTimeDataHandle(RealTimeDataParentVo realTimeDataParentVo) { TenantContext.setTenant(realTimeDataParentVo.getTenantid()+""); log.info("实时数据:"+realTimeDataParentVo.toString()); if (realTimeDataParentVo.getRealTime() != null) { RealTimeDataVo realTimeDataVo = realTimeDataParentVo.getRealTime(); // 1 查询或创建工单 // 1.1 从redis取出工单缓存 DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataParentVo.getTenantid()+"_"+realTimeDataParentVo.getMachineid()); // 1.2 如果有缓存记录 if(orderVo != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) { // 1.3 没有缓存记录再查询数据库 } else { // 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条再返回 orderVo = getOrSaveDryOrderVoDB(realTimeDataVo); } if (orderVo == null) { log.error("工单不存在,工单号:"+realTimeDataParentVo.getWorkorder()+",设备:" + realTimeDataParentVo.getMachineid() +",药材:" + realTimeDataVo.getName()); return Result.error("工单不存在"); } // 2 更新工单实时数据 // 2.1 将工单中的数据替换为最新数据 realTimeDataVo.setOrderId(orderVo.getId()); orderVo.setInitial(realTimeDataVo.getMoisture1()); orderVo.setDryTime(realTimeDataVo.getTime3()); orderVo.setDelay(realTimeDataVo.getDelay()); orderVo.setTurn(realTimeDataVo.getTurntime()); orderVo.setYield(realTimeDataVo.getWeight3()); orderVo.setStart(realTimeDataVo.getStart()); orderVo.setAuto(realTimeDataVo.getAuto()); orderVo.setPlcdisable(realTimeDataVo.getPlcdisable()); orderVo.setLowalarm(realTimeDataVo.getLowalarm()); orderVo.setWind(realTimeDataVo.getWind()); orderVo.setOriginWeight(realTimeDataVo.getWeight2()); orderVo.setWatt(realTimeDataVo.getWatt()); orderVo.setSteam(realTimeDataVo.getSteam()); orderVo.setEnvHum(realTimeDataVo.getEnvhum()); orderVo.setEnvTemp(realTimeDataVo.getEnvtemp()); orderVo.setRemain(realTimeDataVo.getAi_total_time()); orderVo.setCurRemain(realTimeDataVo.getAi_time()); orderVo.setOrderStatus(realTimeDataVo.getWorkorder_status()); orderVo.setEqp_status(realTimeDataVo.getEqp_status()); // orderVo.setEqp_state(realTimeDataVo.getEqp_state()); orderVo.setWarning(realTimeDataVo.getEqp_warning()); orderVo.setFault(realTimeDataVo.getEqp_fault()); orderVo.setLevel(realTimeDataVo.getLevel()); DryOrderTrendVo trendVo = new DryOrderTrendVo(realTimeDataVo); // 2.2 保存工单含水率变化 或 重量变化 saveOrderTrendVo(trendVo, orderVo); orderVo.setTrendVo(trendVo); orderVo.getBellowsTemp().put(realTimeDataVo.getTime3(), realTimeDataVo.getTemp2()); // 2.3 更新到redis缓存 redisUtil.hset(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid()+"_"+realTimeDataVo.getMachineid(),orderVo, 60*60); } if (realTimeDataParentVo.getReport() != null) { saveReport(realTimeDataParentVo); } return Result.ok(); } private void saveReport(RealTimeDataParentVo realTimeDataParentVo) { RealTimeReportVo report = realTimeDataParentVo.getReport(); if(report.getReport_flag()) { DryProdRecord prodRecord = new DryProdRecord(); prodRecord.setReportHeadName(report.getReport_head_name()); prodRecord.setReportHeadBatch(report.getReport_head_batch()); prodRecord.setReportHeadNum(report.getReport_head_num()); prodRecord.setReportHeadMachine(report.getReport_head_machine()); prodRecord.setReportHeadAccepter(report.getReport_head_accepter()); prodRecord.setReportHeadDate(report.getReport_head_date()); prodRecord.setReportHeadLeader(report.getReport_head_leader()); prodRecord.setReportHeadTecher(report.getReport_head_techer()); prodRecord.setReportCheckField(report.getReport_check_field()?1:0); prodRecord.setReportCheckFile(report.getReport_check_file()?1:0); prodRecord.setReportCheckTag(report.getReport_check_tag()?1:0); prodRecord.setReportCheckTool(report.getReport_check_tool()?1:0); prodRecord.setReportCheckMan(report.getReport_check_man()); prodRecord.setReportCheckStatus(report.getReport_check_status()?1:0); prodRecord.setReportCheckQa(report.getReport_check_qa()); prodRecord.setReportCheckRecord(report.getReport_check_record()); prodRecord.setReportProductView(report.getReport_product_view()?1:0); prodRecord.setReportProductWind(report.getReport_product_wind()?1:0); prodRecord.setReportProductSun(report.getReport_product_sun()?1:0); prodRecord.setReportProductLowDry(report.getReport_product_low_dry()?1:0); prodRecord.setReportProductDry(report.getReport_product_dry()?1:0); prodRecord.setReportProductStart(report.getReport_product_start()); prodRecord.setReportProductEnd(report.getReport_product_end()); prodRecord.setReportProductTotal(report.getReport_product_total()); prodRecord.setReportProductCheck(report.getReport_product_check()?1:0); prodRecord.setReportProductMan1(report.getReport_product_man1()); prodRecord.setReportProductMan2(report.getReport_product_man2()); prodRecord.setReportProductWeight(report.getReport_product_weight()); prodRecord.setReportProductWaste(report.getReport_product_waste()); prodRecord.setReportProductUse(report.getReport_product_use()); prodRecord.setReportProductQa(report.getReport_product_qa()); prodRecord.setReportCleanMachine(report.getReport_clean_machine()?1:0); prodRecord.setReportCleanWaste(report.getReport_clean_waste()?1:0); prodRecord.setReportCleanTool(report.getReport_clean_tool()?1:0); prodRecord.setReportCleanDoor(report.getReport_clean_door()?1:0); prodRecord.setReportCleanBox(report.getReport_clean_box()?1:0); prodRecord.setReportCleanRecord(report.getReport_clean_record()?1:0); prodRecord.setReportCleanDate(report.getReport_clean_date()); prodRecord.setReportCleanMan(report.getReport_clean_man()); prodRecord.setReportCleanConfirm(report.getReport_clean_confirm()?1:0); prodRecord.setReportCleanQa(report.getReport_clean_qa()); prodRecordService.save(prodRecord); } } /** * 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条 * @param realTimeDataVo * @return */ private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) { DryOrderVo orderVo; LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder()); queryWrapper.eq(DryOrder::getTenantId, realTimeDataVo.getTenantid()); // 1 查询数据库 DryOrder one = dryOrderService.getOne(queryWrapper); // 2 数据库有记录,更新到缓存 if (one != null) { // 转换为缓存数据结构 orderVo = BeanUtil.toBean(one, DryOrderVo.class); if (one.getTemps() != null) { Map map = JSONObject.parseObject(one.getTemps(), new TypeReference>(){}); orderVo.setBellowsTemp(map); } // 查询称重记录,添加到缓存数据结构 List trendVos = dryOrderTrendService.listByOrderId(one.getId()); if (trendVos != null && trendVos.size() > 0) { DryOrderTrendVo oldVo = trendVos.get(trendVos.size() - 1); orderVo.setTrendVo(oldVo); orderVo.setDetailList(trendVos); } // 3 数据库没有则新增一条数据 } else { orderVo = saveNewOrder(realTimeDataVo); } return orderVo; } /** * 保存新工单 * @param realTimeDataVo * @return */ private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { DryOrderVo orderVo; // 查询设备 DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo); if (equ == null) { log.error("未找到设备:"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",机台:" + realTimeDataVo.getMachineid()); return null; } // 查询药材 DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo); if (herbFormula == null) { log.error("未找到药材:"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",机台:" + realTimeDataVo.getMachineid()); return null; } // 创建新工单 orderVo = new DryOrderVo(realTimeDataVo); orderVo.setHerbId(herbFormula.getId()); orderVo.setEquId(equ.getId()); DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); boolean save = dryOrderService.save(dryOrder); return orderVo; } /** * 查询设备,新设备则添加到设备主数据 * @param realTimeDataVo * @return */ private DryEquipment queryEquipmentByCodeTenant(RealTimeDataVo realTimeDataVo) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryEquipment::getTenantId, realTimeDataVo.getTenantid()); queryWrapper.eq(DryEquipment::getCode, realTimeDataVo.getMachineid()); DryEquipment one = equipmentService.getOne(queryWrapper); if (one == null) { log.error(role+"保存实时数据,未找到设备:"+realTimeDataVo.getMachineid()); // one = new DryEquipment(realTimeDataVo); // equipmentService.save(one); if (MqttConstant.ROLE_ADMIN.equals(role)) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); JSONObject object = new JSONObject(); object.put("code", realTimeDataVo.getMachineid()); object.put("tenantId", realTimeDataVo.getTenantid()); mqttMessage.setPayload(object.toJSONString().getBytes()); try { mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX ,mqttMessage); }catch (MqttException e) { e.printStackTrace(); } } return null; } return one; } /** * 查询药材,新药材添加到数据库 * @param realTimeDataVo * @return */ private DryHerbFormula queryHerbByIndexTenant(RealTimeDataVo realTimeDataVo) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryHerbFormula::getTenantId, realTimeDataVo.getTenantid()); queryWrapper.eq(DryHerbFormula::getName, realTimeDataVo.getName()); DryHerbFormula one = dryHerbFormulaService.getOne(queryWrapper); if (one == null) { one = new DryHerbFormula(realTimeDataVo); DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid()); one.setEqpType(dryEquipment.getType()); dryHerbFormulaService.save(one); } return one; } /** * 保存含水率变化记录 * @param trendVo * @param orderVo */ private void saveOrderTrendVo(DryOrderTrendVo trendVo, DryOrderVo orderVo) { //判断 实时含水率 或 实时重量有没有变化,有变化则更新 if(orderVo.getTrendVo() == null && trendVo != null && trendVo.getWeight() > 0 || orderVo.getTrendVo()!=null && trendVo.getWeight() < orderVo.getTrendVo().getWeight() ) { DryOrder byId = dryOrderService.getById(orderVo.getId()); // 将最新结果更新到工单 if (byId != null) { BeanUtil.copyProperties(orderVo, byId); byId.setTemps(JSONObject.toJSONString(orderVo.getBellowsTemp())); dryOrderService.updateById(byId); } // 保存含水率变化 orderVo.getDetailList().add(trendVo); DryOrderTrend dryOrderTrend = BeanUtil.toBean(trendVo, DryOrderTrend.class); dryOrderTrendService.save(dryOrderTrend); } } /** * 查询机台实时数据 * @param realTimeDataVo * @return */ @Override public Result queryMachineRealTImeData(RealTimeDataVo realTimeDataVo) { TenantContext.setTenant(realTimeDataVo.getTenantid()+""); // 查询所有机台,查询语句组装 LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryEquipment::getTenantId, realTimeDataVo.getTenantid()); queryWrapper.eq(DryEquipment::getEnable, "Y"); queryWrapper.orderByAsc(DryEquipment::getCode); // 查询所有机台 List dryEquipments = equipmentService.list(queryWrapper); // 机台列表,用于效率对比 List list = new ArrayList<>(); // 效率列表,用于效率对比 List dList = new ArrayList<>(); // 查询当前机台工单 DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid()); try { // 如果有可用机台 if (dryEquipments != null && dryEquipments.size() > 0) { // 查询其它机台工单 dryEquipments.stream().forEach(item -> { // 获取工单 DryOrderVo order = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + item.getCode()); list.add(item.getName().substring(0, item.getName().indexOf('#')+1)); if (order != null) { // 计算干燥效率,用于对比 DryOrderTrendVo dryOrderTrendVo = order.getDetailList().get(order.getDetailList().size() - 1); double v = order.getOriginWeight() - dryOrderTrendVo.getWeight(); if (v > 0 && dryOrderTrendVo.getTotalTime()>0) { DecimalFormat df = new DecimalFormat("#.00"); dList.add(Double.valueOf(df.format(v / dryOrderTrendVo.getTotalTime() * 60))); } else { dList.add(50d); } } else { // 如果没有生产,效率置0 dList.add(0d); } }); } if (orderVo != null) { // 将效率对比数据放入当前机台实时数据中返回 orderVo.setCompEqpNum(list); orderVo.setCompEqpEffic(dList); // 查询近十次效率和能能耗平均 dryOrderService.queryRecentOrderAvg(orderVo); } }catch (Exception e) { e.printStackTrace(); } return Result.ok(orderVo); } @Override public Result sendSocketMsg(CommandMessageVo msgVo) { DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(msgVo.getTenantId() + "", msgVo.getMachineId()); log.info("获取设备:" + dryEquipment.toString()); // managedSessions.keySet().forEach(addr -> { // ObjectOutputStream oos = null; try { // Socket socket = SocketServerConfig.clientMap.get(addr); IoSession session = ServerHandler.clientSocket.get(dryEquipment.getIp()); if (session == null) { return Result.error("未获取到session,请检查客户端配置或设备ip配置是否正常"); } SocketMsgVo smv = new SocketMsgVo(msgVo); session.write(JSONObject.toJSONString(smv)); // oos = new ObjectOutputStream(socket.getOutputStream()); // String s = JSONObject.toJSONString(new SocketMsgVo(msgVo)); // oos.writeUTF(s); // oos.flush(); } catch (Exception e) { throw new RuntimeException(e); } finally { } // }); return Result.OK(); } @Override public Result queryWorkshopStatistics(RealTimeDataVo realTimeDataVo) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryEquipment::getTenantId, realTimeDataVo.getTenantid()); queryWrapper.eq(DryEquipment::getEnable, "Y"); List dryEquipments = equipmentService.list(queryWrapper); DryOrderVo orderVo = new DryOrderVo(); if (dryEquipments != null && dryEquipments.size() > 0) { dryEquipments.stream().forEach(item -> { DryOrderVo order = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + item.getCode()); if (order != null) { orderVo.setEnvHum(order.getEnvHum()); orderVo.setEnvTemp(order.getEnvTemp()); double watt = order.getWatt() - order.getDetailList().get(0).getWatt(); orderVo.setWatt(orderVo.getWatt()==null? watt : orderVo.getWatt() + watt); double steam = order.getSteam() - order.getDetailList().get(0).getSteam(); orderVo.setSteam(orderVo.getSteam()==null? steam : orderVo.getSteam() + steam); orderVo.setOriginWeight(orderVo.getOriginWeight()==null? order.getOriginWeight(): orderVo.getOriginWeight() + order.getOriginWeight()); double yield = order.getOriginWeight()*(1-(order.getInitial()/100))/(1-(order.getTarget()/100)); orderVo.setYield(orderVo.getYield()==null? yield: orderVo.getYield() + yield); double sub = order.getOriginWeight() - order.getYield(); orderVo.setReduce(orderVo.getReduce()==null? sub: orderVo.getReduce() + sub); } }); } //redisUtil.get(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode() return Result.OK(orderVo); } @Override public Result statisticsDataHandle(StatisticsDataVo statsDataVo) { return null; } @Override public Result fitFaultRecord(RealTimeDataVo vo) { TenantContext.setTenant(vo.getTenantid()+""); ThreadUtil.execute(() -> { try { //解析存储报警数据 List faultRecords1 = fitFault(vo.getEqp_fault(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); List faultRecords2 = fitFault(vo.getEqp_warning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); faultRecords1.addAll(faultRecords2); //处理结束后,将redis中实时数据发送至云服务器 Map toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); if(mqttEnable && !toCloudFaultMap.isEmpty()){ MqMessage< Map> message = new MqMessage<>(); message.setData(toCloudFaultMap); message.setTentId(vo.getTenantid()+""); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload(JSON.toJSONString(message).getBytes()); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage); } //要保存的历史故障 if(!faultRecords1.isEmpty()){ MqMessage> message = new MqMessage<>(); message.setData(faultRecords1); message.setTentId(vo.getTenantid()+""); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); } } catch (Exception e) { e.printStackTrace(); } }); return null; } @Override public void fitFaultRecord(RealTimeDataParentVo vo) { TenantContext.setTenant(vo.getTenantid()+""); ThreadUtil.execute(() -> { try { //解析存储报警数据 List faultRecords1 = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); List faultRecords2 = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); faultRecords1.addAll(faultRecords2); //处理结束后,将redis中实时数据发送至云服务器 Map toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); if(mqttEnable && !toCloudFaultMap.isEmpty()){ MqMessage< Map> message = new MqMessage<>(); message.setData(toCloudFaultMap); message.setTentId(vo.getTenantid()+""); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload(JSON.toJSONString(message).getBytes()); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage); } //要保存的历史故障 if(!faultRecords1.isEmpty()){ MqMessage> message = new MqMessage<>(); message.setData(faultRecords1); message.setTentId(vo.getTenantid()+""); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); } } catch (Exception e) { e.printStackTrace(); } }); } /** * 解析存储故障数据 * TODO 保证原子性 * @param fault 故障数据 * @param orderId 工单 * @param tenantId 租户 * @param machineId 设备 * @param faultType 故障类型 * @return 组装好故障数据 */ private List fitFault(String fault, String orderId,Integer tenantId,String machineId,Integer faultType){ List result = new ArrayList<>(); //数据样本:"eqp_fault": "滚筒降超时-报警,风机过流报警,滚筒升超时-报警,风箱升报警", System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime()+"--"+fault); //redis中的故障 Map rFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); Map redFauMap = rFauMap.entrySet().stream() .collect(Collectors.toMap( entry -> entry.getKey().toString(), // 键转换为字符串 entry -> entry.getValue() )); //没有生成工单的故障数据不存储 if(StringUtils.isEmpty(orderId)){ return result; } if(StringUtils.isEmpty(fault) && rFauMap.isEmpty()){ return result; } //1.解析数据 String[] eqpFaults = fault.split(","); Map addFauMap = new HashMap<>(); Map realFauMap = new HashMap<>(); for (int i = 0; i < eqpFaults.length; i++) { String eqpFault = eqpFaults[i]; //避免空字符串 if(StringUtils.isEmpty(eqpFault.trim())) continue; //1.1检查mqtt中是否已存在这个故障 String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault).trim(); realFauMap.put(redisKey, new DryFaultRecord()); DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,redisKey); //1.2如果redis不存在则存入(存故障开始) if(rFault ==null){ //组装缓存数据 // DryFaultRecord faultRecord = new DryFaultRecord(orderId,tenantId,eqpFault,faultType,new Date(),null); // addFauMap.put(redisKey,faultRecord); Map equipmentMap = equipmentService.queryEquByTenantId(tenantId); String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + ""); DryFaultRecordVo vo = new DryFaultRecordVo(orderId,tenantId,eqpFault,faultType,new Date(),null,1,equipmentMap.get(machineId).getName(),tenantName); addFauMap.put(redisKey,vo); }else { //如果数据已存在,且计数大于1就重置计数(计数3次后判定故障结束,3次之前重新上报故障说明故障还在持续 需要重新计数) if(rFault.getECount()!=null && rFault.getECount() > 1){ rFault.setECount(1); redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,redisKey,rFault); System.err.println("报警次数重置 clear clear ,key-"+redisKey); } } } //1.3缓存至redis //合并数据 addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value)); //没有新故障数据不用覆盖 if(!addFauMap.isEmpty()){ redisUtil.hmset(MqttConstant.MQTT_REAL_FAULT,redFauMap); } //2检测已结束的故障 //2.1如果实时数据不存在redis存在则代表故障结束,存入数据库 Map curFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); curFauMap.keySet().stream() //特别注意,多个报警类型共用方法需要区分类型 .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo)curFauMap.get(key)).getFaultType() == faultType) .forEach(key -> { DryFaultRecordVo vo = (DryFaultRecordVo)redFauMap.get(key); vo.setECount(vo.getECount()+1); if(redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,key.toString())!=null){ //更新次数 redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,key.toString(),vo); System.err.println("报警次数更新,key-"+key); } if(vo.getECount()>=3){ vo.setEndTime(new Date()); //TODO 结束超过某个时间区间判定为错误数据 faultRecordService.save(vo); redisUtil.hdel(MqttConstant.MQTT_REAL_FAULT,key); result.add(vo); System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime()+"存入数据库"); } }); return result; } }