jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -54,6 +54,7 @@ String TENANT_UP_PREFIX = "tenant/up"; String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data"; String TENANT_UP_PREFIX_FAULT_DATA = TENANT_UP_PREFIX + "/fault/data"; String TENANT_UP_PREFIX_EQU = TENANT_UP_PREFIX + "/equipment"; @@ -67,8 +68,13 @@ /**************************æå¡ç«¯åç§æ·ç«¯è¯·æ±æ°æ®end*******************************/ /**************************start*******************************/ /**************************end*******************************/ //redisç¼å String MQTT_ONLINE_CLIENT = "mqtt:online:client::"; String MQTT_EQP_FAULT = "mqtt:eqp:fault"; } jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/entity/DryFaultRecord.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,102 @@ package org.jeecg.modules.dry.entity; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.Date; import java.math.BigDecimal; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableLogic; import lombok.Data; import com.fasterxml.jackson.annotation.JsonFormat; import org.springframework.format.annotation.DateTimeFormat; import org.jeecgframework.poi.excel.annotation.Excel; import org.jeecg.common.aspect.annotation.Dict; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.EqualsAndHashCode; import lombok.experimental.Accessors; /** * @Description: dry_eqp_fault * @Author: jeecg-boot * @Date: 2024-11-13 * @Version: V1.0 */ @Data @TableName("dry_fault_record") @Accessors(chain = true) @EqualsAndHashCode(callSuper = false) @ApiModel(value="dry_eqp_fault对象", description="dry_eqp_fault") public class DryFaultRecord implements Serializable { private static final long serialVersionUID = 1L; /**id*/ @TableId(type = IdType.ASSIGN_ID) @ApiModelProperty(value = "id") private String id; /**å·¥åid*/ @Excel(name = "å·¥åid", width = 15) @ApiModelProperty(value = "å·¥åid") private String orderId; /**æ éåç§°*/ @Excel(name = "æ éåç§°", width = 15) @ApiModelProperty(value = "æ éåç§°") private String faultName; /**æ écode*/ @Excel(name = "æ écode", width = 15) @ApiModelProperty(value = "å·¥åid") private String faultCode; /**æ éç±»å*/ @Excel(name = "æ éç±»å", width = 15) @ApiModelProperty(value = "æ éç±»å") private Integer faultType; /**æ éæè¿°*/ @Excel(name = "æ éæè¿°", width = 15) @ApiModelProperty(value = "æ éæè¿°") private String faultDesc; /**æ éç¶æ*/ @Excel(name = "æ éç¶æ", width = 15) @ApiModelProperty(value = "æ éç¶æ") private Integer faultStatu; /**å¼å§æ¶é´*/ @Excel(name = "å¼å§æ¶é´", width = 15, format = "yyyy-MM-dd HH:mm:ss") @JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss") @DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss") @ApiModelProperty(value = "å¼å§æ¶é´") private Date startTime; /**ç»ææ¶é´*/ @Excel(name = "ç»ææ¶é´", width = 15, format = "yyyy-MM-dd HH:mm:ss") @JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss") @DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss") @ApiModelProperty(value = "ç»ææ¶é´") private Date endTime; /**å建人*/ @ApiModelProperty(value = "å建人") private String createBy; /**åå»ºæ¥æ*/ @JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd") @DateTimeFormat(pattern="yyyy-MM-dd") @ApiModelProperty(value = "åå»ºæ¥æ") private Date createTime; /**æ´æ°äºº*/ @ApiModelProperty(value = "æ´æ°äºº") private String updateBy; /**æ´æ°æ¥æ*/ @JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd") @DateTimeFormat(pattern="yyyy-MM-dd") @ApiModelProperty(value = "æ´æ°æ¥æ") private Date updateTime; public DryFaultRecord() { } public DryFaultRecord(String orderId, String faultName,Integer faultType, Date startTime, Date endTime) { this.orderId = orderId; this.faultName = faultName; this.startTime = startTime; this.endTime = endTime; this.faultType = faultType; } } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryFaultRecordController.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,167 @@ package org.jeecg.modules.dry.controller; import java.util.Arrays; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.jeecg.common.api.vo.Result; import org.jeecg.common.system.query.QueryGenerator; import org.jeecg.modules.dry.entity.DryFaultRecord; import org.jeecg.modules.dry.service.IDryFaultRecordService; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import lombok.extern.slf4j.Slf4j; import org.jeecg.common.system.base.controller.JeecgController; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.ModelAndView; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.jeecg.common.aspect.annotation.AutoLog; import org.apache.shiro.authz.annotation.RequiresPermissions; /** * @Description: dry_eqp_fault * @Author: jeecg-boot * @Date: 2024-11-13 * @Version: V1.0 */ @Api(tags="dry_eqp_fault") @RestController @RequestMapping("/dry/dryEqpFault") @Slf4j public class DryFaultRecordController extends JeecgController<DryFaultRecord, IDryFaultRecordService> { @Autowired private IDryFaultRecordService dryEqpFaultService; /** * å页å表æ¥è¯¢ * * @param dryEqpFault * @param pageNo * @param pageSize * @param req * @return */ //@AutoLog(value = "dry_eqp_fault-å页å表æ¥è¯¢") @ApiOperation(value="dry_eqp_fault-å页å表æ¥è¯¢", notes="dry_eqp_fault-å页å表æ¥è¯¢") @GetMapping(value = "/list") public Result<IPage<DryFaultRecord>> queryPageList(DryFaultRecord dryEqpFault, @RequestParam(name="pageNo", defaultValue="1") Integer pageNo, @RequestParam(name="pageSize", defaultValue="10") Integer pageSize, HttpServletRequest req) { QueryWrapper<DryFaultRecord> queryWrapper = QueryGenerator.initQueryWrapper(dryEqpFault, req.getParameterMap()); Page<DryFaultRecord> page = new Page<DryFaultRecord>(pageNo, pageSize); IPage<DryFaultRecord> pageList = dryEqpFaultService.page(page, queryWrapper); return Result.OK(pageList); } /** * æ·»å * * @param dryEqpFault * @return */ @AutoLog(value = "dry_eqp_fault-æ·»å ") @ApiOperation(value="dry_eqp_fault-æ·»å ", notes="dry_eqp_fault-æ·»å ") @RequiresPermissions("org.jeecg.modules.dry.mqtt:dry_eqp_fault:add") @PostMapping(value = "/add") public Result<String> add(@RequestBody DryFaultRecord dryEqpFault) { dryEqpFaultService.save(dryEqpFault); return Result.OK("æ·»å æåï¼"); } /** * ç¼è¾ * * @param dryEqpFault * @return */ @AutoLog(value = "dry_eqp_fault-ç¼è¾") @ApiOperation(value="dry_eqp_fault-ç¼è¾", notes="dry_eqp_fault-ç¼è¾") @RequiresPermissions("org.jeecg.modules.dry.mqtt:dry_eqp_fault:edit") @RequestMapping(value = "/edit", method = {RequestMethod.PUT,RequestMethod.POST}) public Result<String> edit(@RequestBody DryFaultRecord dryEqpFault) { dryEqpFaultService.updateById(dryEqpFault); return Result.OK("ç¼è¾æå!"); } /** * éè¿idå é¤ * * @param id * @return */ @AutoLog(value = "dry_eqp_fault-éè¿idå é¤") @ApiOperation(value="dry_eqp_fault-éè¿idå é¤", notes="dry_eqp_fault-éè¿idå é¤") @RequiresPermissions("org.jeecg.modules.dry.mqtt:dry_eqp_fault:delete") @DeleteMapping(value = "/delete") public Result<String> delete(@RequestParam(name="id",required=true) String id) { dryEqpFaultService.removeById(id); return Result.OK("å 餿å!"); } /** * æ¹éå é¤ * * @param ids * @return */ @AutoLog(value = "dry_eqp_fault-æ¹éå é¤") @ApiOperation(value="dry_eqp_fault-æ¹éå é¤", notes="dry_eqp_fault-æ¹éå é¤") @RequiresPermissions("org.jeecg.modules.dry.mqtt:dry_eqp_fault:deleteBatch") @DeleteMapping(value = "/deleteBatch") public Result<String> deleteBatch(@RequestParam(name="ids",required=true) String ids) { this.dryEqpFaultService.removeByIds(Arrays.asList(ids.split(","))); return Result.OK("æ¹éå 餿å!"); } /** * éè¿idæ¥è¯¢ * * @param id * @return */ //@AutoLog(value = "dry_eqp_fault-éè¿idæ¥è¯¢") @ApiOperation(value="dry_eqp_fault-éè¿idæ¥è¯¢", notes="dry_eqp_fault-éè¿idæ¥è¯¢") @GetMapping(value = "/queryById") public Result<DryFaultRecord> queryById(@RequestParam(name="id",required=true) String id) { DryFaultRecord dryEqpFault = dryEqpFaultService.getById(id); if(dryEqpFault==null) { return Result.error("æªæ¾å°å¯¹åºæ°æ®"); } return Result.OK(dryEqpFault); } /** * 导åºexcel * * @param request * @param dryEqpFault */ @RequiresPermissions("org.jeecg.modules.dry.mqtt:dry_eqp_fault:exportXls") @RequestMapping(value = "/exportXls") public ModelAndView exportXls(HttpServletRequest request, DryFaultRecord dryEqpFault) { return super.exportXls(request, dryEqpFault, DryFaultRecord.class, "dry_eqp_fault"); } /** * éè¿excelå¯¼å ¥æ°æ® * * @param request * @param response * @return */ @RequiresPermissions("org.jeecg.modules.dry.mqtt:dry_eqp_fault:importExcel") @RequestMapping(value = "/importExcel", method = RequestMethod.POST) public Result<?> importExcel(HttpServletRequest request, HttpServletResponse response) { return super.importExcel(request, response, DryFaultRecord.class); } } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/DryRealTimeDataController.java
@@ -3,7 +3,9 @@ import ai.djl.modality.Classifications; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.druid.support.json.JSONUtils; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; @@ -97,6 +99,8 @@ mqttMessage.setQos(0); mqttMessage.setPayload(JSONObject.toJSONString(realTimeDataVo).getBytes()); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA,mqttMessage); //å¤çæ éä¿¡æ¯ dryRealTimeDataService.fitFultRecord(realTimeDataVo); } } catch (MqttException e) { jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mapper/DryFaultRecordMapper.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,14 @@ package org.jeecg.modules.dry.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import org.jeecg.modules.dry.entity.DryFaultRecord; /** * @Description: dry_eqp_fault * @Author: jeecg-boot * @Date: 2024-11-13 * @Version: V1.0 */ public interface DryFaultRecordMapper extends BaseMapper<DryFaultRecord> { } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mapper/xml/DryFaultRecordMapper.xml
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,5 @@ <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="org.jeecg.modules.dry.mapper.DryFaultRecordMapper"> </mapper> jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -97,6 +97,9 @@ // 订é ç§æ·å®æ¶æ°æ® mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); System.out.println("admin订é " + MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA); // 订é ç§æ·æ¥è¦æ°æ® mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); System.out.println("admin订é " + MqttConstant.TENANT_UP_PREFIX_FAULT_DATA); mqttClient.subscribe(MqttConstant.TENANT_UP_PREFIX_EQU); System.out.println("admin订é " + MqttConstant.TENANT_UP_PREFIX_EQU); initClients(); @@ -150,7 +153,7 @@ * æå¡ç«¯ï¼adminè§è²ï¼å¯å¨æ¶æ¥è¯¢ææè®¾å¤å¹¶ç¼åå°redis */ private void initClients() { redisUtil.del(MqttConstant.MQTT_ONLINE_CLIENT); redisUtil.removeAll(MqttConstant.MQTT_ONLINE_CLIENT); JSONObject clients = emqxApi.queryEmqx(EmqxApi.CMD_CLIENTS); //TODO æ ¹æ®emqxè¿åç¼åå®ä½ç±» @@ -171,11 +174,15 @@ //æ¯å¦è¿æ¥ Boolean connected = obj.getBoolean("connected"); item.put("connected", connected); // String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); item.put("code", info[2]); //æ ¹æ®clientidè§£æ(注æé ç½®æä»¶ä¸clientidæ ¼å¼ ä¾ï¼client-1000) try { String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); //item.put("code", info[2]); }catch (Exception e){ e.printStackTrace(); } if (connected) { redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -2,7 +2,9 @@ import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; @@ -16,17 +18,17 @@ import org.jeecg.modules.dry.api.EmqxApi; import org.jeecg.modules.dry.entity.DryEqpType; import org.jeecg.modules.dry.entity.DryEquipment; import org.jeecg.modules.dry.entity.DryFaultRecord; import org.jeecg.modules.dry.entity.DryShop; import org.jeecg.modules.dry.service.IDryEqpTypeService; import org.jeecg.modules.dry.service.IDryEquipmentService; import org.jeecg.modules.dry.service.IDryRealTimeDataService; import org.jeecg.modules.dry.service.IDryShopService; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.vo.DryEquipmentVo; import org.jeecg.modules.dry.vo.RealTimeDataVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.List; @Slf4j @Component @@ -56,16 +58,20 @@ @Autowired private IDryShopService dryShopService; @Autowired private IDryFaultRecordService faultRecordService; @Override public void connectionLost(Throwable throwable) { System.err.println("è¿æ¥æå¼ï¼ï¼æçº¿"); System.err.println("è¿æ¥æå¼ï¼ï¼"+throwable.toString()); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { public void messageArrived(String topic, MqttMessage mqttMessage) { System.out.println("æ¶å°æ¶æ¯: \n topicï¼" + topic + "\n Qosï¼" + mqttMessage.getQos() + "\n payloadï¼" + new String(mqttMessage.getPayload())); @@ -90,12 +96,15 @@ item.put("clientid", clientid); //æ¯å¦è¿æ¥ item.put("connected", true); // String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); item.put("code", info[2]); //æ ¹æ®clientidè§£æ(注æé ç½®æä»¶ä¸clientidæ ¼å¼ ä¾ï¼client-1000) try { String[] info = clientid.split("-"); item.put("type", info[0]); item.put("tenantId", info[1]); //item.put("code", info[2]); }catch (Exception e){ e.printStackTrace(); } redisUtil.set(MqttConstant.MQTT_ONLINE_CLIENT + clientid, item); System.err.println(String.format("设å¤: %sä¸çº¿", clientid)); } @@ -150,7 +159,7 @@ messageJson.put("timestamp", DateUtils.zone2Str(messageJson.get("timestamp").toString())); } // 宿¶æ°æ®ä¸ä¼ 太é¢ç¹ä¸æ°æ®å å®¹è¶ è¿å段大å°ä¸è®°å½æ¥å¿ if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA)){ if (!topic.equals(MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA) && !topic.equals(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA)){ baseCommonService.addLog(message, CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_1); } @@ -203,6 +212,24 @@ }); break; // æ¥æ¶è®¾å¤æ¥è¦æ°æ® case MqttConstant.TENANT_UP_PREFIX_FAULT_DATA: ThreadUtil.execute(() -> { try { JSONObject jsonObject = JSON.parseObject(message); List<DryFaultRecord> faultRecords = JSON.parseArray(jsonObject.get("data").toString(), DryFaultRecord.class); System.err.println(faultRecords.toString()); faultRecordService.saveBatch(faultRecords); } catch (Exception e) { e.printStackTrace(); } }); break; case MqttConstant.TENANT_UP_PREFIX_EQU: ThreadUtil.execute(() -> { try { jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/IDryFaultRecordService.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,15 @@ package org.jeecg.modules.dry.service; import com.baomidou.mybatisplus.extension.service.IService; import org.jeecg.modules.dry.entity.DryFaultRecord; /** * @Description: dry_eqp_fault * @Author: jeecg-boot * @Date: 2024-11-13 * @Version: V1.0 */ public interface IDryFaultRecordService extends IService<DryFaultRecord> { } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/IDryRealTimeDataService.java
@@ -15,4 +15,11 @@ Result<?> queryWorkshopStatistics(RealTimeDataVo realTimeDataVo); Result<?> statisticsDataHandle(StatisticsDataVo statsDataVo); /** * éè¿å®æ¶æ°æ®æ¶éå¤çæ éä¿¡æ¯ * @param realTimeDataVo * @return */ Result<?> fitFultRecord(RealTimeDataVo realTimeDataVo); } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryFaultRecordServiceImpl.java
¶Ô±ÈÐÂÎļþ @@ -0,0 +1,19 @@ package org.jeecg.modules.dry.service.impl; import org.jeecg.modules.dry.entity.DryFaultRecord; import org.jeecg.modules.dry.mapper.DryFaultRecordMapper; import org.jeecg.modules.dry.service.IDryFaultRecordService; import org.springframework.stereotype.Service; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; /** * @Description: dry_eqp_fault * @Author: jeecg-boot * @Date: 2024-11-13 * @Version: V1.0 */ @Service public class DryFaultRecordServiceImpl extends ServiceImpl<DryFaultRecordMapper, DryFaultRecord> implements IDryFaultRecordService { } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -1,10 +1,14 @@ package org.jeecg.modules.dry.service.impl; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoHandler; import org.apache.mina.core.session.IoSession; @@ -20,6 +24,7 @@ import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.system.util.JwtUtil; import org.jeecg.common.system.vo.LoginUser; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.common.util.SpringContextUtils; import org.jeecg.modules.dry.common.CacheConstants; @@ -41,10 +46,8 @@ import java.io.ObjectOutputStream; import java.net.Socket; import java.text.DecimalFormat; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.*; import java.util.stream.Collectors; @Slf4j @Service @@ -68,6 +71,9 @@ @Autowired private IDryProdRecordService prodRecordService; @Autowired private IDryFaultRecordService faultRecordService; private String token; @@ -336,7 +342,7 @@ private void saveOrderTrendVo(DryOrderTrendVo trendVo, DryOrderVo orderVo) { //夿 宿¶å«æ°´ç æ 宿¶ééææ²¡æååï¼æåååæ´æ° if(orderVo.getTrendVo() == null && trendVo != null && trendVo.getWeight() > 0 || trendVo.getWeight() < orderVo.getTrendVo().getWeight() || orderVo.getTrendVo()!=null && trendVo.getWeight() < orderVo.getTrendVo().getWeight() ) { DryOrder byId = dryOrderService.getById(orderVo.getId()); // å°ææ°ç»ææ´æ°å°å·¥å @@ -485,4 +491,112 @@ public Result<?> statisticsDataHandle(StatisticsDataVo statsDataVo) { return null; } @Override public Result<?> fitFultRecord(RealTimeDataVo vo) { ThreadUtil.execute(() -> { try { //è§£æå卿¥è¦æ°æ® List<DryFaultRecord> faultRecords1 = fitFault(vo.getEqp_fault(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1); List<DryFaultRecord> faultRecords2 = fitFault(vo.getEqp_warning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2); faultRecords1.addAll(faultRecords2); JSONObject json = new JSONObject(); json.put("data",JSON.toJSONString(faultRecords1)); if(faultRecords1.isEmpty()) return; MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); mqttMessage.setPayload((JSON.toJSONString(json).getBytes())); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_FAULT_DATA,mqttMessage); } catch (Exception e) { e.printStackTrace(); } }); return null; } /** * è§£æå卿 éæ°æ® * TODO ä¿è¯ååæ§ * @param fault æ éæ°æ® * @param orderId å·¥å * @param tenantId ç§æ· * @param machineId è®¾å¤ * @param faultType æ éç±»å * @return ç»è£ 好æ éæ°æ® */ private List<DryFaultRecord> fitFault(String fault, String orderId,Integer tenantId,String machineId,Integer faultType){ List<DryFaultRecord> result = new ArrayList<>(); //æ°æ®æ ·æ¬ï¼"eqp_fault": "æ»çéè¶ æ¶-æ¥è¦,飿ºè¿æµæ¥è¦,æ»çåè¶ æ¶-æ¥è¦,é£ç®±åæ¥è¦", System.err.println((faultType == 1 ? "ç±»åï¼æ é" : "ç±»åï¼æ¥è¦") + DateUtils.formatDateTime()+"--"+fault); //redisä¸çæ é Map<Object, Object> rFauMap = redisUtil.hmget(MqttConstant.MQTT_EQP_FAULT); Map<String, Object> redFauMap = rFauMap.entrySet().stream() .collect(Collectors.toMap( entry -> entry.getKey().toString(), // é®è½¬æ¢ä¸ºå符串 entry -> entry.getValue() )); //没æçæå·¥åçæ éæ°æ®ä¸åå¨ if(StringUtils.isEmpty(orderId)){ return result; } if(StringUtils.isEmpty(fault) && rFauMap.isEmpty()){ return result; } //1.è§£ææ°æ® String[] eqpFaults = fault.split(","); Map<String,DryFaultRecord> realFauMap = new HashMap<>(); for (int i = 0; i < eqpFaults.length; i++) { String eqpFault = eqpFaults[i]; //é¿å 空å符串 if(StringUtils.isEmpty(eqpFault)) continue; //1.1æ£æ¥mqtt䏿¯å¦å·²åå¨è¿ä¸ªæ é String redisKey = String.format("%s_%s_%s", tenantId, machineId,eqpFault); String rFault = (String) redisUtil.get(redisKey); //1.2妿redisä¸åå¨ååå ¥ï¼åæ éå¼å§ï¼ if(rFault ==null){ //ç»è£ ç¼åæ°æ® DryFaultRecord faultRecord = new DryFaultRecord(orderId,eqpFault,faultType,new Date(),null); realFauMap.put(redisKey,faultRecord); } } //1.3ç¼åè³redis //åå¹¶æ°æ® realFauMap.forEach((key, value) -> redFauMap.putIfAbsent(key, value)); //æ²¡ææ°æ éæ°æ®ä¸ç¨è¦ç if(!realFauMap.isEmpty()){ redisUtil.hmset(MqttConstant.MQTT_EQP_FAULT,redFauMap); } //2æ£æµå·²ç»æçæ é //2.1妿宿¶æ°æ®ä¸åå¨redisåå¨å代表æ éç»æï¼åå ¥æ°æ®åº Map<Object, Object> curFauMap = redisUtil.hmget(MqttConstant.MQTT_EQP_FAULT); curFauMap.keySet().stream() //ç¹å«æ³¨æï¼å¤ä¸ªæ¥è¦ç±»åå ±ç¨æ¹æ³éè¦åºåç±»å .filter(key -> !realFauMap.containsKey(key) && ((DryFaultRecord)curFauMap.get(key)).getFaultType() == faultType) .forEach(key -> { System.err.println((faultType == 1 ? "ç±»åï¼æ é" : "ç±»åï¼æ¥è¦") + DateUtils.formatDateTime()+"åå ¥æ°æ®åº"); DryFaultRecord record = (DryFaultRecord)redFauMap.get(key); record.setEndTime(new Date()); faultRecordService.save(record); redisUtil.hdel(MqttConstant.MQTT_EQP_FAULT,key); result.add(record); }); return result; } }