From 4a60ced80b215fcb2e2d4664b20cd744313ccc10 Mon Sep 17 00:00:00 2001
From: zhuguifei <zhuguifei@zhuguifeideiMac.local>
Date: 星期五, 25 七月 2025 15:08:07 +0800
Subject: [PATCH] 接收mqtt数据高并发处理
---
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java | 152 ++++++++++++++++++------------
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java | 112 +++++++++++++++------
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java | 4
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java | 15 ++
jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java | 5
jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java | 12 +
6 files changed, 199 insertions(+), 101 deletions(-)
diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
index 6264a3b..2b95f9f 100644
--- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
+++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -46,7 +46,7 @@
//鏈嶅姟绔笅琛屾寚浠ゅ墠缂�锛堣繑鍥炵粰绉诲姩绔級
String SERVICE_DOWN_PREFIX = "service/down/res";
//杩斿洖绉诲姩绔煡璇㈣澶囩姸鎬�
- String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/%s/statu";
+ String SERVICE_RES_EQU_STATU = SERVICE_DOWN_PREFIX + "/equ/statu/%s";
//杩斿洖绉诲姩绔繙绋嬭姹傛寚浠�
String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd";
@@ -63,6 +63,11 @@
String SERVICE_BROADCAST_TENANT_REAL_FAULT = SERVICE_BROADCAST_PREFIX + "/real/fault/%s";
//鏈嶅姟绔悜绉诲姩绔洖澶嶄竴娆¤澶囧疄鏃舵晠闅滃憡璀�
String SERVICE_ONECE_TENANT_REAL_FAULT = "service/onece" + "/real/fault/%s";
+ // 鐩戞祴瀹㈡埛绔繛鎺ユ垨鏂紑锛屾帹閫佹秷鎭埌绉熸埛鍐呮墍鏈夌Щ鍔ㄧ鎻愰啋鏇存柊骞茬嚗璁惧杩炴帴淇℃伅锛�%s-绉熸埛锛�
+ String SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU = SERVICE_BROADCAST_PREFIX + "/update/equ/statu/%s";
+
+ //鏈嶅姟绔悜绉诲姩绔彂閫佸共鐕ヨ澶囧疄鏃舵暟鎹紙%s-绉熸埛锛�
+ String SERVICE_BROADCAST_TENANT_REAL_DATA = SERVICE_BROADCAST_PREFIX + "/real/data/%s";
@@ -77,7 +82,7 @@
String TENANT_UP_PREFIX = "tenant/up";
String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data";
- String TENANT_UP_PREFIX_REALTIME_DATA_EQP = TENANT_UP_PREFIX + "/realTime/data/eqp";
+ String TENANT_UP_PREFIX_REALTIME_DATA_EQP = TENANT_UP_PREFIX + "/realTime/data/eqp/test";
String TENANT_UP_PREFIX_FAULT_DATA = TENANT_UP_PREFIX + "/fault/data";
String TENANT_UP_PREFIX_REAL_FAULT_DATA = TENANT_UP_PREFIX + "/real/fault/data";
@@ -96,12 +101,13 @@
/**************************start*******************************/
/**************************end*******************************/
- //redis缂撳瓨
+
//鎵�鏈夌鎴风殑瀹炴椂鎶ヨ锛�%s锛氱鎴穒d锛�
String MQTT_REAL_FAULT = "mqtt:real:fault:%s";
+
//service(cloud)
//鍦ㄧ嚎瀹㈡埛绔�
String MQTT_ONLINE_CLIENT = "mqtt:online:client:%s";
diff --git a/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java b/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java
index b74fdf1..c82fea9 100644
--- a/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java
+++ b/jeecg-module-dry/jeecg-module-dry-api/src/main/java/org/jeecg/modules/dry/vo/DryFaultRecordVo.java
@@ -17,6 +17,8 @@
private Integer eCount;
//璁惧鍚嶇О
private String equName;
+ //璁惧缂栫爜
+ private String equCode;
//绉熸埛鍚嶇О
private String tenantName;
//鏁呴殰鏃堕棿
@@ -28,10 +30,11 @@
super(record.getOrderId(),record.getTenantId(),record.getFaultName(),record.getFaultType(),record.getStartTime(),record.getEndTime());
this.eCount = count;
}
- public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount, String equName, String tenantName) {
+ public DryFaultRecordVo(String orderId, Integer tenantId, String faultName, Integer faultType, Date startTime, Date endTime, Integer eCount,String equCode, String equName, String tenantName) {
super(orderId, tenantId, faultName, faultType, startTime, endTime);
this.eCount = eCount;
this.equName = equName;
+ this.equCode = equCode;
this.tenantName = tenantName;
}
}
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
index 4fcdc30..4ec431f 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/controller/MobileController.java
@@ -167,12 +167,23 @@
String st = client.getString("connectedAt");
vo.setUpTime(st);
vo.setClientId(clientid);
+ }else{
+ vo.setClientId(clientid);
+ vo.setOnline(false);
}
return vo;
}).collect(Collectors.toList());
//鎺掑簭
- collect.sort(Comparator.comparing(obj -> obj.getCode(), Comparator.nullsLast(Comparator.naturalOrder())));
- collect.sort(Comparator.comparing(obj -> obj.getOnline(), Comparator.nullsLast(Comparator.naturalOrder())));
+ collect.sort(
+ Comparator.comparing(
+ MoEquVo::getOnline,
+ Comparator.nullsLast(Comparator.reverseOrder()) // true 鍦ㄥ墠锛宖alse 鍦ㄥ悗锛宯ull 鏈�鍚�
+ )
+ .thenComparing(
+ DryEquipment::getCode,
+ Comparator.nullsLast(Comparator.naturalOrder()) // code 鍗囧簭锛宯ull 鏈�鍚�
+ )
+ );
BeanUtils.copyProperties(pageList, page);
page.setRecords(collect);
}
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
index a490a0a..65d41d6 100755
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -73,7 +73,7 @@
mqttConnOpt.setAutomaticReconnect(false);//璁剧疆鏄惁鑷姩閲嶈繛
//閬楀槺娑堟伅 TODO qos2闇�瑕佸湪璁惧涓婄嚎鏃跺仛娓呴櫎娑堟伅鎿嶄綔
- mqttConnOpt.setWill("downline", ("鎴戞槸" + mqttName + "_" + mqttClientId + "锛屾垜涓嬬嚎浜�").getBytes(), 2, false);
+ //mqttConnOpt.setWill("downline", ("鎴戞槸" + mqttName + "_" + mqttClientId + "锛屾垜涓嬬嚎浜�").getBytes(), 2, false);
try {
MqttClient mqttClient = new MqttClient(broker, mqttClientId, persistence);
@@ -183,6 +183,8 @@
String clientid = obj.getString("clientid");
item.put("clientid", clientid);
//TODO 鏍¢獙绉熸埛id鏄惁瀛樺湪
+
+
if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) continue;
//username
item.put("username", obj.get("username"));
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
index 4558603..6fcfa4b 100644
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -6,6 +6,7 @@
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.poi.ss.formula.functions.T;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
@@ -42,7 +43,6 @@
public class MqttSampleCallback implements MqttCallback {
@Value(value = "${jeecg.mqtt.role}")
private String role;
-
@Autowired
@@ -88,7 +88,7 @@
String message = new String(mqttMessage.getPayload());
JSONObject messageJson = JSONObject.parseObject(message);
- if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected")) {
+ if (topic.startsWith("$SYS/brokers/") && topic.endsWith("connected") && !topic.endsWith("disconnected")) {
JSONObject client = (JSONObject) redisUtil.get(String.format(MqttConstant.MQTT_ONLINE_CLIENT, messageJson.get("clientid").toString().split("-")[1]) + messageJson.get("clientid"));
if (client == null) {
JSONObject item = new JSONObject();
@@ -102,7 +102,7 @@
String clientid = messageJson.getString("clientid");
item.put("clientid", clientid);
// 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞�
- if(!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return;
+ if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return;
//鏄惁杩炴帴
item.put("connected", true);
//鏍规嵁clientid瑙f瀽(娉ㄦ剰閰嶇疆鏂囦欢涓璫lientid鏍煎紡 渚嬶細client-1000)
@@ -113,6 +113,12 @@
item.put("code", info[2]);
redisUtil.hset(String.format(MqttConstant.MQTT_ONLINE_CLIENT, item.get("tenantId")), clientid, item);
System.err.println(String.format("璁惧: %s涓婄嚎", clientid));
+
+ // 鎺ㄩ�佸埌绉诲姩绔�
+ String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, item.get("tenantId"));
+ MqMessage<JSONObject> mqMessage = new MqMessage<>(item, item.get("tenantId").toString(), recTopic);
+ sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1);
+
} catch (Exception e) {
e.printStackTrace();
}
@@ -124,8 +130,23 @@
if (topic.startsWith("$SYS/brokers/") && topic.endsWith("disconnected")) {
try {
String clientid = messageJson.getString("clientid");
- redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientid.split("-")[1]), clientid);
+ // 涓嶇鍚堢殑璁惧涓嶈繘琛岀鐞�
+ if (!clientid.matches("^[^-]+-[^-]+-[^-]+$")) return;
+ String tenantId = clientid.split("-")[1];
+ redisUtil.hdel(String.format(MqttConstant.MQTT_ONLINE_CLIENT, tenantId), clientid);
System.err.println(String.format("璁惧: %s涓嬬嚎", clientid));
+
+ //鎺ㄩ�佸埌绉诲姩绔�
+ JSONObject item = new JSONObject();
+ String[] info = clientid.split("-");
+ item.put("type", info[0]);
+ item.put("tenantId", info[1]);
+ item.put("code", info[2]);
+ item.put("connected", false);
+ String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, tenantId);
+ MqMessage<JSONObject> mqMessage = new MqMessage<>(item, tenantId, recTopic);
+ sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_UPDATE_EQU_STATU, mqMessage, 1);
+
} catch (Exception e) {
e.printStackTrace();
}
@@ -177,38 +198,55 @@
switch (topic) {
// 鏌ヨ璁惧鍦ㄧ嚎
case MqttConstant.MOBILE_QUERY_EQU_STATU:
- System.err.println("admin鏀跺埌" + topic);
- // 鏍规嵁璁惧id鏌ヨ璁惧mqtt鍦ㄧ嚎鐘舵��
- String clientId = messageJson.getString("clientId");
- JSONObject client = (JSONObject) redisUtil.hget(String.format(MqttConstant.MQTT_ONLINE_CLIENT, clientId.split("-")[1]), clientId);
+ log.info("admin鏀跺埌MQTT璇锋眰锛宼opic: {}", topic); // 鏀圭敤鏇磋鑼冪殑鏃ュ織璁板綍
- ThreadUtil.execute(() -> {
+ try {
- if (client == null || client.isEmpty()) {
- JSONObject res = new JSONObject();
- res.put("success", false);
- res.put("msg", "鏌ヨ澶辫触");
- try {
- MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes());
- sendMessage.setQos(0);
- mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
- } catch (Exception e) {
- e.printStackTrace();
- }
+ // 1. 鍙傛暟鎻愬彇
+ String clientId = messageJson.getString("clientId");
+ if (StringUtils.isEmpty(clientId)) {
return;
}
+ String deviceKey = clientId.split("-")[1]; // 鎻愬彇璁惧鏍囪瘑
- client.put("success", true);
- client.put("msg", "鏌ヨ鎴愬姛");
- try {
- MqttMessage sendMessage = new MqttMessage(client.toJSONString().getBytes());
- sendMessage.setQos(0);
- mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_RES_EQU_STATU, req), sendMessage);
- baseCommonService.addLog(client.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2);
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
+ // 2. 鏌ヨ璁惧鐘舵��
+ String redisKey = String.format(MqttConstant.MQTT_ONLINE_CLIENT, deviceKey);
+ JSONObject client = (JSONObject) redisUtil.hget(redisKey, clientId);
+
+ // 3. 寮傛澶勭悊鍝嶅簲
+ ThreadUtil.execute(() -> {
+ JSONObject response = new JSONObject();
+
+ // 3.1 澶勭悊鏌ヨ缁撴灉
+ if (client == null || client.isEmpty()) {
+ response.put("success", false);
+ response.put("msg", "鏌ヨ澶辫触锛岃澶囦笉瀛樺湪鎴栫绾�");
+ } else {
+ response = client; // 澶嶇敤鏌ヨ缁撴灉
+ response.put("success", true);
+ response.put("msg", "鏌ヨ鎴愬姛");
+ }
+
+ // 3.2 鍙戦�丮QTT鍝嶅簲
+ try {
+ String resTopic = String.format(MqttConstant.SERVICE_RES_EQU_STATU, req);
+ MqMessage<JSONObject> mqMessage = new MqMessage<>(
+ response,
+ response.getString("tenantId"),
+ resTopic
+ );
+
+ sendMqttMessage(resTopic, mqMessage, 2);
+ log.debug("璁惧鐘舵�佸搷搴斿彂閫佹垚鍔�: {}", response);
+
+ } catch (Exception e) {
+ log.error("MQTT鍝嶅簲鍙戦�佸け璐�", e);
+ }
+ });
+
+ } catch (Exception e) {
+ log.error("澶勭悊璁惧鐘舵�佹煡璇㈠紓甯�", e);
+ }
break;
// 鎺ユ敹璁惧瀹炴椂鏁版嵁 TODO 20250718鏆備笉浣跨敤锛屼娇鐢═ENANT_UP_PREFIX_REALTIME_DATA_EQP
@@ -227,10 +265,16 @@
case MqttConstant.TENANT_UP_PREFIX_REALTIME_DATA_EQP:
ThreadUtil.execute(() -> {
try {
+
RealTimeDataParentVo vo = JSON.parseObject(message, RealTimeDataParentVo.class);
- synchronized (realTimeDataService) {
- realTimeDataService.realTimeDataHandle(vo);
- }
+ // 鍚戝悇绉熸埛绉诲姩绔彂閫佹暟鎹�
+ String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, vo.getTenantid());
+ MqMessage<RealTimeDataVo> mqMessage = new MqMessage<>(vo.getRealTime(), vo.getTenantid() + "", recTopic);
+ sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_DATA, mqMessage, 1);
+
+
+ realTimeDataService.realTimeDataHandle(vo);
+
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
index 9178cb4..8e48a0b 100755
--- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
+++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -36,6 +36,8 @@
import java.text.DecimalFormat;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@Slf4j
@@ -79,6 +81,13 @@
@Value(value = "${jeecg.mqtt.enable}")
private boolean mqttEnable;
+
+ private static final ConcurrentHashMap<String, ReentrantLock> tenantLocks = new ConcurrentHashMap<>();
+
+ private ReentrantLock getLock(String tenantId, String type) {
+ String lockKey = tenantId + ":" + type;
+ return tenantLocks.computeIfAbsent(lockKey, k -> new ReentrantLock());
+ }
public String getTemporaryToken() {
if (token == null) {
@@ -226,7 +235,16 @@
}
if (realTimeDataParentVo.getFault() != null) {
- fitFaultRecord(realTimeDataParentVo);
+ ReentrantLock faultLock = getLock(realTimeDataParentVo.getTenantid() + "", "fault");
+ faultLock.lock();
+ try {
+ fitFaultRecord(realTimeDataParentVo);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ faultLock.unlock();
+ }
+
}
return Result.ok();
}
@@ -291,7 +309,7 @@
* @return
*/
private DryOrderVo getOrSaveDryOrderVoDB(RealTimeDataVo realTimeDataVo) {
- TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
+ TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
DryOrderVo orderVo;
LambdaQueryWrapper<DryOrder> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(DryOrder::getCode, realTimeDataVo.getWorkorder());
@@ -329,7 +347,7 @@
* @return
*/
private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) {
- TenantContext.setTenant(realTimeDataVo.getTenantid() +"");
+ TenantContext.setTenant(realTimeDataVo.getTenantid() + "");
DryOrderVo orderVo;
// 鏌ヨ璁惧
@@ -357,16 +375,26 @@
new LambdaQueryWrapper<DryEqpType>()
.eq(DryEqpType::getTenantId, realTimeDataVo.getTenantid())
);
- if(eqpType == null){
- log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid() );
+ if (eqpType == null) {
+ log.error("鏈煡璇㈠埌绉熸埛璁惧绫诲瀷锛歿}", realTimeDataVo.getTenantid());
return null;
}
Optional.ofNullable(eqpType).ifPresent(type -> addEqu.setType(type.getId()));
- if (!equipmentService.save(addEqu)) {
- log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu);
- return null;
+ // 璁惧鏂板
+ ReentrantLock equipmentLock = getLock(realTimeDataVo.getTenantid() + "", "equipment");
+
+ equipmentLock.lock();
+ try {
+ if (!equipmentService.save(addEqu)) {
+ log.error("鏂板璁惧澶辫触锛氭暟鎹簱淇濆瓨寮傚父锛乪quipment={}", addEqu);
+ return null;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ equipmentLock.unlock();
}
equ = addEqu;
@@ -380,14 +408,26 @@
log.error("鏈壘鍒拌嵂鏉愶細" + realTimeDataVo.getIndex() + "," + realTimeDataVo.getName() + ",鏈哄彴锛�" + realTimeDataVo.getMachineid());
return null;
}
- // 鍒涘缓鏂板伐鍗�
- orderVo = new DryOrderVo(realTimeDataVo);
- orderVo.setHerbId(herbFormula.getId());
- orderVo.setEquId(equ.getId());
- DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
- boolean save = dryOrderService.save(dryOrder);
- return orderVo;
+ // 宸ュ崟鏂板
+ ReentrantLock orderLock = getLock(realTimeDataVo.getTenantid() + "", "order");
+ orderLock.lock();
+ try {
+ // 鍒涘缓鏂板伐鍗�
+ orderVo = new DryOrderVo(realTimeDataVo);
+ orderVo.setHerbId(herbFormula.getId());
+ orderVo.setEquId(equ.getId());
+ DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class);
+ boolean save = dryOrderService.save(dryOrder);
+ return orderVo;
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ orderLock.unlock();
+ }
+
+
+ return null;
}
@@ -414,9 +454,9 @@
object.put("tenantId", realTimeDataVo.getTenantid());
mqttMessage.setPayload(object.toJSONString().getBytes());
try {
- if(mqttEnable){
- mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
- }
+ if (mqttEnable) {
+ mqttUtil.getMqttClient().publish(MqttConstant.SERVICE_REQ_PREFIX, mqttMessage);
+ }
} catch (MqttException e) {
e.printStackTrace();
}
@@ -441,7 +481,7 @@
if (one == null) {
one = new DryHerbFormula(realTimeDataVo);
DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(realTimeDataVo.getTenantid() + "", realTimeDataVo.getMachineid());
- if (dryEquipment!=null&&dryEquipment.getType()!=null) {
+ if (dryEquipment != null && dryEquipment.getType() != null) {
one.setEqpType(dryEquipment.getType());
}
@@ -657,51 +697,44 @@
@Override
public void fitFaultRecord(RealTimeDataParentVo vo) {
TenantContext.setTenant(vo.getTenantid() + "");
- ThreadUtil.execute(() -> {
- try {
- //瑙f瀽瀛樺偍鎶ヨ鏁版嵁
- List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
- List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
- if(!errorList.isEmpty()){
- log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString());
- }
- if(!warnList.isEmpty()){
- log.error("淇濆瓨鍛婅锛歿}", warnList.toString());
- }
+ //瑙f瀽瀛樺偍鎶ヨ鏁版嵁
+ List<DryFaultRecord> errorList = fitFault(vo.getFault().getError(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 1);
+ List<DryFaultRecord> warnList = fitFault(vo.getFault().getWarning(), vo.getWorkorder(), vo.getTenantid(), vo.getMachineid(), 2);
+ if (!errorList.isEmpty()) {
+ log.error("淇濆瓨鏁呴殰锛歿}", errorList.toString());
+ }
+ if (!warnList.isEmpty()) {
+ log.error("淇濆瓨鍛婅锛歿}", warnList.toString());
+ }
- //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊
- if(!mqttEnable)return;
+ //浠ヤ笅涓轰簯鏈嶅姟澶勭悊鏁呴殰,鍘傚唴鏈湴鏈嶅姟鏃犻渶澶勭悊
+ if (!mqttEnable) return;
-
- //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 key = tenantId + machineId + eqpFault
- Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
+ //澶勭悊缁撴潫鍚庯紝灏唕edis涓疄鏃舵暟鎹彂閫佽嚦浜戞湇鍔″櫒 key = tenantId + machineId + eqpFault
+ Map<Object, Object> toCloudFaultMap = redisUtil.hmget(String.format(MqttConstant.MQTT_REAL_FAULT, vo.getTenantid()));
- Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
- .collect(Collectors.toMap(
- entry -> entry.getKey().toString(),
- entry -> (DryFaultRecordVo)entry.getValue()
- ));
+ Map<String, DryFaultRecordVo> dryFaultMap = toCloudFaultMap.entrySet().stream()
+ .collect(Collectors.toMap(
+ entry -> entry.getKey().toString(),
+ entry -> (DryFaultRecordVo) entry.getValue()
+ ));
- String tenantId = vo.getTenantid() +"";
+ String tenantId = vo.getTenantid() + "";
- //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
- if (dryFaultMap.isEmpty()) {
- return;
- }
- String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
- //鏁版嵁杞崲
- List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
- MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
- //鍙戦�佸箍鎾�
- log.error("骞挎挱缁欙細{}" , recTopic);
- mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
+ //骞挎挱鍙戦�佺粰鍚勭鎴蜂笅绉诲姩璁惧
+ if (dryFaultMap.isEmpty()) {
+ return;
+ }
+ String recTopic = String.format(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, tenantId);
+ //鏁版嵁杞崲
+ List<DryFaultRecordVo> faultList = new ArrayList<DryFaultRecordVo>((dryFaultMap.values()));
+ MqMessage<List<DryFaultRecordVo>> mqMessage = new MqMessage<>(faultList, tenantId, recTopic);
+ //鍙戦�佸箍鎾�
+ log.error("骞挎挱缁欙細{}", recTopic);
+ mqttUtil.sendMqttMessage(MqttConstant.SERVICE_BROADCAST_TENANT_REAL_FAULT, mqMessage, 1);
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
}
@@ -759,12 +792,11 @@
// addFauMap.put(redisKey,faultRecord);
Map<String, DryEquipment> equipmentMap = equipmentService.queryEquByTenantId(tenantId);
String tenantName = (String) redisUtil.hget(CommonCacheConstant.SYS_CACHE_TENANT, tenantId + "");
- DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, equipmentMap.get(machineId).getName(), tenantName);
+ DryFaultRecordVo vo = new DryFaultRecordVo(orderId, tenantId, eqpFault, faultType, new Date(), null, 1, machineId, equipmentMap.get(machineId).getName(), tenantName);
addFauMap.put(redisKey, vo);
} else {
- //TODO 鐗规畩鎯呭喌锛屽鏋渞edis鐨勬晠闅滃拰鏂�
-
-
+
+
//濡傛灉鏁版嵁宸插瓨鍦紝涓旇鏁板ぇ浜�1灏遍噸缃鏁帮紙璁℃暟3娆″悗鍒ゅ畾鏁呴殰缁撴潫锛�3娆′箣鍓嶉噸鏂颁笂鎶ユ晠闅滆鏄庢晠闅滆繕鍦ㄦ寔缁� 闇�瑕侀噸鏂拌鏁帮級
if (rFault.getECount() != null && rFault.getECount() > 1) {
rFault.setECount(1);
--
Gitblit v1.9.3