| | |
| | | package org.jeecg.modules.dry.service.impl; |
| | | |
| | | import cn.hutool.core.bean.BeanUtil; |
| | | import cn.hutool.core.thread.ThreadUtil; |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.alibaba.fastjson.TypeReference; |
| | | import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.mina.core.service.IoAcceptor; |
| | | import org.apache.mina.core.service.IoHandler; |
| | | import org.apache.commons.lang3.StringUtils; |
| | | import org.apache.mina.core.session.IoSession; |
| | | import org.jeecg.common.api.CommonAPI; |
| | | import org.eclipse.paho.client.mqttv3.MqttException; |
| | | import org.eclipse.paho.client.mqttv3.MqttMessage; |
| | | import org.jeecg.common.api.vo.Result; |
| | | import org.jeecg.common.config.TenantContext; |
| | | import org.jeecg.common.config.mqtoken.UserTokenContext; |
| | | import org.jeecg.common.constant.CommonCacheConstant; |
| | | import org.jeecg.common.constant.CommonConstant; |
| | | import org.jeecg.common.constant.MqttConstant; |
| | | import org.jeecg.common.system.util.JwtUtil; |
| | | import org.jeecg.common.util.DateUtils; |
| | | import org.jeecg.common.util.RedisUtil; |
| | | import org.jeecg.common.util.SpringContextUtils; |
| | | import org.jeecg.modules.dry.common.CacheConstants; |
| | | import org.jeecg.modules.dry.entity.*; |
| | | import org.jeecg.modules.dry.mqtt.MqMessage; |
| | | import org.jeecg.modules.dry.mqtt.MqttUtil; |
| | | import org.jeecg.modules.dry.service.*; |
| | | import org.jeecg.modules.dry.socket.ServerHandler; |
| | | import org.jeecg.modules.dry.socket.SocketServerConfig; |
| | | import org.jeecg.modules.dry.util.DryUtil; |
| | | import org.jeecg.modules.dry.vo.*; |
| | | import org.jetbrains.annotations.NotNull; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.stereotype.Service; |
| | | import org.springframework.transaction.annotation.Transactional; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.ObjectOutputStream; |
| | | import java.net.Socket; |
| | | import java.text.DecimalFormat; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.*; |
| | | import java.util.stream.Collectors; |
| | | |
| | | @Slf4j |
| | | @Service |
| | |
| | | private IDryHerbService herbService; |
| | | |
| | | @Autowired |
| | | private IDryHerbFormulaService dryHerbFormulaService; |
| | | |
| | | @Autowired |
| | | private IDryEquipmentService equipmentService; |
| | | |
| | | @Autowired |
| | |
| | | @Autowired |
| | | private IDryProdRecordService prodRecordService; |
| | | |
| | | @Autowired |
| | | private IDryFaultRecordService faultRecordService; |
| | | |
| | | |
| | | private String token; |
| | | |
| | | @Value(value = "${jeecg.mqtt.role}") |
| | | private String role; |
| | | |
| | | @Autowired |
| | | private MqttUtil mqttUtil; |
| | | |
| | | public String getTemporaryToken() { |
| | | if (token == null) { |
| | |
| | | // 根据租户id和工单号查询数据库是否有记录,有则返回,没有则新增一条再返回 |
| | | orderVo = getOrSaveDryOrderVoDB(realTimeDataVo); |
| | | } |
| | | if (orderVo == null) { |
| | | log.error("工单不存在,工单号:"+realTimeDataVo.getWorkorder()+",设备:" + realTimeDataVo.getMachineid() +",药材:" + realTimeDataVo.getName()); |
| | | return Result.error("工单不存在"); |
| | | } |
| | | |
| | | // 2 更新工单实时数据 |
| | | // 2.1 将工单中的数据替换为最新数据 |
| | |
| | | orderVo.setState_valve(realTimeDataVo.getState_valve()); |
| | | orderVo.setOrderStatus(realTimeDataVo.getWorkorder_status()); |
| | | orderVo.setEqp_status(realTimeDataVo.getEqp_status()); |
| | | orderVo.setEqp_state(realTimeDataVo.getEqp_state()); |
| | | // orderVo.setEqp_state(realTimeDataVo.getEqp_state()); |
| | | orderVo.setWarning(realTimeDataVo.getEqp_warning()); |
| | | orderVo.setFault(realTimeDataVo.getEqp_fault()); |
| | | orderVo.setLevel(realTimeDataVo.getLevel()); |
| | |
| | | * @param realTimeDataVo |
| | | * @return |
| | | */ |
| | | @NotNull |
| | | private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { |
| | | DryOrderVo orderVo; |
| | | // 查询药材 |
| | | DryHerb herb = queryHerbByIndexTenant(realTimeDataVo); |
| | | |
| | | // 查询设备 |
| | | DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo); |
| | | if (equ == null) { |
| | | log.error("未找到设备:"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",机台:" + realTimeDataVo.getMachineid()); |
| | | return null; |
| | | } |
| | | // 查询药材 |
| | | DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo); |
| | | |
| | | if (herbFormula == null) { |
| | | log.error("未找到药材:"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",机台:" + realTimeDataVo.getMachineid()); |
| | | return null; |
| | | } |
| | | // 创建新工单 |
| | | orderVo = new DryOrderVo(realTimeDataVo); |
| | | orderVo.setHerbId(herb.getId()); |
| | | |
| | | orderVo.setHerbId(herbFormula.getId()); |
| | | orderVo.setEquId(equ.getId()); |
| | | DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); |
| | | boolean save = dryOrderService.save(dryOrder); |
| | |
| | | queryWrapper.eq(DryEquipment::getCode, realTimeDataVo.getMachineid()); |
| | | DryEquipment one = equipmentService.getOne(queryWrapper); |
| | | if (one == null) { |
| | | one = new DryEquipment(realTimeDataVo); |
| | | equipmentService.save(one); |
| | | log.error(role+"保存实时数据,未找到设备:"+realTimeDataVo.getMachineid()); |
| | | // one = new DryEquipment(realTimeDataVo); |
| | | // equipmentService.save(one); |
| | | if (MqttConstant.ROLE_ADMIN.equals(role)) { |
| | | MqttMessage mqttMessage = new MqttMessage(); |
| | | mqttMessage.setQos(0); |
| | | JSONObject object = new JSONObject(); |
| | | object.put("code", realTimeDataVo.getMachineid()); |
| | | object.put("tenantId", realTimeDataVo.getTenantid()); |
| | | mqttMessage.setPayload(object.toJSONString().getBytes()); |
| | | try { |
| | | mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX ,mqttMessage); |
| | | }catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | |
| | | } |
| | | return null; |
| | | } |
| | | return one; |
| | | } |
| | |
| | | * @param realTimeDataVo |
| | | * @return |
| | | */ |
| | | private DryHerb queryHerbByIndexTenant(RealTimeDataVo realTimeDataVo) { |
| | | LambdaQueryWrapper<DryHerb> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(DryHerb::getTenantId, realTimeDataVo.getTenantid()); |
| | | queryWrapper.eq(DryHerb::getCode, realTimeDataVo.getIndex()); |
| | | DryHerb one = herbService.getOne(queryWrapper); |
| | | private DryHerbFormula queryHerbByIndexTenant(RealTimeDataVo realTimeDataVo) { |
| | | LambdaQueryWrapper<DryHerbFormula> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(DryHerbFormula::getTenantId, realTimeDataVo.getTenantid()); |
| | | queryWrapper.eq(DryHerbFormula::getName, realTimeDataVo.getName()); |
| | | DryHerbFormula one = dryHerbFormulaService.getOne(queryWrapper); |
| | | if (one == null) { |
| | | one = new DryHerb(realTimeDataVo); |
| | | herbService.save(one); |
| | | one = new DryHerbFormula(realTimeDataVo); |
| | | DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid()); |
| | | one.setEqpType(dryEquipment.getType()); |
| | | dryHerbFormulaService.save(one); |
| | | } |
| | | return one; |
| | | } |
| | |
| | | private void saveOrderTrendVo(DryOrderTrendVo trendVo, DryOrderVo orderVo) { |
| | | //判断 实时含水率 或 实时重量有没有变化,有变化则更新 |
| | | if(orderVo.getTrendVo() == null && trendVo != null && trendVo.getWeight() > 0 |
| | | || trendVo.getWeight() < orderVo.getTrendVo().getWeight() |
| | | || orderVo.getTrendVo()!=null && trendVo.getWeight() < orderVo.getTrendVo().getWeight() |
| | | ) { |
| | | DryOrder byId = dryOrderService.getById(orderVo.getId()); |
| | | // 将最新结果更新到工单 |
| | |
| | | public Result<?> queryMachineRealTImeData(RealTimeDataVo realTimeDataVo) { |
| | | TenantContext.setTenant(realTimeDataVo.getTenantid()+""); |
| | | |
| | | // 查询所有机台查询组装 |
| | | // 查询所有机台,查询语句组装 |
| | | LambdaQueryWrapper<DryEquipment> queryWrapper = new LambdaQueryWrapper<>(); |
| | | queryWrapper.eq(DryEquipment::getTenantId, realTimeDataVo.getTenantid()); |
| | | queryWrapper.eq(DryEquipment::getEnable, "Y"); |
| | |
| | | dList.add(50d); |
| | | } |
| | | |
| | | } else { |
| | | // 如果没有生产,效率置0 |
| | | dList.add(0d); |
| | | } |
| | | // 如果没有生产,效率置0 |
| | | dList.add(0d); |
| | | |
| | | }); |
| | | } |
| | | |
| | |
| | | //redisUtil.get(CacheConstants.RedisKeyEnum.WORK_ORDER.getCode() |
| | | return Result.OK(orderVo); |
| | | } |
| | | |
| | | @Override |
| | | public Result<?> statisticsDataHandle(StatisticsDataVo statsDataVo) { |
| | | return null; |
| | | } |
| | | |
| | | @Override |
| | | public Result<?> fitFultRecord(RealTimeDataVo vo) { |
| | | TenantContext.setTenant(vo.getTenantid()+""); |
| | | ThreadUtil.execute(() -> { |
| | | try { |
| | | //解析存储报警数据 |
| | | List<DryFaultRecord> faultRecords1 = fitFault(vo.getEqp_fault(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); |
| | | List<DryFaultRecord> faultRecords2 = fitFault(vo.getEqp_warning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); |
| | | faultRecords1.addAll(faultRecords2); |
| | | |
| | | |
| | | //处理结束后,将redis中实时数据发送至云服务器 |
| | | Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); |
| | | if(!toCloudFaultMap.isEmpty()){ |
| | | MqMessage< Map<Object, Object>> message = new MqMessage<>(); |
| | | message.setData(toCloudFaultMap); |
| | | message.setTentId(vo.getTenantid()+""); |
| | | MqttMessage mqttMessage = new MqttMessage(); |
| | | mqttMessage.setQos(0); |
| | | mqttMessage.setPayload(JSON.toJSONString(message).getBytes()); |
| | | mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage); |
| | | } |
| | | |
| | | |
| | | //要保存的历史故障 |
| | | if(!faultRecords1.isEmpty()){ |
| | | MqMessage<List<DryFaultRecord>> message = new MqMessage<>(); |
| | | message.setData(faultRecords1); |
| | | message.setTentId(vo.getTenantid()+""); |
| | | MqttMessage mqttMessage = new MqttMessage(); |
| | | mqttMessage.setQos(0); |
| | | mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); |
| | | mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | }); |
| | | |
| | | return null; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 解析存储故障数据 |
| | | * TODO 保证原子性 |
| | | * @param fault 故障数据 |
| | | * @param orderId 工单 |
| | | * @param tenantId 租户 |
| | | * @param machineId 设备 |
| | | * @param faultType 故障类型 |
| | | * @return 组装好故障数据 |
| | | */ |
| | | private List<DryFaultRecord> fitFault(String fault, String orderId,Integer tenantId,String machineId,Integer faultType){ |
| | | List<DryFaultRecord> result = new ArrayList<>(); |
| | | //数据样本:"eqp_fault": "滚筒降超时-报警,风机过流报警,滚筒升超时-报警,风箱升报警", |
| | | System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime()+"--"+fault); |
| | | //redis中的故障 |
| | | Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); |
| | | Map<String, Object> redFauMap = rFauMap.entrySet().stream() |
| | | .collect(Collectors.toMap( |
| | | entry -> entry.getKey().toString(), // 键转换为字符串 |
| | | entry -> entry.getValue() |
| | | )); |
| | | |
| | | //没有生成工单的故障数据不存储 |
| | | if(StringUtils.isEmpty(orderId)){ |
| | | return result; |
| | | } |
| | | |
| | | if(StringUtils.isEmpty(fault) && rFauMap.isEmpty()){ |
| | | return result; |
| | | } |
| | | //1.解析数据 |
| | | String[] eqpFaults = fault.split(","); |
| | | Map<String,DryFaultRecord> addFauMap = new HashMap<>(); |
| | | Map<String,DryFaultRecord> realFauMap = new HashMap<>(); |
| | | for (int i = 0; i < eqpFaults.length; i++) { |
| | | String eqpFault = eqpFaults[i]; |
| | | //避免空字符串 |
| | | if(StringUtils.isEmpty(eqpFault.trim())) continue; |
| | | //1.1检查mqtt中是否已存在这个故障 |
| | | String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault).trim(); |
| | | |
| | | |
| | | realFauMap.put(redisKey, new DryFaultRecord()); |
| | | DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,redisKey); |
| | | //1.2如果redis不存在则存入(存故障开始) |
| | | if(rFault ==null){ |
| | | //组装缓存数据 |
| | | // DryFaultRecord faultRecord = new DryFaultRecord(orderId,tenantId,eqpFault,faultType,new Date(),null); |
| | | // addFauMap.put(redisKey,faultRecord); |
| | | Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId); |
| | | String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + ""); |
| | | DryFaultRecordVo vo = new DryFaultRecordVo(orderId,tenantId,eqpFault,faultType,new Date(),null,1,equipmentMap.get(machineId).getName(),tenantName); |
| | | addFauMap.put(redisKey,vo); |
| | | }else { |
| | | //如果数据已存在,且计数大于1就重置计数(计数3次后判定故障结束,3次之前重新上报故障说明故障还在持续 需要重新计数) |
| | | if(rFault.getECount()!=null && rFault.getECount() > 1){ |
| | | rFault.setECount(1); |
| | | redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,redisKey,rFault); |
| | | System.err.println("报警次数重置 clear clear ,key-"+redisKey); |
| | | } |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | //1.3缓存至redis |
| | | //合并数据 |
| | | addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value)); |
| | | //没有新故障数据不用覆盖 |
| | | if(!addFauMap.isEmpty()){ |
| | | redisUtil.hmset(MqttConstant.MQTT_REAL_FAULT,redFauMap); |
| | | } |
| | | |
| | | //2检测已结束的故障 |
| | | //2.1如果实时数据不存在redis存在则代表故障结束,存入数据库 |
| | | Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); |
| | | curFauMap.keySet().stream() |
| | | //特别注意,多个报警类型共用方法需要区分类型 |
| | | .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo)curFauMap.get(key)).getFaultType() == faultType) |
| | | .forEach(key -> { |
| | | DryFaultRecordVo vo = (DryFaultRecordVo)redFauMap.get(key); |
| | | vo.setECount(vo.getECount()+1); |
| | | if(redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,key.toString())!=null){ |
| | | //更新次数 |
| | | redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,key.toString(),vo); |
| | | System.err.println("报警次数更新,key-"+key); |
| | | } |
| | | |
| | | if(vo.getECount()>=3){ |
| | | vo.setEndTime(new Date()); |
| | | //TODO 结束超过某个时间区间判定为错误数据 |
| | | faultRecordService.save(vo); |
| | | redisUtil.hdel(MqttConstant.MQTT_REAL_FAULT,key); |
| | | result.add(vo); |
| | | System.err.println((faultType == 1 ? "类型:故障" : "类型:报警") + DateUtils.formatDateTime()+"存入数据库"); |
| | | } |
| | | }); |
| | | |
| | | |
| | | |
| | | return result; |
| | | } |
| | | |
| | | |
| | | |
| | | } |