1、添加依赖
<!-- websocket --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
2、定义WebSocket的管理类
import com.alibaba.fastjson.JSON;import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.websocket.Session; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @desc: WebSocket管理器 **/ @Slf4j @Component public class WebSocketManager { // 维护手机端用户socket:key为用户的标识,每个用户重连时会覆盖Session。 private static Map<String, Session> users; // 维护屏端socket:key为屏端的标识。List<Session> 表名可以有多个相同的大屏,不覆盖Session。 private static Map<String,List<Session>> screens;public final static String FlAG_SCREEN = "screen_flag";public WebSocketManager(){ users = new ConcurrentHashMap<>(); screens = new ConcurrentHashMap<>(); } /** * 获取设备的标识 */ public static String getDeviceFlag(Integer activityId,String deviceFlag){ return activityId + "_" + deviceFlag; } /* * 添加session */ public static Boolean addSession(DeviceTypeEnum deviceType,Integer activityId,String deviceFlag,Session session){ if(deviceType == null || activityId == null || StringUtils.isBlank(deviceFlag) || session == null){ return Boolean.FALSE; } deviceFlag = getDeviceFlag(activityId, deviceFlag); if(DeviceTypeEnum.DEVICE_MOBILE.equals(deviceType)){ // 用户管理 users.put(deviceFlag,session); }else if(DeviceTypeEnum.DEVICE_SCREEN.equals(deviceType)){ synchronized (deviceFlag.intern()) { // 大屏session管理 List<Session> sessions = screens.get(deviceFlag); if (CollectionUtils.isEmpty(sessions)) { sessions = new ArrayList<>(); screens.put(deviceFlag, sessions); } sessions.add(session); } }return Boolean.TRUE; } /* * 删除session */ public static Boolean removeSession(DeviceTypeEnum deviceType,Integer activityId,String deviceFlag,Session session){ if(deviceType == null || activityId == null || StringUtils.isBlank(deviceFlag) || session == null){ return Boolean.FALSE; } deviceFlag = getDeviceFlag(activityId, deviceFlag); if(DeviceTypeEnum.DEVICE_MOBILE.equals(deviceType)){ // 用户管理 users.remove(deviceFlag,session); }else if(DeviceTypeEnum.DEVICE_SCREEN.equals(deviceType)){ // 大屏session管理 synchronized (deviceFlag.intern()) { List<Session> sessions = screens.get(deviceFlag);
// 因为有多个相同的大屏,所以需要删除指定的大屏 if (!CollectionUtils.isEmpty(sessions)) { Iterator<Session> iterator = sessions.iterator(); while (iterator.hasNext()) { Session next = iterator.next(); if (next.equals(session)) { iterator.remove(); } } } if(CollectionUtils.isEmpty(sessions)){ screens.remove(deviceFlag); } } }return Boolean.TRUE; } public static Map<String, Session> getUsers() { return users; } public static Map<String, List<Session>> getScreens() { return screens; }/** * @desc : 服务端发送消息到手机端(点对点消息) * @author: 毛会懂 * @create: 2019/9/30 17:34:00 **/ public static Boolean sendMessageToMobile(Integer activityId,String openId, SocketContentVO contentVO, String msgFlag){ if(activityId == null){ log.error("服务点发送消息到手机端(点对点消息)活动id为空"); return Boolean.FALSE; } if(StringUtils.isBlank(openId) || contentVO == null){ log.error("服务端发送消息到手机端(点对点消息) openId为空或发送的内容为空"); return Boolean.FALSE; } String openIdFlag = getDeviceFlag(activityId,openId); Session session = users.get(openIdFlag); if(session == null){ log.error("服务端发送消息到手机端(点对点消息) session == null"); return Boolean.FALSE; } String message = JSON.toJSONString(contentVO); if(!"mobileHeart".equals(contentVO.getCmd()) && !"operator".equals(contentVO.getCmd())) { log.info("{}:openId={},message={}", msgFlag, openIdFlag, message); } try { synchronized (session) { session.getBasicRemote().sendText(message); } return Boolean.TRUE; }catch (Exception ex){ log.error("服务端发送消息到手机端.点对点发送消息失败,发给openId={},msg={}",openIdFlag,message); return Boolean.FALSE; } } /** * @desc : 服务端发送消息到大屏端(点对点消息) * @author: 毛会懂 * @create: 2019/9/30 17:34:00 **/ public static Boolean sendMessageToScreen(String deviceId, SocketContentVO contentVO, String msgFlag, Session currentSession){ if(screens == null || screens.get(deviceId)==null || contentVO == null){ return Boolean.FALSE; } List<Session> sessionList = screens.get(deviceId); if(sessionList == null || sessionList.isEmpty()){ return Boolean.FALSE; } String message = JSON.toJSONString(contentVO); if(!("realRank".equals(contentVO.getCmd()) || "screenHeart".equals(contentVO.getCmd()))) { log.info("{}:大屏id={},message={}",msgFlag, deviceId, message); } for(Session session :sessionList) { if(session == currentSession) { try { synchronized (session) { session.getBasicRemote().sendText(message); } } catch (Exception ex) { log.error("服務端发送消息到大屏端,点对点发送消息失败,deviceId={},msg={}", deviceId, message); } } } return Boolean.TRUE; } /** * @desc : 服务端发送消息到大屏端(群发消息) * @author: 毛会懂 * @create: 2019/9/30 17:34:00 **/ public static Boolean sendMessageToAllScreen(String deviceId,SocketContentVO contentVO,String msgFlag){ if(screens == null || screens.get(deviceId)==null || contentVO == null){ return Boolean.FALSE; } List<Session> sessionList = screens.get(deviceId); if(sessionList == null || sessionList.isEmpty()){ return Boolean.FALSE; } String message = JSON.toJSONString(contentVO); if(!("realRank".equals(contentVO.getCmd()) || "screenHeart".equals(contentVO.getCmd()) || "operator".equals(contentVO.getCmd()))) { log.info("{}:大屏id={},message={}",msgFlag, deviceId, message); } for(Session session :sessionList) { try { synchronized (session) { session.getBasicRemote().sendText(message); } } catch (Exception ex) { log.error("服務端发送消息到大屏端,点对点发送消息失败,deviceId={},msg={}", deviceId, message); } } return Boolean.TRUE; } }
3、webSocket连接
import com.alibaba.fastjson.JSON;import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; /** * @desc: 服务端:webSocket连接粒度:activityId + 设备标识flag(如openId)
* deviceType: 设备类型:标识是手机端,还是大屏的连接 **/ @Slf4j @ServerEndpoint("/websocket/{deviceType}/{activityId}/{flag}") @Component @Getter @Setter @NonessentialSession public class WebSocketServer { //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; // 设备类型 private DeviceTypeEnum deviceType; // 活动id private Integer activityId; // 设备标识 private String deviceFlag; /** * @desc : 连接建立成功调用的方法 * @author: 毛会懂 * @create: 2019/10/17 14:26:00 **/ @OnOpen public void onOpen(Session session, @PathParam("deviceType") String deviceTypeStr, @PathParam("activityId") Integer activityId,@PathParam("flag") String deviceFlag) { this.session = session; log.info("客户端连接成功"); try { log.info("activityId={},deviceFlag={}",activityId,deviceFlag); this.deviceType = DeviceTypeEnum.getEnumByCode(deviceTypeStr); this.activityId = activityId; this.deviceFlag = deviceFlag; WebSocketManager.addSession(deviceType,activityId,deviceFlag,session); }catch (Exception ex){ log.error("连接建立异常",ex); throw ex; } } /** * @desc : 收到客户端消息后触发的方法 * @author: 毛会懂 * @create: 2019/10/17 14:25:00 **/ @OnMessage public void onMessage(String message) throws IOException { if (StringUtils.isEmpty(message)){ return ; } if(message.indexOf("heart") > 0){ // 心跳命令,返回原值 session.getBasicRemote().sendText(message); return; } log.info("来自客户端的消息:" + message); try { SocketContentParam socketContent = JSON.parseObject(message, SocketContentParam.class); // 根据设备类型,查找对应的设备 DeviceSelect deviceSelect = SpringContextUtil.getBean(DeviceSelect.class); IDevice device = deviceSelect.getDevice(deviceType); if(device == null){ return; } // 执行设备的命令 socketContent.setActivityId(activityId); socketContent.setUserCode(deviceFlag); device.execute(socketContent); }catch (Exception ex){ log.error("接收客户端的消息转换异常:",ex); } } /** * @desc : 连接关闭调用的方法 * @author: 毛会懂 * @create: 2019/10/17 14:24:00 **/ @OnClose public void onClose() { WebSocketManager.removeSession(deviceType,activityId,deviceFlag,session); } /** * @desc : 客户端非正常关闭引起的,此时需要在onError中调用onClose来关闭链接 * @author: 毛会懂 * @create: 2019/10/17 14:24:00 **/ @OnError public void one rror(Session session, Throwable error) { log.error("发生错误 session: "+session); this.onClose(); } }
标签:WebSocket,SpringBoot,代码,activityId,session,deviceFlag,import,return,message From: https://www.cnblogs.com/maohuidong/p/17097436.html