package org.jeecg.modules.dry.websocket;

import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.DrySocketConst;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.dry.service.IDryRealTimeDataService;
import org.jeecg.modules.dry.vo.RealTimeDataVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket endpoint mapped to {@code /drySocket/{tenantId}/{machineId}}.
 *
 * <p>Every open connection is registered in two static pools (one global, one grouped by
 * tenant) so that the static {@code sendMessageTo*} helpers can push text to a single
 * machine, to every machine of a tenant, or to every connection.
 *
 * <p>The pools are {@link ConcurrentHashMap}s because connection callbacks and the static
 * send helpers may touch them from different threads.
 */
@Slf4j
@Component
@ServerEndpoint("/drySocket/{tenantId}/{machineId}")
public class DrySocket {

    /**
     * Placed between tenantId and machineId when building {@link #socketId}, so that e.g.
     * ("1", "23") and ("12", "3") do not produce the same key.
     */
    private static final String SOCKET_ID_SEPARATOR = "/";

    /**
     * Per-tenant connection pool. A tenant may open several machines, each with its own
     * connection. Key: tenantId; value: map of machineId to that machine's socket.
     */
    private static final Map<String, Map<String, DrySocket>> tenantPool = new ConcurrentHashMap<>();

    /**
     * Global connection pool containing every WebSocket connection.
     * Key: socketId; value: the socket.
     */
    private static final Map<String, DrySocket> machinePool = new ConcurrentHashMap<>();

    /**
     * Session of this connection.
     */
    private Session session;

    /**
     * Tenant id of this connection.
     */
    private String tenantId;

    /**
     * Machine id; distinguishes different machines of the same tenant.
     */
    private String machineId;

    /**
     * Unique id of this socket, derived from tenantId and machineId.
     */
    private String socketId;

    /**
     * Returns the live machineId-to-socket map of a tenant, registering an empty map for
     * the tenant when none exists yet.
     *
     * @param tenantId tenant whose connections are requested; must not be {@code null}
     * @return the mutable map backing the tenant's entry in the pool
     */
    public static Map<String, DrySocket> getTenantPool(String tenantId) {
        return tenantPool.computeIfAbsent(tenantId, k -> new ConcurrentHashMap<>());
    }

    /**
     * Sends a text message over this connection asynchronously. Failures raised while
     * submitting the message are logged, not propagated.
     *
     * @param message message content
     */
    public void sendMessage(String message) {
        try {
            this.session.getAsyncRemote().sendText(message);
        } catch (Exception e) {
            // Pass the exception as well so the stack trace is not lost.
            log.error("【drySocket】消息发送失败:{}", e.getMessage(), e);
        }
    }

    /**
     * Wraps a payload into the JSON envelope used on this socket.
     *
     * @param type value stored under {@code DrySocketConst.TYPE}
     * @param data value stored under {@code DrySocketConst.DATA}
     * @return the envelope serialized as a JSON string
     */
    public static String packageMessage(String type, Object data) {
        JSONObject message = new JSONObject();
        message.put(DrySocketConst.TYPE, type);
        message.put(DrySocketConst.DATA, data);
        return message.toJSONString();
    }

    /**
     * Sends a message to every machine of the given tenant. Logs a warning when the tenant
     * has no registered connection.
     *
     * @param tenantId id of the receiving tenant
     * @param message  message content
     */
    public static void sendMessageTo(String tenantId, String message) {
        // Read-only lookup: must not create a pool entry for an unknown tenant.
        Map<String, DrySocket> devices = tenantId == null ? null : tenantPool.get(tenantId);
        if (devices == null || devices.isEmpty()) {
            log.warn("【drySocket】消息发送失败:tenantId\"{}\"不存在或未在线!", tenantId);
            return;
        }
        Collection<DrySocket> sockets = devices.values();
        for (DrySocket socketItem : sockets) {
            socketItem.sendMessage(message);
        }
    }

    /**
     * Sends a message to one machine of the given tenant. Logs a warning when that machine
     * has no registered connection.
     *
     * @param tenantId  id of the receiving tenant
     * @param machineId id of the receiving machine
     * @param message   message content
     */
    public static void sendMessageTo(String tenantId, String machineId, String message) {
        Map<String, DrySocket> devices = tenantId == null ? null : tenantPool.get(tenantId);
        DrySocket socketItem = (devices == null || machineId == null) ? null : devices.get(machineId);
        if (socketItem != null) {
            socketItem.sendMessage(message);
        } else {
            log.warn("【drySocket】消息发送失败:tenantId\"{}\"的machineId\"{}\"不存在或未在线!", tenantId, machineId);
        }
    }

    /**
     * Sends a message to every machine of each listed tenant.
     *
     * @param tenantIds ids of the receiving tenants; {@code null} is treated as empty
     * @param message   message content
     */
    public static void sendMessageTo(String[] tenantIds, String message) {
        if (tenantIds == null) {
            return;
        }
        for (String tenantId : tenantIds) {
            DrySocket.sendMessageTo(tenantId, message);
        }
    }

    /**
     * Sends a message to every registered connection of every tenant.
     *
     * @param message message content
     */
    public static void sendMessageToAll(String message) {
        for (DrySocket socketItem : machinePool.values()) {
            socketItem.sendMessage(message);
        }
    }

    /**
     * Called when a WebSocket connection is opened; records the connection's identity and
     * registers it in both pools. A connection already registered under the same
     * tenant/machine is replaced.
     *
     * @param session   the new session
     * @param tenantId  tenant id taken from the endpoint path
     * @param machineId machine id taken from the endpoint path
     */
    @OnOpen
    public void onOpen(Session session,
                       @PathParam("tenantId") String tenantId,
                       @PathParam("machineId") String machineId) {
        try {
            this.tenantId = tenantId;
            this.machineId = machineId;
            this.socketId = tenantId + SOCKET_ID_SEPARATOR + machineId;
            this.session = session;
            machinePool.put(this.socketId, this);
            getTenantPool(tenantId).put(this.machineId, this);
            log.info("【drySocket】有新的连接,总数为:{}", machinePool.size());
        } catch (Exception e) {
            log.error("【drySocket】onOpen failed: tenantId={}, machineId={}", tenantId, machineId, e);
        }
    }

    /**
     * Called when the WebSocket connection is closed; unregisters this connection from
     * both pools.
     */
    @OnClose
    public void onClose() {
        if (this.socketId == null) {
            // onOpen never registered this instance, so there is nothing to remove.
            return;
        }
        try {
            // remove(key, value) only removes the entry while it still maps to THIS instance,
            // so a stale connection closing late cannot evict a newer connection that
            // re-registered under the same tenant/machine.
            machinePool.remove(this.socketId, this);
            Map<String, DrySocket> devices = tenantPool.get(this.tenantId);
            if (devices != null) {
                devices.remove(this.machineId, this);
            }
            log.info("【drySocket】连接断开,总数为:{}", machinePool.size());
        } catch (Exception e) {
            log.error("【drySocket】onClose failed: tenantId={}, machineId={}", this.tenantId, this.machineId, e);
        }
    }

    /**
     * Called when a text message arrives. The message is expected to be a JSON object with
     * a type field ({@code DrySocketConst.TYPE}) and, for real-time data, a data field
     * ({@code DrySocketConst.DATA}). Messages that cannot be parsed or lack a type are
     * logged and dropped.
     *
     * @param message raw text received from the client
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("【drySocket】onMessage:{}", message);

        JSONObject json;
        try {
            json = JSON.parseObject(message);
        } catch (Exception e) {
            log.warn("【drySocket】收到不合法的消息:{}", message);
            return;
        }
        // Guard against a null parse result and a missing type: switching on a null
        // String would throw a NullPointerException.
        String type = json == null ? null : json.getString(DrySocketConst.TYPE);
        if (type == null) {
            log.warn("【drySocket】收到不合法的消息:{}", message);
            return;
        }

        switch (type) {
            // Heartbeat: echo the type back with a positive acknowledgement.
            case DrySocketConst.TYPE_HB:
                this.sendMessage(DrySocket.packageMessage(type, true));
                break;
            // Real-time data: convert the payload and hand it to the service.
            case DrySocketConst.TYPE_RDT:
                Object data = json.get(DrySocketConst.DATA);
                if (data == null) {
                    log.warn("【drySocket】收到不合法的消息:{}", message);
                    break;
                }
                RealTimeDataVo realTimeDataVo = BeanUtil.toBean(data, RealTimeDataVo.class);
                // Looked up here, not at method entry, so heartbeats skip the bean lookup.
                IDryRealTimeDataService realTimeDataService =
                        SpringContextUtils.getBean(IDryRealTimeDataService.class);
                realTimeDataService.realTimeDataHandle(realTimeDataVo);
                break;
            default:
                log.warn("【drySocket】收到不识别的消息类型:{}", type);
                break;
        }
    }
}