From 07df3edd01d77d77d2653a6a9ee325e53e607955 Mon Sep 17 00:00:00 2001 From: bsw215583320 <baoshiwei121@163.com> Date: 星期一, 04 十二月 2023 08:31:09 +0800 Subject: [PATCH] 增加opc设备维护和控制 --- jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/websocket/DrySocket.java | 296 ++++++++++++++++++++++------------------------------------ 1 files changed, 114 insertions(+), 182 deletions(-) diff --git a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/websocket/DrySocket.java b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/websocket/DrySocket.java index 6103217..8b908b6 100644 --- a/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/websocket/DrySocket.java +++ b/jeecg-module-dry/jeecg-module-dry-biz/src/main/java/org/jeecg/modules/dry/websocket/DrySocket.java @@ -1,214 +1,146 @@ package org.jeecg.modules.dry.websocket; -import cn.hutool.core.bean.BeanUtil; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; -import org.jeecg.common.constant.DrySocketConst; -import org.jeecg.common.util.SpringContextUtils; -import org.jeecg.modules.dry.service.IDryRealTimeDataService; -import org.jeecg.modules.dry.vo.RealTimeDataVo; -import org.springframework.beans.factory.annotation.Autowired; +import org.jeecg.common.constant.WebsocketConst; import org.springframework.stereotype.Component; -import javax.websocket.OnClose; -import javax.websocket.OnMessage; -import javax.websocket.OnOpen; -import javax.websocket.Session; +import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; -import java.util.Collection; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; -@Slf4j +/** + * @Author scott + * @Date 2019/11/29 9:41 + * @Description: 姝ゆ敞瑙g浉褰撲簬璁剧疆璁块棶URL + */ @Component -@ServerEndpoint("/drySocket/{tenantId}/{machineId}") +@Slf4j +@ServerEndpoint("/drySocket/{tenantId}") public class DrySocket { + + /**绾跨▼瀹夊叏Map*/ + private static ConcurrentHashMap<String, Session> sessionPool = new ConcurrentHashMap<>(); - /** - * 褰撳墠 session - */ - private Session session; - /** - * 褰撳墠绉熸埛id - */ - private String tenantId; - /** - * 璁惧id锛岀敤浜庢爣璇嗗悓涓�绉熸埛锛屼笉鍚岃澶囩殑鏁版嵁 - */ - private String machineId; - /** - * 褰撳墠socket鍞竴id - */ - private String socketId; - /** - * 绉熸埛杩炴帴姹狅紝鍖呭惈鍗曚釜绉熸埛鐨勬墍鏈塻ocket杩炴帴锛� - * 鍥犱负涓�涓鎴峰彲鑳芥墦寮�澶氫釜璁惧锛屽涓澶囧氨浼氭湁澶氫釜杩炴帴锛� - * key鏄痶enantId锛寁alue鏄疢ap瀵硅薄锛涘瓙Map鐨刱ey鏄痬achineId锛寁alue鏄痙rySocket瀵硅薄 - */ - private static Map<String, Map<String, DrySocket>> tenantPool = new HashMap<>(); - /** - * 杩炴帴姹狅紝鍖呭惈鎵�鏈塛ebSocket杩炴帴锛� - * key鏄痵ocketId锛寁alue鏄痙rySocket瀵硅薄 - */ - private static Map<String, DrySocket> machinePool = new HashMap<>(); - /** - * 鑾峰彇鏌愪釜绉熸埛鎵�鏈夌殑璁惧 - */ - public static Map<String, DrySocket> getTenantPool(String tenantId) { - return tenantPool.computeIfAbsent(tenantId, k -> new HashMap<>(5)); - } - /** - * 鍚戝綋鍓嶇鎴峰彂閫佹秷鎭� - * - * @param message 娑堟伅鍐呭 - */ - public void sendMessage(String message) { - try { - this.session.getAsyncRemote().sendText(message); - } catch (Exception e) { - log.error("銆恉rySocket銆戞秷鎭彂閫佸け璐ワ細" + e.getMessage()); - } - } - - /** - * 灏佽娑堟伅json - * - * @param data 娑堟伅鍐呭 - */ - public static String packageMessage(String type, Object data) { - JSONObject message = new JSONObject(); - message.put(DrySocketConst.TYPE, type); - message.put(DrySocketConst.DATA, data); - return message.toJSONString(); - } - - /** - * 鍚戞寚瀹氱鎴风殑鎵�鏈夎澶囧彂閫佹秷鎭� - * - * @param tenantId 鎺ユ敹娑堟伅鐨勭鎴稩D - * @param message 娑堟伅鍐呭 - */ - public static void sendMessageTo(String tenantId, String message) { - Collection<DrySocket> values = getTenantPool(tenantId).values(); - if (values.size() > 0) { - for (DrySocket socketItem : values) { - socketItem.sendMessage(message); - } - } else { - log.warn("銆恉rySocket銆戞秷鎭彂閫佸け璐ワ細tenantId\"" + tenantId + "\"涓嶅瓨鍦ㄦ垨鏈湪绾匡紒"); - } - } - - /** - * 鍚戞寚瀹氱鎴风殑鎸囧畾璁惧鍙戦�佹秷鎭� - * - * @param tenantId 鎺ユ敹娑堟伅鐨勭鎴稩D - * @param message 娑堟伅鍐呭 - */ - public static void sendMessageTo(String tenantId, String machineId, String message) { - DrySocket socketItem = getTenantPool(tenantId).get(machineId); - if (socketItem != null) { - socketItem.sendMessage(message); - } else { - log.warn("銆恉rySocket銆戞秷鎭彂閫佸け璐ワ細tenantId\"" + tenantId + "\"鐨刴achineId\"" + machineId + "\"涓嶅瓨鍦ㄦ垨鏈湪绾匡紒"); - } - } - - /** - * 鍚戝涓鎴风殑鎵�鏈夎澶囧彂閫佹秷鎭� - * - * @param tenantIds 鎺ユ敹娑堟伅鐨勭鎴稩D鏁扮粍 - * @param message 娑堟伅鍐呭 - */ - public static void sendMessageTo(String[] tenantIds, String message) { - for (String tenantId : tenantIds) { - DrySocket.sendMessageTo(tenantId, message); - } - } - - /** - * 鍚戞墍鏈夌鎴风殑鎵�鏈夎澶囧彂閫佹秷鎭� - * - * @param message 娑堟伅鍐呭 - */ - public static void sendMessageToAll(String message) { - for (DrySocket socketItem : machinePool.values()) { - socketItem.sendMessage(message); - } - } - - /** - * websocket 寮�鍚繛鎺� - */ + //==========銆恮ebsocket鎺ュ彈銆佹帹閫佹秷鎭瓑鏂规硶 鈥斺�� 鍏蜂綋鏈嶅姟鑺傜偣鎺ㄩ�亀s娑堟伅銆�======================================================================================== @OnOpen - public void onOpen(Session session, @PathParam("tenantId") String tenantId, @PathParam("machineId") String machineId) { + public void onOpen(Session session, @PathParam(value = "tenantId") String tenantId) { try { - this.tenantId = tenantId; - this.machineId = machineId; - this.socketId = tenantId + machineId; - this.session = session; - - machinePool.put(this.socketId, this); - getTenantPool(tenantId).put(this.machineId, this); - - log.info("銆恉rySocket銆戞湁鏂扮殑杩炴帴锛屾�绘暟涓�:" + machinePool.size()); - } catch (Exception ignored) { + sessionPool.put(tenantId, session); + log.info("銆愮郴缁� WebSocket銆戞湁鏂扮殑杩炴帴锛屾�绘暟涓�:" + sessionPool.size()); + } catch (Exception e) { } } - /** - * websocket 鏂紑杩炴帴 - */ @OnClose - public void onClose() { + public void onClose(@PathParam("tenantId") String tenantId) { try { - machinePool.remove(this.socketId); - getTenantPool(this.tenantId).remove(this.machineId); - - log.info("銆恉rySocket銆戣繛鎺ユ柇寮�锛屾�绘暟涓�:" + machinePool.size()); - } catch (Exception ignored) { + sessionPool.remove(tenantId); + log.info("銆愮郴缁� WebSocket銆戣繛鎺ユ柇寮�锛屾�绘暟涓�:" + sessionPool.size()); + } catch (Exception e) { + e.printStackTrace(); } } /** - * websocket 鏀跺埌娑堟伅 + * ws鎺ㄩ�佹秷鎭� + * + * @param tenantId + * @param message + */ + public void pushMessage(String tenantId, String message) { + for (Map.Entry<String, Session> item : sessionPool.entrySet()) { + //userId key鍊�= {鐢ㄦ埛id + "_"+ 鐧诲綍token鐨刴d5涓瞹 + //TODO vue2鏈敼key鏂拌鍒欙紝鏆傛椂涓嶅奖鍝嶉�昏緫 + if (item.getKey().contains(tenantId)) { + Session session = item.getValue(); + try { + //update-begin-author:taoyan date:20211012 for: websocket鎶ラ敊 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU + synchronized (session){ + log.info("銆愮郴缁� WebSocket銆戞帹閫佸崟浜烘秷鎭�:" + message); + session.getBasicRemote().sendText(message); + } + //update-end-author:taoyan date:20211012 for: websocket鎶ラ敊 https://gitee.com/jeecg/jeecg-boot/issues/I4C0MU + } catch (Exception e) { + log.error(e.getMessage(),e); + } + } + } + } + + /** + * ws閬嶅巻缇ゅ彂娑堟伅 + */ + public void pushMessage(String message) { + try { + for (Map.Entry<String, Session> item : sessionPool.entrySet()) { + try { + item.getValue().getAsyncRemote().sendText(message); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + log.info("銆愮郴缁� WebSocket銆戠兢鍙戞秷鎭�:" + message); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + + + /** + * ws鎺ュ彈瀹㈡埛绔秷鎭� */ @OnMessage - public void onMessage(String message) { - log.info("銆恉rySocket銆憃nMessage:" + message); - IDryRealTimeDataService realTimeDataService = SpringContextUtils.getBean(IDryRealTimeDataService.class); - JSONObject json; - try { - json = JSON.parseObject(message); - } catch (Exception e) { - log.warn("銆恉rySocket銆戞敹鍒颁笉鍚堟硶鐨勬秷鎭�:" + message); - return; + public void onMessage(String message, @PathParam(value = "tenantId") String tenantId) { + if(!"ping".equals(message) && !WebsocketConst.CMD_CHECK.equals(message)){ + log.info("銆愮郴缁� WebSocket銆戞敹鍒板鎴风娑堟伅:" + message); + }else{ + log.debug("銆愮郴缁� WebSocket銆戞敹鍒板鎴风娑堟伅:" + message); } - String type = json.getString(DrySocketConst.TYPE); - switch (type) { - // 蹇冭烦妫�娴� - case DrySocketConst.TYPE_HB: - this.sendMessage(DrySocket.packageMessage(type, true)); - break; - // 瀹炴椂鏁版嵁澶勭悊 - case DrySocketConst.TYPE_RDT: - Object o = json.get(DrySocketConst.DATA); - RealTimeDataVo realTimeDataVo = BeanUtil.toBean(o, RealTimeDataVo.class); - realTimeDataService.realTimeDataHandle(realTimeDataVo); - break; - - default: - log.warn("銆恉rySocket銆戞敹鍒颁笉璇嗗埆鐨勬秷鎭被鍨�:" + type); - break; - } - - + +// //------------------------------------------------------------------------------ +// JSONObject obj = new JSONObject(); +// //涓氬姟绫诲瀷 +// obj.put(WebsocketConst.MSG_CMD, WebsocketConst.CMD_CHECK); +// //娑堟伅鍐呭 +// obj.put(WebsocketConst.MSG_TXT, "蹇冭烦鍝嶅簲"); +// this.pushMessage(userId, obj.toJSONString()); +// //------------------------------------------------------------------------------ } + /** + * 閰嶇疆閿欒淇℃伅澶勭悊 + * + * @param session + * @param t + */ + @OnError + public void onError(Session session, Throwable t) { + log.warn("銆愮郴缁� WebSocket銆戞秷鎭嚭鐜伴敊璇�"); + t.printStackTrace(); + } + //==========銆愮郴缁� WebSocket鎺ュ彈銆佹帹閫佹秷鎭瓑鏂规硶 鈥斺�� 鍏蜂綋鏈嶅姟鑺傜偣鎺ㄩ�亀s娑堟伅銆�======================================================================================== + -} + //==========銆愰噰鐢╮edis鍙戝竷璁㈤槄妯″紡鈥斺�旀帹閫佹秷鎭��======================================================================================== + + + + /** + * 姝や负鍗曠偣娑堟伅(澶氫汉) redis + * + * @param userIds + * @param message + */ +// public void sendMessage(String[] userIds, String message) { +// for (String userId : userIds) { +// sendMessage(userId, message); +// } +// } + //=======銆愰噰鐢╮edis鍙戝竷璁㈤槄妯″紡鈥斺�旀帹閫佹秷鎭��========================================================================================== + +} \ No newline at end of file -- Gitblit v1.9.3