jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonCacheConstant.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,8 @@ package org.jeecg.common.constant; public interface CommonCacheConstant { //redisç¼åç§æ·æ°æ® String SYS_CACHE_TENANT = "sys:cache:tenant"; //redisç¼ååç§æ·ä¸è®¾å¤ä¿¡æ¯ String DRY_CACHE_TENANT_EQUS = "dry:cache:tenant::equs"; } jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -55,6 +55,7 @@ String TENANT_UP_PREFIX = "tenant/up"; String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data"; String TENANT_UP_PREFIX_FAULT_DATA = TENANT_UP_PREFIX + "/fault/data"; String TENANT_UP_PREFIX_REAL_FAULT_DATA = TENANT_UP_PREFIX + "/real/fault/data"; String TENANT_UP_PREFIX_EQU = TENANT_UP_PREFIX + "/equipment"; @@ -71,10 +72,19 @@ /**************************start*******************************/ /**************************end*******************************/ //redisç¼å //client String MQTT_REAL_FAULT = "mqtt:real:fault"; //service(cloud) //å¨çº¿å®¢æ·ç«¯ String MQTT_ONLINE_CLIENT = "mqtt:online:client::"; String MQTT_EQP_FAULT = "mqtt:eqp:fault"; //ææç§æ·ç宿¶æ¥è¦ï¼%sï¼ç§æ·idï¼ String MQTT_CLOUD_REAL_FAULT = "mqtt:real:fault:%s:"; } jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/entity/DryFaultRecord.java
@@ -40,6 +40,9 @@ @Excel(name = "å·¥åid", width = 15) @ApiModelProperty(value = "å·¥åid") private String orderId; @Excel(name = "ç§æ·id", width = 15) @ApiModelProperty(value = "ç§æ·id") private Integer tenantId; /**æ éåç§°*/ @Excel(name = "æ éåç§°", width = 15) @ApiModelProperty(value = "æ éåç§°") @@ -92,8 +95,9 @@ public DryFaultRecord() { } public DryFaultRecord(String orderId, String faultName,Integer faultType, Date startTime, Date endTime) { public DryFaultRecord(String orderId,Integer tenantId, String faultName,Integer faultType, Date startTime, Date endTime) { this.orderId = orderId; this.tenantId = tenantId; this.faultName = faultName; this.startTime = startTime; this.endTime = endTime; jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,32 @@ package org.jeecg.modules.dry.vo; import lombok.Data; import org.jeecg.modules.dry.entity.DryFaultRecord; import java.io.Serializable; import java.util.Date; /** */ @Data public class DryFaultRecordVo extends DryFaultRecord implements Serializable { private static final long serialVersionUID = 1L; //redisæ éç»æææ¯ private Integer eCount; //设å¤åç§° private String equName; //ç§æ·åç§° private String tenantName; public DryFaultRecordVo() { } public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount, String equName, String tenantName) { super(orderId, tenantId, faultName, faultType, startTime, endTime); this.eCount = eCount; this.equName = equName; this.tenantName = tenantName; } } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqMessage.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,22 @@ package org.jeecg.modules.dry.mqtt; import lombok.Data; /** * Mqttæ¶æ¯è½½ä½ * @param <T> */ @Data public class MqMessage<T> { private T data; private String tentId; public MqMessage() { } public MqMessage(T data, String tentId) { this.data = data; this.tentId = tentId; } } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -216,11 +216,11 @@ case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: ThreadUtil.execute(() -> { try { JSONObject jsonObject = JSON.parseObject(message); List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); System.err.println(faultRecords.toString()); faultRecordService.saveBatch(faultRecords); MqMessage<List<DryFaultRecord>> listMqMessage = JSON.parseObject(message, new TypeReference<MqMessage<List<DryFaultRecord>>>() { }); // List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); System.err.println(listMqMessage.toString()); faultRecordService.saveBatch(listMqMessage.getData()); } catch (Exception e) { e.printStackTrace(); jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/IDryEquipmentService.java
@@ -5,6 +5,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; /** * @Description: å¹²ç¥æº @@ -16,4 +17,11 @@ DryEquipment selectByTenantIdEquipmentId(String tenantId, String equipmentId); /** * æ¥è¯¢ç§æ·ä¸ææè®¾å¤ * @param tenantId * @return */ Map<String,DryEquipment> queryEquByTenantId(Integer tenantId); } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryEquipmentServiceImpl.java
@@ -1,17 +1,23 @@ package org.jeecg.modules.dry.service.impl; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import org.jeecg.common.config.TenantContext; import org.jeecg.common.constant.CommonCacheConstant; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.dry.common.CacheConstants; import org.jeecg.modules.dry.entity.DryEquipment; import org.jeecg.modules.dry.mapper.DryEquipmentMapper; import org.jeecg.modules.dry.service.IDryEquipmentService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Service; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; /** * @Description: å¹²ç¥æº @@ -24,6 +30,8 @@ @Autowired private RedisUtil redisUtil; @Override public DryEquipment selectByTenantIdEquipmentId(String tenantId, String equipmentId) { DryEquipment dryEquipment = (DryEquipment) redisUtil.hget(CacheConstants.RedisKeyEnum.EQP_MAP.getCode(), tenantId + equipmentId); @@ -39,4 +47,20 @@ } return dryEquipment; } @Override @Cacheable(cacheNames = CommonCacheConstant.DRY_CACHE_TENANT_EQUS, key = "#tenantId" , unless = "#result == null " ) public Map<String,DryEquipment> queryEquByTenantId(Integer tenantId) { TenantContext.setTenant(tenantId +""); QueryWrapper<DryEquipment> queryWrapper = new QueryWrapper<>(); queryWrapper.lambda().eq(DryEquipment::getTenantId,tenantId); List<DryEquipment> equipmentList = this.list(queryWrapper); Map<String, DryEquipment> userMap = equipmentList.stream() .collect(Collectors.toMap( DryEquipment::getCode, Function.identity(), (existingValue, newValue) -> existingValue // 妿é®å²çªï¼ä¿çæ§å¼ )); return userMap; } } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -3,48 +3,35 @@ import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoHandler; import org.apache.mina.core.session.IoSession; import org.apache.shiro.SecurityUtils; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecg.common.api.CommonAPI; import org.jeecg.common.api.vo.Result; import org.jeecg.common.config.TenantContext; import org.jeecg.common.config.mqtoken.UserTokenContext; import org.jeecg.common.constant.CommonCacheConstant; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.system.util.JwtUtil; import org.jeecg.common.system.vo.LoginUser; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.common.util.SpringContextUtils; import org.jeecg.modules.dry.common.CacheConstants; import org.jeecg.modules.dry.entity.*; import org.jeecg.modules.dry.mqtt.MqMessage; import org.jeecg.modules.dry.mqtt.MqttUtil; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.socket.ServerHandler; import org.jeecg.modules.dry.socket.SocketServerConfig; import org.jeecg.modules.dry.util.DryUtil; import org.jeecg.modules.dry.vo.*; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.security.auth.login.LoginContext; import java.io.IOException; import java.io.ObjectOutputStream; import java.net.Socket; import java.text.DecimalFormat; import java.util.*; import java.util.stream.Collectors; @@ -494,20 +481,41 @@ @Override public Result<?> fitFultRecord(RealTimeDataVo vo) { TenantContext.setTenant(vo.getTenantid()+""); ThreadUtil.execute(() -> { try { //è§£æå卿¥è¦æ°æ® List<DryFaultRecord> faultRecords1 = fitFault(vo.getEqp_fault(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); List<DryFaultRecord> faultRecords2 = fitFault(vo.getEqp_warning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); faultRecords1.addAll(faultRecords2); JSONObject json = new JSONObject(); json.put("data",JSON.toJSONString(faultRecords1)); if(faultRecords1.isEmpty()) return; MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload((JSON.toJSONString(json).getBytes())); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); //å¤çç»æåï¼å°redisä¸å®æ¶æ°æ®åéè³äºæå¡å¨ Map<Object, Object> toCloudFaultMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); if(!toCloudFaultMap.isEmpty()){ MqMessage< Map<Object, Object>> message = new MqMessage<>(); message.setData(toCloudFaultMap); message.setTentId(vo.getTenantid()+""); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload(JSON.toJSONString(message).getBytes()); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REAL_FAULT_DATA,mqttMessage); } //è¦ä¿åçå岿 é if(!faultRecords1.isEmpty()){ MqMessage<List<DryFaultRecord>> message = new MqMessage<>(); message.setData(faultRecords1); message.setTentId(vo.getTenantid()+""); MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload((JSON.toJSONString(message).getBytes())); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); } } catch (Exception e) { e.printStackTrace(); @@ -533,7 +541,7 @@ //æ°æ®æ ·æ¬ï¼"eqp_fault": "æ»çéè¶ æ¶-æ¥è¦,飿ºè¿æµæ¥è¦,æ»çåè¶ æ¶-æ¥è¦,é£ç®±åæ¥è¦", System.err.println((faultType == 1 ? "ç±»åï¼æ é" : "ç±»åï¼æ¥è¦") + DateUtils.formatDateTime()+"--"+fault); //redisä¸çæ é Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_EQP_FAULT); Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); Map<String, Object> redFauMap = rFauMap.entrySet().stream() .collect(Collectors.toMap( entry -> entry.getKey().toString(), // é®è½¬æ¢ä¸ºå符串 @@ -550,50 +558,74 @@ } //1.è§£ææ°æ® String[] eqpFaults = fault.split(","); Map<String,DryFaultRecord> addFauMap = new HashMap<>(); Map<String,DryFaultRecord> realFauMap = new HashMap<>(); for (int i = 0; i < eqpFaults.length; i++) { String eqpFault = eqpFaults[i]; //é¿å 空å符串 if(StringUtils.isEmpty(eqpFault)) continue; if(StringUtils.isEmpty(eqpFault.trim())) continue; //1.1æ£æ¥mqtt䏿¯å¦å·²åå¨è¿ä¸ªæ é String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault); String rFault = (String) redisUtil.get(redisKey); String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault).trim(); realFauMap.put(redisKey, new DryFaultRecord()); DryFaultRecordVo rFault = (DryFaultRecordVo) redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,redisKey); //1.2妿redisä¸åå¨ååå ¥ï¼åæ éå¼å§ï¼ if(rFault ==null){ //ç»è£ ç¼åæ°æ® DryFaultRecord faultRecord = new DryFaultRecord(orderId,eqpFault,faultType,new Date(),null); realFauMap.put(redisKey,faultRecord); // DryFaultRecord faultRecord = new DryFaultRecord(orderId,tenantId,eqpFault,faultType,new Date(),null); // addFauMap.put(redisKey,faultRecord); Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId); String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + ""); DryFaultRecordVo vo = new DryFaultRecordVo(orderId,tenantId,eqpFault,faultType,new Date(),null,1,equipmentMap.get(machineId).getName(),tenantName); addFauMap.put(redisKey,vo); }else { //å¦ææ°æ®å·²åå¨ï¼ä¸è®¡æ°å¤§äº1å°±é置计æ°ï¼è®¡æ°3次åå¤å®æ éç»æï¼3次ä¹å鿰䏿¥æ éè¯´ææ éè¿å¨æç» éè¦éæ°è®¡æ°ï¼ if(rFault.getECount()!=null && rFault.getECount() > 1){ rFault.setECount(1); redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,redisKey,rFault); System.err.println("æ¥è¦æ¬¡æ°éç½® clear clear ï¼key-"+redisKey); } } } //1.3ç¼åè³redis //åå¹¶æ°æ® realFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value)); addFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value)); //æ²¡ææ°æ éæ°æ®ä¸ç¨è¦ç if(!realFauMap.isEmpty()){ redisUtil.hmset(MqttConstant.MQTT_EQP_FAULT,redFauMap); if(!addFauMap.isEmpty()){ redisUtil.hmset(MqttConstant.MQTT_REAL_FAULT,redFauMap); } //2æ£æµå·²ç»æçæ é //2.1妿宿¶æ°æ®ä¸åå¨redisåå¨å代表æ éç»æï¼åå ¥æ°æ®åº Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_EQP_FAULT); Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_REAL_FAULT); curFauMap.keySet().stream() //ç¹å«æ³¨æï¼å¤ä¸ªæ¥è¦ç±»åå ±ç¨æ¹æ³éè¦åºåç±»å .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecord)curFauMap.get(key)).getFaultType() == faultType) .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecordVo)curFauMap.get(key)).getFaultType() == faultType) .forEach(key -> { System.err.println((faultType == 1 ? "ç±»åï¼æ é" : "ç±»åï¼æ¥è¦") + DateUtils.formatDateTime()+"åå ¥æ°æ®åº"); DryFaultRecord record = (DryFaultRecord)redFauMap.get(key); record.setEndTime(new Date()); faultRecordService.save(record); redisUtil.hdel(MqttConstant.MQTT_EQP_FAULT,key); DryFaultRecordVo vo = (DryFaultRecordVo)redFauMap.get(key); vo.setECount(vo.getECount()+1); if(redisUtil.hget(MqttConstant.MQTT_REAL_FAULT,key.toString())!=null){ //æ´æ°æ¬¡æ° redisUtil.hset(MqttConstant.MQTT_REAL_FAULT,key.toString(),vo); System.err.println("æ¥è¦æ¬¡æ°æ´æ°ï¼key-"+key); } result.add(record); if(vo.getECount()>=3){ vo.setEndTime(new Date()); //TODO ç»æè¶ è¿æä¸ªæ¶é´åºé´å¤å®ä¸ºéè¯¯æ°æ® faultRecordService.save(vo); redisUtil.hdel(MqttConstant.MQTT_REAL_FAULT,key); result.add(vo); System.err.println((faultType == 1 ? "ç±»åï¼æ é" : "ç±»åï¼æ¥è¦") + DateUtils.formatDateTime()+"åå ¥æ°æ®åº"); } }); return result; } jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/config/init/RedisInitListener.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,32 @@ package org.jeecg.config.init; import org.jeecg.common.constant.CommonCacheConstant; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.system.entity.SysTenant; import org.jeecg.modules.system.service.ISysTenantService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @Component public class RedisInitListener implements ApplicationListener<ApplicationStartedEvent> { @Autowired private RedisUtil redisUtil; @Autowired private ISysTenantService sysTenantService; @Override public void onApplicationEvent(ApplicationStartedEvent event) { //æ¥è¯¢ææç§æ·ä¿¡æ¯å¹¶ç¼åè³redis List<SysTenant> tenantList = sysTenantService.list(); //list转map Map<String, Object> tenantMap = tenantList.stream() .collect(Collectors.toMap(t -> String.valueOf(t.getId()), t -> (Object) t.getName(), (existingValue, newValue) -> existingValue)); redisUtil.hmset(CommonCacheConstant.SYS_CACHE_TENANT,tenantMap); } }