package org.jeecg.modules.dry.service.impl; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.mina.core.session.IoSession; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecg.common.api.vo.Result; import org.jeecg.common.config.TenantContext; import org.jeecg.common.constant.CommonCacheConstant; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.system.util.JwtUtil; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.common.util.SpringContextUtils; import org.jeecg.modules.dry.common.CacheConstants; import org.jeecg.modules.dry.entity.*; import org.jeecg.modules.dry.mqtt.MqMessage; import org.jeecg.modules.dry.mqtt.MqttConfig; import org.jeecg.modules.dry.mqtt.MqttUtil; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.socket.ServerHandler; import org.jeecg.modules.dry.vo.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.text.DecimalFormat; import java.util.*; import java.util.stream.Collectors; @Slf4j @Service public class DryRealTimeDataServiceImpl implements IDryRealTimeDataService { @Autowired private IDryOrderService dryOrderService; @Autowired private IDryOrderTrendService dryOrderTrendService; @Autowired private IDryHerbService herbService; @Autowired private IDryHerbFormulaService dryHerbFormulaService; @Autowired private IDryEquipmentService equipmentService; @Autowired private IDryEqpTypeService dryEqpTypeService; @Autowired private RedisUtil redisUtil; @Autowired private IDryProdRecordService prodRecordService; @Autowired private IDryFaultRecordService faultRecordService; private String token; @Value(value = "${jeecg.mqtt.role}") private String role; @Autowired private MqttUtil mqttUtil; @Value(value = "${jeecg.mqtt.enable}") private boolean mqttEnable; public String getTemporaryToken() { if (token == null) { RedisUtil redisUtil = SpringContextUtils.getBean(RedisUtil.class); // 模拟登录生成Token token = JwtUtil.sign("admin", "b668043e3ea4bc2d"); // 设置Token缓存有效时间为 5 分钟 redisUtil.set(CommonConstant.PREFIX_USER_TOKEN + token, token); redisUtil.expire(CommonConstant.PREFIX_USER_TOKEN + token, 5 * 60 * 1000); } return token; } @Override @Transactional public Result realTimeDataHandle(RealTimeDataVo realTimeDataVo) { TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); log.info("实时数据:" + realTimeDataVo.toString()); // 1 查询或创建工单 // 1.1 从redis取出工单缓存 DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid()); // 1.2 如果有缓存记录 if (orderVo != null && orderVo.getCode().equals(realTimeDataVo.getWorkorder())) { // 1.3 没有缓存记录再查询数据库 } else { // 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条再返回 orderVo = getOrSaveDryOrderVoDB(realTimeDataVo); } if (orderVo == null) { log.error("工单不存在,工单号:" + realTimeDataVo.getWorkorder() + ",设备:" + realTimeDataVo.getMachineid() + ",药材:" + realTimeDataVo.getName()); return Result.error("工单不存在"); } // 2 更新工单实时数据 // 2.1 将工单中的数据替换为最新数据 realTimeDataVo.setOrderId(orderVo.getId()); orderVo.setInitial(realTimeDataVo.getMoisture1()); orderVo.setDryTime(realTimeDataVo.getTime3()); orderVo.setDelay(realTimeDataVo.getDelay()); orderVo.setTurn(realTimeDataVo.getTurntime()); orderVo.setYield(realTimeDataVo.getWeight3()); orderVo.setStart(realTimeDataVo.getStart()); orderVo.setAuto(realTimeDataVo.getAuto()); orderVo.setPlcdisable(realTimeDataVo.getPlcdisable()); orderVo.setLowalarm(realTimeDataVo.getLowalarm()); orderVo.setWind(realTimeDataVo.getWind()); orderVo.setOriginWeight(realTimeDataVo.getWeight2()); orderVo.setWatt(realTimeDataVo.getWatt()); orderVo.setSteam(realTimeDataVo.getSteam()); orderVo.setEnvHum(realTimeDataVo.getEnvhum()); orderVo.setEnvTemp(realTimeDataVo.getEnvtemp()); orderVo.setRemain(realTimeDataVo.getAi_total_time()); orderVo.setCurRemain(realTimeDataVo.getAi_time()); orderVo.setOrderStatus(realTimeDataVo.getWorkorder_status()); orderVo.setEqp_status(realTimeDataVo.getEqp_status()); // orderVo.setEqp_state(realTimeDataVo.getEqp_state()); orderVo.setWarning(realTimeDataVo.getEqp_warning()); orderVo.setFault(realTimeDataVo.getEqp_fault()); orderVo.setLevel(realTimeDataVo.getLevel()); DryOrderTrendVo trendVo = new DryOrderTrendVo(realTimeDataVo); // 2.2 保存工单含水率变化 或 重量变化 saveOrderTrendVo(trendVo, orderVo); orderVo.setTrendVo(trendVo); orderVo.getBellowsTemp().put(realTimeDataVo.getTime3(), realTimeDataVo.getTemp2()); // 2.3 更新到redis缓存 redisUtil.hset(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid(), orderVo, 60 * 60); return Result.ok(); } @Override @Transactional public Result realTimeDataHandle(RealTimeDataParentVo realTimeDataParentVo) { TenantContext.setTenant(realTimeDataParentVo.getTenantid() + ""); log.info("实时数据:" + realTimeDataParentVo.toString()); if (realTimeDataParentVo.getRealTime() != null) { RealTimeDataVo realTimeDataVo = realTimeDataParentVo.getRealTime(); // 1 查询或创建工单 // 1.1 从redis取出工单缓存 DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataParentVo.getTenantid() + "_" + realTimeDataParentVo.getMachineid()); // 1.2 如果有缓存记录 if (orderVo != null && orderVo.getCode().equals(realTimeDataParentVo.getWorkorder())) { // 1.3 没有缓存记录再查询数据库 } else { // 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条再返回 orderVo = getOrSaveDryOrderVoDB(realTimeDataVo); } if (orderVo == null) { log.error("工单不存在,工单号:" + realTimeDataParentVo.getWorkorder() + ",设备:" + realTimeDataParentVo.getMachineid() + ",药材:" + realTimeDataVo.getName()); return Result.error("工单不存在"); } // 2 更新工单实时数据 // 2.1 将工单中的数据替换为最新数据 realTimeDataVo.setOrderId(orderVo.getId()); orderVo.setInitial(realTimeDataVo.getMoisture1()); orderVo.setDryTime(realTimeDataVo.getTime3()); orderVo.setDelay(realTimeDataVo.getDelay()); orderVo.setTurn(realTimeDataVo.getTurntime()); orderVo.setYield(realTimeDataVo.getWeight3()); orderVo.setStart(realTimeDataVo.getStart()); orderVo.setAuto(realTimeDataVo.getAuto()); orderVo.setPlcdisable(realTimeDataVo.getPlcdisable()); orderVo.setLowalarm(realTimeDataVo.getLowalarm()); orderVo.setWind(realTimeDataVo.getWind()); orderVo.setOriginWeight(realTimeDataVo.getWeight2()); orderVo.setWatt(realTimeDataVo.getWatt()); orderVo.setSteam(realTimeDataVo.getSteam()); orderVo.setEnvHum(realTimeDataVo.getEnvhum()); orderVo.setEnvTemp(realTimeDataVo.getEnvtemp()); orderVo.setRemain(realTimeDataVo.getAi_total_time()); orderVo.setCurRemain(realTimeDataVo.getAi_time()); orderVo.setOrderStatus(realTimeDataVo.getWorkorder_status()); orderVo.setEqp_status(realTimeDataVo.getEqp_status()); // orderVo.setEqp_state(realTimeDataVo.getEqp_state()); orderVo.setWarning(realTimeDataVo.getEqp_warning()); orderVo.setFault(realTimeDataVo.getEqp_fault()); orderVo.setLevel(realTimeDataVo.getLevel()); DryOrderTrendVo trendVo = new DryOrderTrendVo(realTimeDataVo); // 2.2 保存工单含水率变化 或 重量变化 saveOrderTrendVo(trendVo, orderVo); orderVo.setTrendVo(trendVo); orderVo.getBellowsTemp().put(realTimeDataVo.getTime3(), realTimeDataVo.getTemp2()); // 2.3 更新到redis缓存 redisUtil.hset(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid(), orderVo, 60 * 60); } if (realTimeDataParentVo.getReport() != null) { saveReport(realTimeDataParentVo); } if (realTimeDataParentVo.getFault() != null) { fitFaultRecord(realTimeDataParentVo); } return Result.ok(); } private void saveReport(RealTimeDataParentVo realTimeDataParentVo) { RealTimeReportVo report = realTimeDataParentVo.getReport(); if (report.getReport_flag()) { DryProdRecord prodRecord = new DryProdRecord(); prodRecord.setReportHeadName(report.getReport_head_name()); prodRecord.setReportHeadBatch(report.getReport_head_batch()); prodRecord.setReportHeadNum(report.getReport_head_num()); prodRecord.setReportHeadMachine(report.getReport_head_machine()); prodRecord.setReportHeadAccepter(report.getReport_head_accepter()); prodRecord.setReportHeadDate(report.getReport_head_date()); prodRecord.setReportHeadLeader(report.getReport_head_leader()); prodRecord.setReportHeadTecher(report.getReport_head_techer()); prodRecord.setReportCheckField(report.getReport_check_field() ? 1 : 0); prodRecord.setReportCheckFile(report.getReport_check_file() ? 1 : 0); prodRecord.setReportCheckTag(report.getReport_check_tag() ? 1 : 0); prodRecord.setReportCheckTool(report.getReport_check_tool() ? 1 : 0); prodRecord.setReportCheckMan(report.getReport_check_man()); prodRecord.setReportCheckStatus(report.getReport_check_status() ? 1 : 0); prodRecord.setReportCheckQa(report.getReport_check_qa()); prodRecord.setReportCheckRecord(report.getReport_check_record()); prodRecord.setReportProductView(report.getReport_product_view() ? 1 : 0); prodRecord.setReportProductWind(report.getReport_product_wind() ? 1 : 0); prodRecord.setReportProductSun(report.getReport_product_sun() ? 1 : 0); prodRecord.setReportProductLowDry(report.getReport_product_low_dry() ? 1 : 0); prodRecord.setReportProductDry(report.getReport_product_dry() ? 1 : 0); prodRecord.setReportProductStart(report.getReport_product_start()); prodRecord.setReportProductEnd(report.getReport_product_end()); prodRecord.setReportProductTotal(report.getReport_product_total()); prodRecord.setReportProductCheck(report.getReport_product_check() ? 1 : 0); prodRecord.setReportProductMan1(report.getReport_product_man1()); prodRecord.setReportProductMan2(report.getReport_product_man2()); prodRecord.setReportProductWeight(report.getReport_product_weight()); prodRecord.setReportProductWaste(report.getReport_product_waste()); prodRecord.setReportProductUse(report.getReport_product_use()); prodRecord.setReportProductQa(report.getReport_product_qa()); prodRecord.setReportCleanMachine(report.getReport_clean_machine() ? 1 : 0); prodRecord.setReportCleanWaste(report.getReport_clean_waste() ? 1 : 0); prodRecord.setReportCleanTool(report.getReport_clean_tool() ? 1 : 0); prodRecord.setReportCleanDoor(report.getReport_clean_door() ? 1 : 0); prodRecord.setReportCleanBox(report.getReport_clean_box() ? 1 : 0); prodRecord.setReportCleanRecord(report.getReport_clean_record() ? 1 : 0); prodRecord.setReportCleanDate(report.getReport_clean_date()); prodRecord.setReportCleanMan(report.getReport_clean_man()); prodRecord.setReportCleanConfirm(report.getReport_clean_confirm() ? 1 : 0); prodRecord.setReportCleanQa(report.getReport_clean_qa()); prodRecordService.save(prodRecord); } } /** * 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条 * * @param realTimeDataVo * @return */ private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) { TenantContext.setTenant(realTimeDataVo.getTenantid() +""); DryOrderVo orderVo; LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder()); queryWrapper.eq(DryOrder::getTenantId, realTimeDataVo.getTenantid()); // 1 查询数据库 DryOrder one = dryOrderService.getOne(queryWrapper); // 2 数据库有记录,更新到缓存 if (one != null) { // 转换为缓存数据结构 orderVo = BeanUtil.toBean(one, DryOrderVo.class); if (one.getTemps() != null) { Map map = JSONObject.parseObject(one.getTemps(), new TypeReference>() { }); orderVo.setBellowsTemp(map); } // 查询称重记录,添加到缓存数据结构 List trendVos = dryOrderTrendService.listByOrderId(one.getId()); if (trendVos != null && trendVos.size() > 0) { DryOrderTrendVo oldVo = trendVos.get(trendVos.size() - 1); orderVo.setTrendVo(oldVo); orderVo.setDetailList(trendVos); } // 3 数据库没有则新增一条数据 } else { orderVo = saveNewOrder(realTimeDataVo); } return orderVo; } /** * 保存新工单 * * @param realTimeDataVo * @return */ private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { TenantContext.setTenant(realTimeDataVo.getTenantid() +""); DryOrderVo orderVo; // 查询设备 DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo); if (equ == null) { log.error("未找到设备:" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",机台:" + realTimeDataVo.getMachineid()); log.error("新增设备:"); if (realTimeDataVo.getMachineid() == null || realTimeDataVo.getTenantid() == null) { log.error("新增设备失败:设备ID或租户ID为空!machineid={}, tenantid={}", realTimeDataVo.getMachineid(), realTimeDataVo.getTenantid()); return null; } DryEquipment addEqu = new DryEquipment(realTimeDataVo); try { String digits = StringUtils.getDigits(realTimeDataVo.getMachineid()); addEqu.setName(Integer.parseInt(digits) + "#干燥设备"); } catch (NumberFormatException e) { log.error("设备ID格式错误,无法提取数字部分:machineid={}", realTimeDataVo.getMachineid(), e); return null; } DryEqpType eqpType = dryEqpTypeService.getOne( new LambdaQueryWrapper() .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid()) ); if(eqpType == null){ log.error("未查询到租户设备类型:{}", realTimeDataVo.getTenantid() ); return null; } Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId())); if (!equipmentService.save(addEqu)) { log.error("新增设备失败:数据库保存异常!equipment={}", addEqu); return null; } equ = addEqu; log.info("新增设备成功:equipmentId={}", addEqu.getId()); } // 查询药材 DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo); if (herbFormula == null) { log.error("未找到药材:" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",机台:" + realTimeDataVo.getMachineid()); return null; } // 创建新工单 orderVo = new DryOrderVo(realTimeDataVo); orderVo.setHerbId(herbFormula.getId()); orderVo.setEquId(equ.getId()); DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); boolean save = dryOrderService.save(dryOrder); return orderVo; } /** * 查询设备,新设备则添加到设备主数据 * * @param realTimeDataVo * @return */ private DryEquipment queryEquipmentByCodeTenant(RealTimeDataVo realTimeDataVo) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryEquipment::getTenantId, realTimeDataVo.getTenantid()); queryWrapper.eq(DryEquipment::getCode, realTimeDataVo.getMachineid()); DryEquipment one = equipmentService.getOne(queryWrapper); if (one == null) { log.error(role + "保存实时数据,未找到设备:" + realTimeDataVo.getMachineid()); // one = new DryEquipment(realTimeDataVo); // equipmentService.save(one); if (MqttConstant.ROLE_ADMIN.equals(role)) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); JSONObject object = new JSONObject(); object.put("code", realTimeDataVo.getMachineid()); object.put("tenantId", realTimeDataVo.getTenantid()); mqttMessage.setPayload(object.toJSONString().getBytes()); try { if(mqttEnable){ mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage); } } catch (MqttException e) { e.printStackTrace(); } } return null; } return one; } /** * 查询药材,新药材添加到数据库 * * @param realTimeDataVo * @return */ private DryHerbFormula queryHerbByIndexTenant(RealTimeDataVo realTimeDataVo) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryHerbFormula::getTenantId, realTimeDataVo.getTenantid()); queryWrapper.eq(DryHerbFormula::getName, realTimeDataVo.getName()); DryHerbFormula one = dryHerbFormulaService.getOne(queryWrapper); if (one == null) { one = new DryHerbFormula(realTimeDataVo); DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid()); if (dryEquipment!=null&&dryEquipment.getType()!=null) { one.setEqpType(dryEquipment.getType()); } dryHerbFormulaService.save(one); } return one; } /** * 保存含水率变化记录 * * @param trendVo * @param orderVo */ private void saveOrderTrendVo(DryOrderTrendVo trendVo, DryOrderVo orderVo) { //判断 实时含水率 或 实时重量有没有变化,有变化则更新 if (orderVo.getTrendVo() == null && trendVo != null && trendVo.getWeight() > 0 || orderVo.getTrendVo() != null && trendVo.getWeight() < orderVo.getTrendVo().getWeight() ) { DryOrder byId = dryOrderService.getById(orderVo.getId()); // 将最新结果更新到工单 if (byId != null) { BeanUtil.copyProperties(orderVo, byId); byId.setTemps(JSONObject.toJSONString(orderVo.getBellowsTemp())); dryOrderService.updateById(byId); } // 保存含水率变化 orderVo.getDetailList().add(trendVo); DryOrderTrend dryOrderTrend = BeanUtil.toBean(trendVo, DryOrderTrend.class); dryOrderTrendService.save(dryOrderTrend); } } /** * 查询机台实时数据 * * @param realTimeDataVo * @return */ @Override public Result queryMachineRealTImeData(RealTimeDataVo realTimeDataVo) { TenantContext.setTenant(realTimeDataVo.getTenantid() + ""); // 查询所有机台,查询语句组装 LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryEquipment::getTenantId, realTimeDataVo.getTenantid()); queryWrapper.eq(DryEquipment::getEnable, "Y"); queryWrapper.orderByAsc(DryEquipment::getCode); // 查询所有机台 List dryEquipments = equipmentService.list(queryWrapper); // 机台列表,用于效率对比 List list = new ArrayList<>(); // 效率列表,用于效率对比 List dList = new ArrayList<>(); // 查询当前机台工单 DryOrderVo orderVo = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + realTimeDataVo.getMachineid()); try { // 如果有可用机台 if (dryEquipments != null && dryEquipments.size() > 0) { // 查询其它机台工单 dryEquipments.stream().forEach(item -> { // 获取工单 DryOrderVo order = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + item.getCode()); list.add(item.getName().substring(0, item.getName().indexOf('#') + 1)); if (order != null) { // 计算干燥效率,用于对比 DryOrderTrendVo dryOrderTrendVo = order.getDetailList().get(order.getDetailList().size() - 1); double v = order.getOriginWeight() - dryOrderTrendVo.getWeight(); if (v > 0 && dryOrderTrendVo.getTotalTime() > 0) { DecimalFormat df = new DecimalFormat("#.00"); dList.add(Double.valueOf(df.format(v / dryOrderTrendVo.getTotalTime() * 60))); } else { dList.add(50d); } } else { // 如果没有生产,效率置0 dList.add(0d); } }); } if (orderVo != null) { // 将效率对比数据放入当前机台实时数据中返回 orderVo.setCompEqpNum(list); orderVo.setCompEqpEffic(dList); // 查询近十次效率和能能耗平均 dryOrderService.queryRecentOrderAvg(orderVo); } } catch (Exception e) { e.printStackTrace(); } return Result.ok(orderVo); } @Override public Result sendSocketMsg(CommandMessageVo msgVo) { DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(msgVo.getTenantId() + "", msgVo.getMachineId()); log.info("获取设备:" + dryEquipment.toString()); // managedSessions.keySet().forEach(addr -> { // ObjectOutputStream oos = null; try { // Socket socket = SocketServerConfig.clientMap.get(addr); IoSession session = ServerHandler.clientSocket.get(dryEquipment.getIp()); if (session == null) { return Result.error("未获取到session,请检查客户端配置或设备ip配置是否正常"); } SocketMsgVo smv = new SocketMsgVo(msgVo); session.write(JSONObject.toJSONString(smv)); // oos = new ObjectOutputStream(socket.getOutputStream()); // String s = JSONObject.toJSONString(new SocketMsgVo(msgVo)); // oos.writeUTF(s); // oos.flush(); } catch (Exception e) { throw new RuntimeException(e); } finally { } // }); return Result.OK(); } @Override public Result queryWorkshopStatistics(RealTimeDataVo realTimeDataVo) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); queryWrapper.eq(DryEquipment::getTenantId, realTimeDataVo.getTenantid()); queryWrapper.eq(DryEquipment::getEnable, "Y"); List dryEquipments = equipmentService.list(queryWrapper); DryOrderVo orderVo = new DryOrderVo(); if (dryEquipments != null && dryEquipments.size() > 0) { dryEquipments.stream().forEach(item -> { DryOrderVo order = (DryOrderVo) redisUtil.hget(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode(), realTimeDataVo.getTenantid() + "_" + item.getCode()); if (order != null) { orderVo.setEnvHum(order.getEnvHum()); orderVo.setEnvTemp(order.getEnvTemp()); double watt = order.getWatt() - order.getDetailList().get(0).getWatt(); orderVo.setWatt(orderVo.getWatt() == null ? watt : orderVo.getWatt() + watt); double steam = order.getSteam() - order.getDetailList().get(0).getSteam(); orderVo.setSteam(orderVo.getSteam() == null ? steam : orderVo.getSteam() + steam); orderVo.setOriginWeight(orderVo.getOriginWeight() == null ? order.getOriginWeight() : orderVo.getOriginWeight() + order.getOriginWeight()); double yield = order.getOriginWeight() * (1 - (order.getInitial() / 100)) / (1 - (order.getTarget() / 100)); orderVo.setYield(orderVo.getYield() == null ? yield : orderVo.getYield() + yield); double sub = order.getOriginWeight() - order.getYield(); orderVo.setReduce(orderVo.getReduce() == null ? sub : orderVo.getReduce() + sub); } }); } //redisUtil.get(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode() return Result.OK(orderVo); } @Override public Result statisticsDataHandle(StatisticsDataVo statsDataVo) { return null; } @Override public Result fitFaultRecord(RealTimeDataVo vo) { TenantContext.setTenant(vo.getTenantid() + ""); ThreadUtil.execute(() -> { try { //解析存储报警数据 List faultRecords1 = fitFault(vo.getEqp_fault(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); List faultRecords2 = fitFault(vo.getEqp_warning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); faultRecords1.addAll(faultRecords2); //处理结束后,将redis中实时数据发送至云服务器 Map toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); if (mqttEnable && !toCloudFaultMap.isEmpty()) { MqMessage> message = new MqMessage<>(); message.setData(toCloudFaultMap); message.setTentId(vo.getTenantid() + ""); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload(JSON.toJSONString(message).getBytes()); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA, mqttMessage); } //要保存的历史故障 if (!faultRecords1.isEmpty()) { MqMessage> message = new MqMessage<>(); message.setData(faultRecords1); message.setTentId(vo.getTenantid() + ""); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA, mqttMessage); } } catch (Exception e) { e.printStackTrace(); } }); return null; } @Override public void fitFaultRecord(RealTimeDataParentVo vo) { TenantContext.setTenant(vo.getTenantid() + ""); ThreadUtil.execute(() -> { try { //解析存储报警数据 List errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); List warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); if(!errorList.isEmpty()){ log.error("保存故障:{}", errorList.toString()); } if(!warnList.isEmpty()){ log.error("保存告警:{}", warnList.toString()); } //以下为云服务处理故障,厂内本地服务无需处理 if(!mqttEnable)return; //处理结束后,将redis中实时数据发送至云服务器 key = tenantId + machineId + eqpFault Map toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid())); Map dryFaultMap = toCloudFaultMap.entrySet().stream() .collect(Collectors.toMap( entry -> entry.getKey().toString(), entry -> (DryFaultRecordVo)entry.getValue() )); String tenantId = vo.getTenantid() +""; //广播发送给各租户下移动设备 if (dryFaultMap.isEmpty()) { return; } String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId); //数据转换 List faultList = new ArrayList((dryFaultMap.values())); MqMessage> mqMessage = new MqMessage<>(faultList, tenantId, recTopic); //发送广播 log.error("广播给:{}" , recTopic); mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1); } catch (Exception e) { e.printStackTrace(); } }); } /** * 解析存储故障数据 * TODO 保证原子性 * * @param fault 故障数据 * @param orderId 工单 * @param tenantId 租户 * @param machineId 设备 * @param faultType 故障类型 * @return 组装好故障数据 */ private List fitFault(String fault, String orderId, Integer tenantId, String machineId, Integer faultType) { List result = new ArrayList<>(); if (StringUtils.isEmpty(fault)) return result; //数据样本:"eqp_fault": "滚筒降超时-报警,风机过流报警,滚筒升超时-报警,风箱升报警", System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime() + "--" + fault); //redis中的故障 Map rFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); Map redFauMap = rFauMap.entrySet().stream() .collect(Collectors.toMap( entry -> entry.getKey().toString(), // 键转换为字符串 entry -> entry.getValue() )); //没有生成工单的故障数据不存储 if (StringUtils.isEmpty(orderId)) { return result; } if (StringUtils.isEmpty(fault) && rFauMap.isEmpty()) { return result; } //1.解析数据 String[] eqpFaults = fault.split(","); Map addFauMap = new HashMap<>(); Map realFauMap = new HashMap<>(); for (int i = 0; i < eqpFaults.length; i++) { String eqpFault = eqpFaults[i]; //避免空字符串 if (StringUtils.isEmpty(eqpFault.trim())) continue; //1.1检查mqtt中是否已存在这个故障 String redisKey = String.format("%s_%s_%s", tenantId, machineId, eqpFault).trim(); realFauMap.put(redisKey, new DryFaultRecord()); DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey); //1.2如果redis不存在则存入(存故障开始) if (rFault == null) { //组装缓存数据 // DryFaultRecord faultRecord = new DryFaultRecord(orderId,tenantId,eqpFault,faultType,new Date(),null); // addFauMap.put(redisKey,faultRecord); Map equipmentMap = equipmentService.queryEquByTenantId(tenantId); String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + ""); DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName); addFauMap.put(redisKey, vo); } else { //TODO 特殊情况,如果redis的故障和新 //如果数据已存在,且计数大于1就重置计数(计数3次后判定故障结束,3次之前重新上报故障说明故障还在持续 需要重新计数) if (rFault.getECount() != null && rFault.getECount() > 1) { rFault.setECount(1); redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redisKey, rFault); System.err.println("报警次数重置 clear clear ,key-" + redisKey); } } } //1.3缓存至redis //合并数据 addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value)); //没有新故障数据不用覆盖 if (!addFauMap.isEmpty()) { redisUtil.hmset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), redFauMap); } //2检测已结束的故障 //2.1如果实时数据不存在redis存在则代表故障结束,存入数据库 Map curFauMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId)); curFauMap.keySet().stream() //特别注意,多个报警类型共用方法需要区分类型 .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo) curFauMap.get(key)).getFaultType() == faultType) .forEach(key -> { DryFaultRecordVo vo = (DryFaultRecordVo) redFauMap.get(key); vo.setECount(vo.getECount() + 1); if (redisUtil.hget(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString()) != null) { //更新次数 redisUtil.hset(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key.toString(), vo); System.err.println("报警次数更新,key-" + key); } if (vo.getECount() >= 3) { vo.setEndTime(new Date()); //TODO 结束超过某个时间区间判定为错误数据 faultRecordService.save(vo); redisUtil.hdel(String.format(MqttConstant.MQTT_REAL_FAULT, tenantId), key); result.add(vo); System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime() + "存入数据库"); } }); return result; } }