jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/MqttConstant.java
@@ -5,6 +5,13 @@ */ public interface MqttConstant { /**************************** MQTT角色 start *************************************/ String ROLE_ADMIN = "admin"; String ROLE_USER = "user"; /**************************** MQTT角色 end *************************************/ /**************************系统订阅strat*******************************/ //上线订阅 String MQTT_TOPIC_ONLINE = "$SYS/brokers/+/clients/+/connected"; @@ -28,14 +35,7 @@ /**************************租户端向服务端发送数据start*******************************/ String TENANT_UP_PREFIX = "tenant/up"; String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data"; /**************************租户端向服务端发送数据end*******************************/ @@ -49,7 +49,27 @@ String SERVICE_RES_EQU_CMD = SERVICE_DOWN_PREFIX + "/%s/cmd"; /**************************服务端向移动端响应指令end*******************************/ /**************************租户端向服务端发送数据start*******************************/ String TENANT_UP_PREFIX = "tenant/up"; String TENANT_UP_PREFIX_REALTIME_DATA = TENANT_UP_PREFIX + "/realTime/data"; String TENANT_UP_PREFIX_EQU = TENANT_UP_PREFIX + "/equipment"; /**************************租户端向服务端发送数据end*******************************/ /**************************服务端向租户端请求数据start*******************************/ String SERVICE_REQ_PREFIX = "service/req"; String SERVICE_REQ_EQUIPMENT = SERVICE_REQ_PREFIX + "/%s/equipment"; String SERVICE_REQ_EQU_TOPIC = SERVICE_REQ_PREFIX + "/+/equipment"; /**************************服务端向租户端请求数据end*******************************/ //redis缓存 String MQTT_ONLINE_CLIENT = "mqtt:online:client::"; jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttConfig.java
@@ -104,6 +104,7 @@ case "user": //普通客户端只需订阅自身相关消息 mqttClient.subscribe(MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#"); mqttClient.subscribe(MqttConstant.SERVICE_REQ_PREFIX + "/" + mqttClientId.substring(mqttClientId.lastIndexOf("_")+1) + "/#"); System.err.println("user订阅" + MqttConstant.SERVICE_DOWN_PREFIX + "/" + mqttClientId + "/#"); break; jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/mqtt/MqttSampleCallback.java
@@ -3,22 +3,30 @@ import cn.hutool.core.thread.ThreadUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecg.common.config.TenantContext; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.service.BaseCommonService; import org.jeecg.modules.dry.api.EmqxApi; import org.jeecg.modules.dry.entity.DryEquipment; import org.jeecg.modules.dry.entity.DryShop; import org.jeecg.modules.dry.service.IDryEquipmentService; import org.jeecg.modules.dry.service.IDryRealTimeDataService; import org.jeecg.modules.dry.service.IDryShopService; import org.jeecg.modules.dry.vo.DryEquipmentVo; import org.jeecg.modules.dry.vo.RealTimeDataVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; @Slf4j @Component @Scope("prototype") public class MqttSampleCallback implements MqttCallback { @@ -35,6 +43,15 @@ @Autowired private IDryRealTimeDataService realTimeDataService; @Autowired private IDryEquipmentService equipmentService; @Autowired private IDryShopService dryShopService; @Override @@ -174,7 +191,26 @@ } break; case MqttConstant.TENANT_UP_PREFIX_EQU: try { DryEquipment equipment = (DryEquipment) messageJson.get("equipment"); DryEquipment dryEquipment = equipmentService.selectByTenantIdEquipmentId(equipment.getTenantId() + "", equipment.getCode()); if (dryEquipment == null) { equipmentService.save(equipment); } // 获取设备所属车间 DryShop shop = (DryShop) messageJson.get("shop"); shop.setTenantId(equipment.getTenantId()); dryShopService.save(shop); } catch (Exception e) { e.printStackTrace(); } } } @@ -218,6 +254,30 @@ break; case MqttConstant.SERVICE_REQ_EQU_TOPIC: log.debug("收到设备详细信息查询请求"); // 根据设备编码查询设备信息 String code = messageJson.getString("code"); DryEquipment equipmentVo = equipmentService.selectByTenantIdEquipmentId(TenantContext.getTenant(),code); // 根据设备车间id查询车间信息 DryShop shop = dryShopService.getById(equipmentVo.getShopId()); JSONObject res = new JSONObject(); res.put("tenant", TenantContext.getTenant()); res.put("equipment", equipmentVo); res.put("shop", shop); try { MqttMessage sendMessage = new MqttMessage(res.toJSONString().getBytes()); sendMessage.setQos(0); mqttUtil.getMqttClient().publish(MqttConstant.TENANT_UP_PREFIX_EQU, sendMessage); // baseCommonService.addLog(res.toString(), CommonConstant.LOG_TYPE_MQTT, CommonConstant.OPERATE_MQTT_2); } catch (Exception e) { e.printStackTrace(); } break; } } jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/service/impl/DryRealTimeDataServiceImpl.java
@@ -8,16 +8,23 @@ import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.service.IoHandler; import org.apache.mina.core.session.IoSession; import org.apache.shiro.SecurityUtils; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.jeecg.common.api.CommonAPI; import org.jeecg.common.api.vo.Result; import org.jeecg.common.config.TenantContext; import org.jeecg.common.config.mqtoken.UserTokenContext; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.MqttConstant; import org.jeecg.common.system.util.JwtUtil; import org.jeecg.common.system.vo.LoginUser; import org.jeecg.common.util.RedisUtil; import org.jeecg.common.util.SpringContextUtils; import org.jeecg.modules.dry.common.CacheConstants; import org.jeecg.modules.dry.entity.*; import org.jeecg.modules.dry.mqtt.MqttUtil; import org.jeecg.modules.dry.service.*; import org.jeecg.modules.dry.socket.ServerHandler; import org.jeecg.modules.dry.socket.SocketServerConfig; @@ -25,9 +32,11 @@ import org.jeecg.modules.dry.vo.*; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.security.auth.login.LoginContext; import java.io.IOException; import java.io.ObjectOutputStream; import java.net.Socket; @@ -63,7 +72,11 @@ private String token; @Value(value = "${jeecg.mqtt.role}") private String role; @Autowired private MqttUtil mqttUtil; public String getTemporaryToken() { if (token == null) { @@ -238,17 +251,23 @@ */ private DryOrderVo saveNewOrder(RealTimeDataVo realTimeDataVo) { DryOrderVo orderVo; // 查询药材 DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo); // 查询设备 DryEquipment equ = queryEquipmentByCodeTenant(realTimeDataVo); if (equ == null) { log.error("未找到设备:"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",机台:" + realTimeDataVo.getMachineid()); return null; } // 查询药材 DryHerbFormula herbFormula = queryHerbByIndexTenant(realTimeDataVo); // 创建新工单 orderVo = new DryOrderVo(realTimeDataVo); if (herbFormula == null) { log.error("未找到药材:"+realTimeDataVo.getIndex() +","+realTimeDataVo.getName() +",机台:" + realTimeDataVo.getMachineid()); return null; } // 创建新工单 orderVo = new DryOrderVo(realTimeDataVo); orderVo.setHerbId(herbFormula.getId()); orderVo.setEquId(equ.getId()); DryOrder dryOrder = BeanUtil.toBean(orderVo, DryOrder.class); @@ -268,8 +287,23 @@ queryWrapper.eq(DryEquipment::getCode, realTimeDataVo.getMachineid()); DryEquipment one = equipmentService.getOne(queryWrapper); if (one == null) { one = new DryEquipment(realTimeDataVo); equipmentService.save(one); log.error(role+"保存实时数据,未找到设备:"+realTimeDataVo.getMachineid()); // one = new DryEquipment(realTimeDataVo); // equipmentService.save(one); if (MqttConstant.ROLE_ADMIN.equals(role)) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setQos(0); JSONObject object = new JSONObject(); object.put("code", realTimeDataVo.getMachineid()); mqttMessage.setPayload(object.toJSONString().getBytes()); try { mqttUtil.getMqttClient().publish(String.format(MqttConstant.SERVICE_REQ_EQUIPMENT, TenantContext.getTenant()) ,mqttMessage); }catch (MqttException e) { e.printStackTrace(); } } return null; } return one; }