From 426f50baf57ae5901a6b5a26d1567e3b270d2499 Mon Sep 17 00:00:00 2001 From: zhuguifei <312353457@qq.com> Date: 星期三, 20 十一月 2024 10:46:38 +0800 Subject: [PATCH] 完善租户故障解析接口,完成实时故障和历史故障上报云服务器 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java | 116 +++++++++++++++++++++++++++++++++++++--------------------- 1 files changed, 74 insertions(+), 42 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java index 8910d51..8def570 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java @@ -3,48 +3,35 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.apache.mina.core.service.IoAcceptor; -import org.apache.mina.core.service.IoHandler; import org.apache.mina.core.session.IoSession; -import org.apache.shiro.SecurityUtils; -import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.jeecg.common.api.CommonAPI; import org.jeecg.common.api.vo.Result; import org.jeecg.common.config.TenantContext; -import org.jeecg.common.config.mqtoken.UserTokenContext; +import org.jeecg.common.constant.CommonCacheConstant; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.system.util.JwtUtil; -import org.jeecg.common.system.vo.LoginUser; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.common.util.SpringContextUtils; import org.jeecg.modules.dry.common.CacheConstants; import org.jeecg.modules.dry.entity.*; +import org.jeecg.modules.dry.mqtt.MqMessage; import org.jeecg.modules.dry.mqtt.MqttUtil; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.socket.ServerHandler; -import org.jeecg.modules.dry.socket.SocketServerConfig; -import org.jeecg.modules.dry.util.DryUtil; import org.jeecg.modules.dry.vo.*; -import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import javax.security.auth.login.LoginContext; -import java.io.IOException; -import java.io.ObjectOutputStream; -import java.net.Socket; import java.text.DecimalFormat; import java.util.*; import java.util.stream.Collectors; @@ -494,20 +481,41 @@ @Override public Result<?> fitFultRecord(RealTimeDataVo vo) { + TenantContext.setTenant(vo.getTenantid()+""); ThreadUtil.execute(() -> { try { //瑙f瀽瀛樺偍鎶ヨ鏁版嵁 List<DryFaultRecord> faultRecords1 = fitFault(vo.getEqp_fault(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); List<DryFaultRecord> faultRecords2 = fitFault(vo.getEqp_warning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); faultRecords1.addAll(faultRecords2); - JSONObject json = new JSONObject(); - json.put("data",JSON.toJSONString(faultRecords1)); - if(faultRecords1.isEmpty()) return; - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setQos(0); - mqttMessage.setPayload((JSON.toJSONString(json).getBytes())); - mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); + + //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 + Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); + if(!toCloudFaultMap.isEmpty()){ + MqMessage< Map<Object, Object>> message = new MqMessage<>(); + message.setData(toCloudFaultMap); + message.setTentId(vo.getTenantid()+""); + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(0); + mqttMessage.setPayload(JSON.toJSONString(message).getBytes()); + mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage); + } + + + //瑕佷繚瀛樼殑鍘嗗彶鏁呴殰 + if(!faultRecords1.isEmpty()){ + MqMessage<List<DryFaultRecord>> message = new MqMessage<>(); + message.setData(faultRecords1); + message.setTentId(vo.getTenantid()+""); + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setQos(0); + mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); + mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); + } + + + } catch (Exception e) { e.printStackTrace(); @@ -533,7 +541,7 @@ //鏁版嵁鏍锋湰锛�"eqp_fault": "婊氱瓛闄嶈秴鏃�-鎶ヨ,椋庢満杩囨祦鎶ヨ,婊氱瓛鍗囪秴鏃�-鎶ヨ,椋庣鍗囨姤璀�", System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime()+"--"+fault); //redis涓殑鏁呴殰 - Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_EQP_FAULT); + Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); Map<String, Object> redFauMap = rFauMap.entrySet().stream() .collect(Collectors.toMap( entry -> entry.getKey().toString(), // 閿浆鎹负瀛楃涓� @@ -550,50 +558,74 @@ } //1.瑙f瀽鏁版嵁 String[] eqpFaults = fault.split(","); + Map<String,DryFaultRecord> addFauMap = new HashMap<>(); Map<String,DryFaultRecord> realFauMap = new HashMap<>(); for (int i = 0; i < eqpFaults.length; i++) { String eqpFault = eqpFaults[i]; //閬垮厤绌哄瓧绗︿覆 - if(StringUtils.isEmpty(eqpFault)) continue; + if(StringUtils.isEmpty(eqpFault.trim())) continue; //1.1妫�鏌qtt涓槸鍚﹀凡瀛樺湪杩欎釜鏁呴殰 - String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault); - String rFault = (String) redisUtil.get(redisKey); + String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault).trim(); + + + realFauMap.put(redisKey, new DryFaultRecord()); + DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,redisKey); //1.2濡傛灉redis涓嶅瓨鍦ㄥ垯瀛樺叆锛堝瓨鏁呴殰寮�濮嬶級 if(rFault ==null){ //缁勮缂撳瓨鏁版嵁 - DryFaultRecord faultRecord = new DryFaultRecord(orderId,eqpFault,faultType,new Date(),null); - realFauMap.put(redisKey,faultRecord); +// DryFaultRecord faultRecord = new DryFaultRecord(orderId,tenantId,eqpFault,faultType,new Date(),null); +// addFauMap.put(redisKey,faultRecord); + Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId); + String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + ""); + DryFaultRecordVo vo = new DryFaultRecordVo(orderId,tenantId,eqpFault,faultType,new Date(),null,1,equipmentMap.get(machineId).getName(),tenantName); + addFauMap.put(redisKey,vo); + }else { + //濡傛灉鏁版嵁宸插瓨鍦紝涓旇鏁板ぇ浜�1灏遍噸缃鏁帮紙璁℃暟3娆″悗鍒ゅ畾鏁呴殰缁撴潫锛�3娆′箣鍓嶉噸鏂颁笂鎶ユ晠闅滆鏄庢晠闅滆繕鍦ㄦ寔缁� 闇�瑕侀噸鏂拌鏁帮級 + if(rFault.getECount()!=null && rFault.getECount() > 1){ + rFault.setECount(1); + redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,redisKey,rFault); + System.err.println("鎶ヨ娆℃暟閲嶇疆 clear clear 锛宬ey-"+redisKey); + } + } } //1.3缂撳瓨鑷硆edis //鍚堝苟鏁版嵁 - realFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value)); + addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value)); //娌℃湁鏂版晠闅滄暟鎹笉鐢ㄨ鐩� - if(!realFauMap.isEmpty()){ - redisUtil.hmset(MqttConstant.MQTT_EQP_FAULT,redFauMap); + if(!addFauMap.isEmpty()){ + redisUtil.hmset(MqttConstant.MQTT_REAL_FAULT,redFauMap); } - - //2妫�娴嬪凡缁撴潫鐨勬晠闅� //2.1濡傛灉瀹炴椂鏁版嵁涓嶅瓨鍦╮edis瀛樺湪鍒欎唬琛ㄦ晠闅滅粨鏉燂紝瀛樺叆鏁版嵁搴� - Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_EQP_FAULT); + Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); curFauMap.keySet().stream() //鐗瑰埆娉ㄦ剰锛屽涓姤璀︾被鍨嬪叡鐢ㄦ柟娉曢渶瑕佸尯鍒嗙被鍨� - .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecord)curFauMap.get(key)).getFaultType() == faultType) + .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo)curFauMap.get(key)).getFaultType() == faultType) .forEach(key -> { - System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime()+"瀛樺叆鏁版嵁搴�"); - DryFaultRecord record = (DryFaultRecord)redFauMap.get(key); - record.setEndTime(new Date()); - faultRecordService.save(record); - redisUtil.hdel(MqttConstant.MQTT_EQP_FAULT,key); + DryFaultRecordVo vo = (DryFaultRecordVo)redFauMap.get(key); + vo.setECount(vo.getECount()+1); + if(redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,key.toString())!=null){ + //鏇存柊娆℃暟 + redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,key.toString(),vo); + System.err.println("鎶ヨ娆℃暟鏇存柊锛宬ey-"+key); + } - result.add(record); - + if(vo.getECount()>=3){ + vo.setEndTime(new Date()); + //TODO 缁撴潫瓒呰繃鏌愪釜鏃堕棿鍖洪棿鍒ゅ畾涓洪敊璇暟鎹� + faultRecordService.save(vo); + redisUtil.hdel(MqttConstant.MQTT_REAL_FAULT,key); + result.add(vo); + System.err.println((faultType == 1 ? "绫诲瀷锛氭晠闅�" : "绫诲瀷锛氭姤璀�") + DateUtils.formatDateTime()+"瀛樺叆鏁版嵁搴�"); + } }); + + return result; } -- Gitblit v1.9.3