| | |
| | | package org.jeecg.modules.dry.websocket; |
| | | |
| | | import cn.hutool.core.bean.BeanUtil; |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.jeecg.common.constant.DrySocketConst; |
| | | import org.jeecg.common.util.SpringContextUtils; |
| | | import org.jeecg.modules.dry.service.IDryRealTimeDataService; |
| | | import org.jeecg.modules.dry.vo.RealTimeDataVo; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.jeecg.common.constant.WebsocketConst; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.websocket.OnClose; |
| | | import javax.websocket.OnMessage; |
| | | import javax.websocket.OnOpen; |
| | | import javax.websocket.Session; |
| | | import javax.websocket.*; |
| | | import javax.websocket.server.PathParam; |
| | | import javax.websocket.server.ServerEndpoint; |
| | | import java.util.Collection; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | |
| | | @Slf4j |
| | | /** |
| | | * @Author scott |
| | | * @Date 2019/11/29 9:41 |
| | | * @Description: 此注解相当于设置访问URL |
| | | */ |
| | | @Component |
| | | @ServerEndpoint("/drySocket/{tenantId}/{machineId}") |
| | | @Slf4j |
| | | @ServerEndpoint("/drySocket/{tenantId}") |
| | | public class DrySocket { |
| | | |
| | | /**线程安全Map*/ |
| | | private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>(); |
| | | |
| | | /** |
| | | * 当前 session |
| | | */ |
| | | private Session session; |
| | | /** |
| | | * 当前租户id |
| | | */ |
| | | private String tenantId; |
| | | /** |
| | | * 设备id,用于标识同一租户,不同设备的数据 |
| | | */ |
| | | private String machineId; |
| | | /** |
| | | * 当前socket唯一id |
| | | */ |
| | | private String socketId; |
| | | |
| | | /** |
| | | * 租户连接池,包含单个租户的所有socket连接; |
| | | * 因为一个租户可能打开多个设备,多个设备就会有多个连接; |
| | | * key是tenantId,value是Map对象;子Map的key是machineId,value是drySocket对象 |
| | | */ |
| | | private static Map<String, Map<String, DrySocket>> tenantPool = new HashMap<>(); |
| | | /** |
| | | * 连接池,包含所有WebSocket连接; |
| | | * key是socketId,value是drySocket对象 |
| | | */ |
| | | private static Map<String, DrySocket> machinePool = new HashMap<>(); |
| | | |
| | | /** |
| | | * 获取某个租户所有的设备 |
| | | */ |
| | | public static Map<String, DrySocket> getTenantPool(String tenantId) { |
| | | return tenantPool.computeIfAbsent(tenantId, k -> new HashMap<>(5)); |
| | | } |
| | | |
| | | /** |
| | | * 向当前租户发送消息 |
| | | * |
| | | * @param message 消息内容 |
| | | */ |
| | | public void sendMessage(String message) { |
| | | try { |
| | | this.session.getAsyncRemote().sendText(message); |
| | | } catch (Exception e) { |
| | | log.error("【drySocket】消息发送失败:" + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 封装消息json |
| | | * |
| | | * @param data 消息内容 |
| | | */ |
| | | public static String packageMessage(String type, Object data) { |
| | | JSONObject message = new JSONObject(); |
| | | message.put(DrySocketConst.TYPE, type); |
| | | message.put(DrySocketConst.DATA, data); |
| | | return message.toJSONString(); |
| | | } |
| | | |
| | | /** |
| | | * 向指定租户的所有设备发送消息 |
| | | * |
| | | * @param tenantId 接收消息的租户ID |
| | | * @param message 消息内容 |
| | | */ |
| | | public static void sendMessageTo(String tenantId, String message) { |
| | | Collection<DrySocket> values = getTenantPool(tenantId).values(); |
| | | if (values.size() > 0) { |
| | | for (DrySocket socketItem : values) { |
| | | socketItem.sendMessage(message); |
| | | } |
| | | } else { |
| | | log.warn("【drySocket】消息发送失败:tenantId\"" + tenantId + "\"不存在或未在线!"); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 向指定租户的指定设备发送消息 |
| | | * |
| | | * @param tenantId 接收消息的租户ID |
| | | * @param message 消息内容 |
| | | */ |
| | | public static void sendMessageTo(String tenantId, String machineId, String message) { |
| | | DrySocket socketItem = getTenantPool(tenantId).get(machineId); |
| | | if (socketItem != null) { |
| | | socketItem.sendMessage(message); |
| | | } else { |
| | | log.warn("【drySocket】消息发送失败:tenantId\"" + tenantId + "\"的machineId\"" + machineId + "\"不存在或未在线!"); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 向多个租户的所有设备发送消息 |
| | | * |
| | | * @param tenantIds 接收消息的租户ID数组 |
| | | * @param message 消息内容 |
| | | */ |
| | | public static void sendMessageTo(String[] tenantIds, String message) { |
| | | for (String tenantId : tenantIds) { |
| | | DrySocket.sendMessageTo(tenantId, message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 向所有租户的所有设备发送消息 |
| | | * |
| | | * @param message 消息内容 |
| | | */ |
| | | public static void sendMessageToAll(String message) { |
| | | for (DrySocket socketItem : machinePool.values()) { |
| | | socketItem.sendMessage(message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * websocket 开启连接 |
| | | */ |
| | | //==========【websocket接受、推送消息等方法 —— 具体服务节点推送ws消息】======================================================================================== |
| | | @OnOpen |
| | | public void onOpen(Session session, @PathParam("tenantId") String tenantId, @PathParam("machineId") String machineId) { |
| | | public void onOpen(Session session, @PathParam(value = "tenantId") String tenantId) { |
| | | try { |
| | | this.tenantId = tenantId; |
| | | this.machineId = machineId; |
| | | this.socketId = tenantId + machineId; |
| | | this.session = session; |
| | | |
| | | machinePool.put(this.socketId, this); |
| | | getTenantPool(tenantId).put(this.machineId, this); |
| | | |
| | | log.info("【drySocket】有新的连接,总数为:" + machinePool.size()); |
| | | } catch (Exception ignored) { |
| | | sessionPool.put(tenantId, session); |
| | | log.info("【系统 WebSocket】有新的连接,总数为:" + sessionPool.size()); |
| | | } catch (Exception e) { |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * websocket 断开连接 |
| | | */ |
| | | @OnClose |
| | | public void onClose() { |
| | | public void onClose(@PathParam("tenantId") String tenantId) { |
| | | try { |
| | | machinePool.remove(this.socketId); |
| | | getTenantPool(this.tenantId).remove(this.machineId); |
| | | |
| | | log.info("【drySocket】连接断开,总数为:" + machinePool.size()); |
| | | } catch (Exception ignored) { |
| | | sessionPool.remove(tenantId); |
| | | log.info("【系统 WebSocket】连接断开,总数为:" + sessionPool.size()); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * websocket 收到消息 |
| | | * ws推送消息 |
| | | * |
| | | * @param tenantId |
| | | * @param message |
| | | */ |
| | | public void pushMessage(String tenantId, String message) { |
| | | for (Map.Entry<String, Session> item : sessionPool.entrySet()) { |
| | | //userId key值= {用户id + "_"+ 登录token的md5串} |
| | | //TODO vue2未改key新规则,暂时不影响逻辑 |
| | | if (item.getKey().contains(tenantId)) { |
| | | Session session = item.getValue(); |
| | | try { |
| | | //update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU |
| | | synchronized (session){ |
| | | log.info("【系统 WebSocket】推送单人消息:" + message); |
| | | session.getBasicRemote().sendText(message); |
| | | } |
| | | //update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU |
| | | } catch (Exception e) { |
| | | log.error(e.getMessage(),e); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * ws遍历群发消息 |
| | | */ |
| | | public void pushMessage(String message) { |
| | | try { |
| | | for (Map.Entry<String, Session> item : sessionPool.entrySet()) { |
| | | try { |
| | | item.getValue().getAsyncRemote().sendText(message); |
| | | } catch (Exception e) { |
| | | log.error(e.getMessage(), e); |
| | | } |
| | | } |
| | | log.info("【系统 WebSocket】群发消息:" + message); |
| | | } catch (Exception e) { |
| | | log.error(e.getMessage(), e); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * ws接受客户端消息 |
| | | */ |
| | | @OnMessage |
| | | public void onMessage(String message) { |
| | | log.info("【drySocket】onMessage:" + message); |
| | | IDryRealTimeDataService realTimeDataService = SpringContextUtils.getBean(IDryRealTimeDataService.class); |
| | | JSONObject json; |
| | | try { |
| | | json = JSON.parseObject(message); |
| | | } catch (Exception e) { |
| | | log.warn("【drySocket】收到不合法的消息:" + message); |
| | | return; |
| | | public void onMessage(String message, @PathParam(value = "tenantId") String tenantId) { |
| | | if(!"ping".equals(message) && !WebsocketConst.CMD_CHECK.equals(message)){ |
| | | log.info("【系统 WebSocket】收到客户端消息:" + message); |
| | | }else{ |
| | | log.debug("【系统 WebSocket】收到客户端消息:" + message); |
| | | } |
| | | String type = json.getString(DrySocketConst.TYPE); |
| | | switch (type) { |
| | | // 心跳检测 |
| | | case DrySocketConst.TYPE_HB: |
| | | this.sendMessage(DrySocket.packageMessage(type, true)); |
| | | break; |
| | | // 实时数据处理 |
| | | case DrySocketConst.TYPE_RDT: |
| | | Object o = json.get(DrySocketConst.DATA); |
| | | RealTimeDataVo realTimeDataVo = BeanUtil.toBean(o, RealTimeDataVo.class); |
| | | realTimeDataService.realTimeDataHandle(realTimeDataVo); |
| | | break; |
| | | |
| | | default: |
| | | log.warn("【drySocket】收到不识别的消息类型:" + type); |
| | | break; |
| | | } |
| | | |
| | | |
| | | |
| | | // //------------------------------------------------------------------------------ |
| | | // JSONObject obj = new JSONObject(); |
| | | // //业务类型 |
| | | // obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK); |
| | | // //消息内容 |
| | | // obj.put(WebsocketConst.MSG_TXT, "心跳响应"); |
| | | // this.pushMessage(userId, obj.toJSONString()); |
| | | // //------------------------------------------------------------------------------ |
| | | } |
| | | |
| | | /** |
| | | * 配置错误信息处理 |
| | | * |
| | | * @param session |
| | | * @param t |
| | | */ |
| | | @OnError |
| | | public void onError(Session session, Throwable t) { |
| | | log.warn("【系统 WebSocket】消息出现错误"); |
| | | t.printStackTrace(); |
| | | } |
| | | //==========【系统 WebSocket接受、推送消息等方法 —— 具体服务节点推送ws消息】======================================================================================== |
| | | |
| | | |
| | | } |
| | | //==========【采用redis发布订阅模式——推送消息】======================================================================================== |
| | | |
| | | |
| | | |
| | | /** |
| | | * 此为单点消息(多人) redis |
| | | * |
| | | * @param userIds |
| | | * @param message |
| | | */ |
| | | // public void sendMessage(String[] userIds, String message) { |
| | | // for (String userId : userIds) { |
| | | // sendMessage(userId, message); |
| | | // } |
| | | // } |
| | | //=======【采用redis发布订阅模式——推送消息】========================================================================================== |
| | | |
| | | } |