干燥机配套车间生产管理系统/云平台服务端
zhuguifei
2024-11-29 339515558253d776769dc2e2560bbb4a0450c989
jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/websocket/DrySocket.java
old mode 100644 new mode 100755
@@ -1,214 +1,146 @@
package org.jeecg.modules.dry.websocket;
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.DrySocketConst;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.dry.service.IDryRealTimeDataService;
import org.jeecg.modules.dry.vo.RealTimeDataVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.jeecg.common.constant.WebsocketConst;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
/**
 * @Author scott
 * @Date 2019/11/29 9:41
 * @Description: 此注解相当于设置访问URL
 */
@Component
@ServerEndpoint("/drySocket/{tenantId}/{machineId}")
@Slf4j
@ServerEndpoint("/drySocket/{tenantId}")
public class DrySocket {
    /**线程安全Map*/
    private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>();
    /**
     * 当前 session
     */
    private Session session;
    /**
     * 当前租户id
     */
    private String tenantId;
    /**
     * 设备id,用于标识同一租户,不同设备的数据
     */
    private String machineId;
    /**
     * 当前socket唯一id
     */
    private String socketId;
    /**
     * 租户连接池,包含单个租户的所有socket连接;
     * 因为一个租户可能打开多个设备,多个设备就会有多个连接;
     * key是tenantId,value是Map对象;子Map的key是machineId,value是drySocket对象
     */
    private static Map<String, Map<String, DrySocket>> tenantPool = new HashMap<>();
    /**
     * 连接池,包含所有WebSocket连接;
     * key是socketId,value是drySocket对象
     */
    private static Map<String, DrySocket> machinePool = new HashMap<>();
    /**
     * 获取某个租户所有的设备
     */
    public static Map<String, DrySocket> getTenantPool(String tenantId) {
        return tenantPool.computeIfAbsent(tenantId, k -> new HashMap<>(5));
    }
    /**
     * 向当前租户发送消息
     *
     * @param message 消息内容
     */
    public void sendMessage(String message) {
        try {
            this.session.getAsyncRemote().sendText(message);
        } catch (Exception e) {
            log.error("【drySocket】消息发送失败:" + e.getMessage());
        }
    }
    /**
     * 封装消息json
     *
     * @param data 消息内容
     */
    public static String packageMessage(String type, Object data) {
        JSONObject message = new JSONObject();
        message.put(DrySocketConst.TYPE, type);
        message.put(DrySocketConst.DATA, data);
        return message.toJSONString();
    }
    /**
     * 向指定租户的所有设备发送消息
     *
     * @param tenantId  接收消息的租户ID
     * @param message 消息内容
     */
    public static void sendMessageTo(String tenantId, String message) {
        Collection<DrySocket> values = getTenantPool(tenantId).values();
        if (values.size() > 0) {
            for (DrySocket socketItem : values) {
                socketItem.sendMessage(message);
            }
        } else {
            log.warn("【drySocket】消息发送失败:tenantId\"" + tenantId + "\"不存在或未在线!");
        }
    }
    /**
     * 向指定租户的指定设备发送消息
     *
     * @param tenantId  接收消息的租户ID
     * @param message 消息内容
     */
    public static void sendMessageTo(String tenantId, String machineId, String message) {
        DrySocket socketItem = getTenantPool(tenantId).get(machineId);
        if (socketItem != null) {
            socketItem.sendMessage(message);
        } else {
            log.warn("【drySocket】消息发送失败:tenantId\"" + tenantId + "\"的machineId\"" + machineId + "\"不存在或未在线!");
        }
    }
    /**
     * 向多个租户的所有设备发送消息
     *
     * @param tenantIds 接收消息的租户ID数组
     * @param message 消息内容
     */
    public static void sendMessageTo(String[] tenantIds, String message) {
        for (String tenantId : tenantIds) {
            DrySocket.sendMessageTo(tenantId, message);
        }
    }
    /**
     * 向所有租户的所有设备发送消息
     *
     * @param message 消息内容
     */
    public static void sendMessageToAll(String message) {
        for (DrySocket socketItem : machinePool.values()) {
            socketItem.sendMessage(message);
        }
    }
    /**
     * websocket 开启连接
     */
    //==========【websocket接受、推送消息等方法 —— 具体服务节点推送ws消息】========================================================================================
    @OnOpen
    public void onOpen(Session session, @PathParam("tenantId") String tenantId, @PathParam("machineId") String machineId) {
    public void onOpen(Session session, @PathParam(value = "tenantId") String tenantId) {
        try {
            this.tenantId = tenantId;
            this.machineId = machineId;
            this.socketId = tenantId + machineId;
            this.session = session;
            machinePool.put(this.socketId, this);
            getTenantPool(tenantId).put(this.machineId, this);
            log.info("【drySocket】有新的连接,总数为:" + machinePool.size());
        } catch (Exception ignored) {
            sessionPool.put(tenantId, session);
            log.info("【系统 WebSocket】有新的连接,总数为:" + sessionPool.size());
        } catch (Exception e) {
        }
    }
    /**
     * websocket 断开连接
     */
    @OnClose
    public void onClose() {
    public void onClose(@PathParam("tenantId") String tenantId) {
        try {
            machinePool.remove(this.socketId);
            getTenantPool(this.tenantId).remove(this.machineId);
            log.info("【drySocket】连接断开,总数为:" + machinePool.size());
        } catch (Exception ignored) {
            sessionPool.remove(tenantId);
            log.info("【系统 WebSocket】连接断开,总数为:" + sessionPool.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * websocket 收到消息
     * ws推送消息
     *
     * @param tenantId
     * @param message
     */
    public void pushMessage(String tenantId, String message) {
        for (Map.Entry<String, Session> item : sessionPool.entrySet()) {
            //userId key值= {用户id + "_"+ 登录token的md5串}
            //TODO vue2未改key新规则,暂时不影响逻辑
            if (item.getKey().contains(tenantId)) {
                Session session = item.getValue();
                try {
                    //update-begin-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
                    synchronized (session){
                        log.info("【系统 WebSocket】推送单人消息:" + message);
                        session.getBasicRemote().sendText(message);
                    }
                    //update-end-author:taoyan date:20211012 for: websocket报错 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU
                } catch (Exception e) {
                    log.error(e.getMessage(),e);
                }
            }
        }
    }
    /**
     * ws遍历群发消息
     */
    public void pushMessage(String message) {
        try {
            for (Map.Entry<String, Session> item : sessionPool.entrySet()) {
                try {
                    item.getValue().getAsyncRemote().sendText(message);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            }
            log.info("【系统 WebSocket】群发消息:" + message);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    /**
     * ws接受客户端消息
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("【drySocket】onMessage:" + message);
        IDryRealTimeDataService realTimeDataService = SpringContextUtils.getBean(IDryRealTimeDataService.class);
        JSONObject json;
        try {
            json = JSON.parseObject(message);
        } catch (Exception e) {
            log.warn("【drySocket】收到不合法的消息:" + message);
            return;
    public void onMessage(String message, @PathParam(value = "tenantId") String tenantId) {
        if(!"ping".equals(message) && !WebsocketConst.CMD_CHECK.equals(message)){
            log.info("【系统 WebSocket】收到客户端消息:" + message);
        }else{
            log.debug("【系统 WebSocket】收到客户端消息:" + message);
        }
        String type = json.getString(DrySocketConst.TYPE);
        switch (type) {
            // 心跳检测
            case DrySocketConst.TYPE_HB:
                this.sendMessage(DrySocket.packageMessage(type, true));
                break;
            // 实时数据处理
            case DrySocketConst.TYPE_RDT:
                Object o = json.get(DrySocketConst.DATA);
                RealTimeDataVo realTimeDataVo = BeanUtil.toBean(o, RealTimeDataVo.class);
                realTimeDataService.realTimeDataHandle(realTimeDataVo);
                break;
            default:
                log.warn("【drySocket】收到不识别的消息类型:" + type);
                break;
        }
//        //------------------------------------------------------------------------------
//        JSONObject obj = new JSONObject();
//        //业务类型
//        obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK);
//        //消息内容
//        obj.put(WebsocketConst.MSG_TXT, "心跳响应");
//        this.pushMessage(userId, obj.toJSONString());
//        //------------------------------------------------------------------------------
    }
    /**
     * 配置错误信息处理
     *
     * @param session
     * @param t
     */
    @OnError
    public void onError(Session session, Throwable t) {
        log.warn("【系统 WebSocket】消息出现错误");
        t.printStackTrace();
    }
    //==========【系统 WebSocket接受、推送消息等方法 —— 具体服务节点推送ws消息】========================================================================================
}
    //==========【采用redis发布订阅模式——推送消息】========================================================================================
    /**
     * 此为单点消息(多人) redis
     *
     * @param userIds
     * @param message
     */
//    public void sendMessage(String[] userIds, String message) {
//        for (String userId : userIds) {
//            sendMessage(userId, message);
//        }
//    }
    //=======【采用redis发布订阅模式——推送消息】==========================================================================================
}