干燥机配套车间生产管理系统/云平台服务端
zhuguifei
5 天以前 4a60ced80b215fcb2e2d4664b20cd744313ccc10
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -36,6 +36,8 @@
import java.text.DecimalFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@Slf4j
@@ -79,6 +81,13 @@
    @Value(value = "${jeecg.mqtt.enable}")
    private boolean mqttEnable;
    private static final ConcurrentHashMap<String, ReentrantLock> tenantLocks = new ConcurrentHashMap<>();
    private ReentrantLock getLock(String tenantId, String type) {
        String lockKey = tenantId + ":" + type;
        return tenantLocks.computeIfAbsent(lockKey, k -> new ReentrantLock());
    }
    public String getTemporaryToken() {
        if (token == null) {
@@ -226,7 +235,16 @@
        }
        if (realTimeDataParentVo.getFault() != null) {
            fitFaultRecord(realTimeDataParentVo);
            ReentrantLock faultLock = getLock(realTimeDataParentVo.getTenantid() + "", "fault");
            faultLock.lock();
            try {
                fitFaultRecord(realTimeDataParentVo);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                faultLock.unlock();
            }
        }
        return Result.ok();
    }
@@ -291,7 +309,7 @@
     * @return
     */
    private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) {
        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
        DryOrderVo orderVo;
        LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder());
@@ -329,7 +347,7 @@
     * @return
     */
    private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) {
        TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
        TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
        DryOrderVo orderVo;
        // 查询设备
@@ -357,16 +375,26 @@
                    new LambdaQueryWrapper<DryEqpType>()
                            .eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid())
            );
            if(eqpType == null){
                log.error("未查询到租户设备类型:{}", realTimeDataVo.getTenantid() );
            if (eqpType == null) {
                log.error("未查询到租户设备类型:{}", realTimeDataVo.getTenantid());
                return null;
            }
            Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId()));
            if (!equipmentService.save(addEqu)) {
                log.error("新增设备失败:数据库保存异常!equipment={}", addEqu);
                return null;
            // 设备新增
            ReentrantLock equipmentLock = getLock(realTimeDataVo.getTenantid() + "", "equipment");
            equipmentLock.lock();
            try {
                if (!equipmentService.save(addEqu)) {
                    log.error("新增设备失败:数据库保存异常!equipment={}", addEqu);
                    return null;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                equipmentLock.unlock();
            }
            equ = addEqu;
@@ -380,14 +408,26 @@
            log.error("未找到药材:" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",机台:" + realTimeDataVo.getMachineid());
            return null;
        }
        // 创建新工单
        orderVo = new DryOrderVo(realTimeDataVo);
        orderVo.setHerbId(herbFormula.getId());
        orderVo.setEquId(equ.getId());
        DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
        boolean save = dryOrderService.save(dryOrder);
        return orderVo;
        // 工单新增
        ReentrantLock orderLock = getLock(realTimeDataVo.getTenantid() + "", "order");
        orderLock.lock();
        try {
            // 创建新工单
            orderVo = new DryOrderVo(realTimeDataVo);
            orderVo.setHerbId(herbFormula.getId());
            orderVo.setEquId(equ.getId());
            DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
            boolean save = dryOrderService.save(dryOrder);
            return orderVo;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            orderLock.unlock();
        }
        return null;
    }
@@ -414,9 +454,9 @@
                object.put("tenantId", realTimeDataVo.getTenantid());
                mqttMessage.setPayload(object.toJSONString().getBytes());
                try {
                   if(mqttEnable){
                       mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
                   }
                    if (mqttEnable) {
                        mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
                    }
                } catch (MqttException e) {
                    e.printStackTrace();
                }
@@ -441,7 +481,7 @@
        if (one == null) {
            one = new DryHerbFormula(realTimeDataVo);
            DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid());
            if (dryEquipment!=null&&dryEquipment.getType()!=null) {
            if (dryEquipment != null && dryEquipment.getType() != null) {
                one.setEqpType(dryEquipment.getType());
            }
@@ -657,51 +697,44 @@
    @Override
    public void fitFaultRecord(RealTimeDataParentVo vo) {
        TenantContext.setTenant(vo.getTenantid() + "");
        ThreadUtil.execute(() -> {
            try {
                //解析存储报警数据
                List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
                List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
                if(!errorList.isEmpty()){
                   log.error("保存故障:{}", errorList.toString());
                }
                if(!warnList.isEmpty()){
                    log.error("保存告警:{}", warnList.toString());
                }
        //解析存储报警数据
        List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
        List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
        if (!errorList.isEmpty()) {
            log.error("保存故障:{}", errorList.toString());
        }
        if (!warnList.isEmpty()) {
            log.error("保存告警:{}", warnList.toString());
        }
                //以下为云服务处理故障,厂内本地服务无需处理
                if(!mqttEnable)return;
        //以下为云服务处理故障,厂内本地服务无需处理
        if (!mqttEnable) return;
                //处理结束后,将redis中实时数据发送至云服务器  key = tenantId + machineId + eqpFault
                Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
        //处理结束后,将redis中实时数据发送至云服务器  key = tenantId + machineId + eqpFault
        Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
                Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
                        .collect(Collectors.toMap(
                                entry -> entry.getKey().toString(),
                                entry -> (DryFaultRecordVo)entry.getValue()
                        ));
        Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
                .collect(Collectors.toMap(
                        entry -> entry.getKey().toString(),
                        entry -> (DryFaultRecordVo) entry.getValue()
                ));
                String tenantId = vo.getTenantid() +"";
        String tenantId = vo.getTenantid() + "";
                //广播发送给各租户下移动设备
                if (dryFaultMap.isEmpty()) {
                    return;
                }
                String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
                //数据转换
                List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
                MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
                //发送广播
                log.error("广播给:{}" , recTopic);
                mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
        //广播发送给各租户下移动设备
        if (dryFaultMap.isEmpty()) {
            return;
        }
        String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
        //数据转换
        List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
        MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
        //发送广播
        log.error("广播给:{}", recTopic);
        mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
@@ -759,12 +792,11 @@
//                addFauMap.put(redisKey,faultRecord);
                Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId);
                String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + "");
                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName);
                DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, machineId, equipmentMap.get(machineId).getName(), tenantName);
                addFauMap.put(redisKey, vo);
            } else {
                //TODO 特殊情况,如果redis的故障和新
                //如果数据已存在,且计数大于1就重置计数(计数3次后判定故障结束,3次之前重新上报故障说明故障还在持续 需要重新计数)
                if (rFault.getECount() != null && rFault.getECount() > 1) {
                    rFault.setECount(1);