/** * websocket消息订阅处理 */ @Slf4j @Component @ServerEndpoint(value = "/socket/subscribe") @ToString public class SocketSubscribeEndPoint { /** * websocket连接数目 */ private static AtomicInteger WEBSOCKET_CONNECTION_NUM = new AtomicInteger(); /** * socket路径携带认证码参数名称 */ private static final String TOKEN_KEY = "token"; @Getter private Session session; @Getter private String username; /** * 订阅的topic主题集合 */ @Getter private List<String> socketTopics; /** * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 */ @Getter private static final CopyOnWriteArraySet<SocketSubscribeEndPoint> WEB_SOCKET_SET = new CopyOnWriteArraySet<>(); /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session) { try { // 如果Session携带token,验证token获取用户名 String token = session.getPathParameters().get(TOKEN_KEY); if (StringUtils.isNotBlank(token)) { UserAuthenService userAuthenService = SpringUtil.getBean(UserAuthenService.class); this.username = userAuthenService.getUsername(token); } this.session = session; // 连接数+1 int num = WEBSOCKET_CONNECTION_NUM.addAndGet(1); log.info(username +"开开心心进入了直播间,当前人数:{}", num); } catch (Exception e) { log.error("websocket连接处理错误", e); } } /** * 连接成功后收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message) { if(StringUtils.isBlank(message)){ log.info("客户端发送空字符串消息"); return; } Object obj = JSONObject.parse(message); try { // 多个topic if (obj instanceof JSONArray) { log.debug("接收客户端订阅消息:{}", message); // 设置多个topic集合 this.socketTopics = JSONObject.parseArray(message, String.class); //将此webSocket对象放入set集合 WEB_SOCKET_SET.add(this); //回复客户端消息订阅成功 this.session.getAsyncRemote() .sendText(JSONObject.toJSONString( SubscribePushEntity.builder() .type(SubscribePushEntity.SUB_RESULT_MESSAGE_TYPE) .content(ResponseData.success("订阅成功")) .build())); log.info("订阅成功:{}", this); return; } // 单个topic 不做处理 此场景可用多个topic兼容 if(obj instanceof JSONObject){ log.info("客户端发送JSON对象消息:{}",message); // WEB_SOCKET_SET.add(this); return; } log.info("客户端发送其它消息:{}",message); } catch (Exception e) { log.error("订阅websocket消息处理错误", e); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { //移除当前连接用户所有订阅信息 WEB_SOCKET_SET.remove(this); //在线数减1 int num = WEBSOCKET_CONNECTION_NUM.decrementAndGet(); log.info(username + "骂骂咧咧离开了直播间,当前人数:{}", num); } /** * 发生错误时调用 * * @param error 错误对象 */ @OnError public void one rror(Throwable error) { log.error("websocket处理发生错误", error); } @Override public int hashCode() { return super.hashCode(); } @Override public boolean equals(Object obj) { if(!(obj instanceof SocketSubscribeEndPoint)){ return false; } SocketSubscribeEndPoint socketSubscribeEndPoint = (SocketSubscribeEndPoint)obj; return this.session == socketSubscribeEndPoint.session; } }
/** * 订阅消息回复消息协议实体类 * */ @Data @Builder @ToString public class SubscribePushEntity implements Serializable { private static final long serialVersionUID = 1L; /** * 消息类型:1、订阅过程消息、2、业务消息 */ public static final int SUB_RESULT_MESSAGE_TYPE = 1; public static final int BUSINESS_MESSAGE_TYPE = 2; /** * 回复消息类型 */ private int type ; /** * 订阅的topic */ private String topic; /** * 消息内容 */ private Object content; }
/** * 订阅消息处理逻辑 */ @Service @Slf4j public class SubscribeServiceImpl implements SocketSubscribeService { @Override public void sendMessage(String topic, Object content) { CopyOnWriteArraySet<SocketSubscribeEndPoint> socketSubscribeEndPoints = SocketSubscribeEndPoint.getWEB_SOCKET_SET(); for (SocketSubscribeEndPoint socketSubscribeEndPoint : socketSubscribeEndPoints) { // 匹配topic 组装订阅消息回复消息协议实体类 发送消息 Pattern pattern = Pattern.compile("^"+topic.replace("+","[0-9a-zA-Z]+")+"$"); List<String> sendTopics = socketSubscribeEndPoint.getSocketTopics().stream().filter(s->pattern.matcher(s).matches()).collect(Collectors.toList()); if(!CollectionUtils.isEmpty(sendTopics) && sendTopics.size() > 0 ){ try { socketSubscribeEndPoint.getSession() .getAsyncRemote().sendText( JSONObject.toJSONString( SubscribePushEntity .builder() .topic(sendTopics.get(0)) .type(SubscribePushEntity.BUSINESS_MESSAGE_TYPE) .content(content) .build())); }catch (Exception e){ log.error("发送socket消息失败",e); } } } } }
/** * kafka进行websocket消息分发,确保双机情况下客户端订阅任意一台服务器的socket都能收到消息 */ @Slf4j @Component public class KafkaWebSocketPublisher { @Value(value = "${kafka.websocket.topic}") private String topic; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Autowired private SocketSubscribeService socketSubscribeService; private static final String TOPIC = "topic"; private static final String CONTENT = "content"; /** * 服务器websocket消息发送到kafka * * @param websocketTopic * @param content */ @Async public void sendMessage(String websocketTopic, MessageContent content) { JSONObject map = new JSONObject(); map.put(TOPIC, websocketTopic); map.put(CONTENT, content); kafkaTemplate.send(topic, map.toJSONString()); } /** * 监听kafka中的websocket消息 发送到对应的socket * @param record */ public void sendWebsocket(ConsumerRecord<?, ?> record) { JSONObject map = JSONObject.parseObject((String) record.value()); socketSubscribeService.sendMessage((String) map.get(TOPIC), map.get(CONTENT)); } }
/** * kafka监听消费 */ @Slf4j @Component @AllArgsConstructor public class KafkaConsumer { private KafkaStrategyContext kafkaStrategyContext; private KafkaWebSocketPublisher kafkaWebSocketPublisher; private SocketTransferService socketTransferService; private DailyExportThreadManager dailyExportThreadManager; /** * 消费监听 */ @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}") public void onMessage1(List<ConsumerRecord<?, ?>> record) { log.info("本次批量处理kafka消息数:{}", record.size()); kafkaStrategyContext.converterData(record); } /** * websocket监听 */ @KafkaListener(topics = "#{'${kafka.websocket.topic}'}", groupId = "#{'${kafka.websocket.consumer}'}") public void websocket(List<ConsumerRecord<?, ?>> record) { log.info("本次批量处理kafka消息数:{}", record.size()); for (ConsumerRecord<?, ?> consumerRecord : record) { // 消费的哪个topic、partition的消息,打印出消息内容 log.info("kafka websocket消息:" + consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.value()); // 监听kafka中的websocket消息 发送到对应的socket kafkaWebSocketPublisher.sendWebsocket(consumerRecord); } } }
/** * 用户订阅websocket消息格式,前端后台socket消息协议 */ @Data @Builder @ToString public class MessageContent { @JsonFormat(pattern = "HH:mm:ss", timezone = "GMT+8") private Date time; /** * 发送消息体 */ private Object message; /** * 消息类型 */ private String messageType; }
// 实际业务处理,发送websocket消息 public class XXXServiceServiceImpl extends ServiceImpl<XXXMapper, XXXEntity> implements XXXService { @Override public void sendWebSocketMessage(String xxxMessage) { kafkaWebSocketPublisher.sendMessage("xxxtopic", MessageContent.builder() .time(new Date()) .messageType("xxxType") .message(xxxMessage) .build()); }
//前端 // websocket 初始化 websocketInit () { try { this.wsUrl = process.env.NODE_ENV === 'development' ? process.env.VUE_APP_BASE_API.replace('http', 'ws') + '/api/socket/subscribe' // 本地开发 : 'ws://' + location.host + '/api/socket/subscribe' // 线上部署 this.ws = new WebSocket(this.wsUrl) // 连接成功 this.ws.onopen = () => { console.log( 'socket连接成功' ) this.ws.send(JSON.stringify([`xxxtopic`])) this.refreshTime = setInterval(() => { this.ws.send(JSON.stringify([`xxxtopic`])) }, 30 * 1000) } // 报错 this.ws.onerror = err => { console.log( '%c err', err ) // ws 重连 this.reconnect() } // 接收消息 this.ws.onmessage = msg => { const message = JSON.parse(msg.data) // console.log(message) switch (+message.type) { case 1: break case 2: const obj = {} // 替换 message.content.message.replace( /([^(, ]+)=([^, )]+)/g, (_, k, v) => (obj[k] = v) ) const { progress, describe } = obj || {} this.progress = progress * 100 this.describe = describe } } } catch (error) { console.log(error) } }, // 重连 reconnect () { if (this.lockReconnect) return this.lockReconnect = true // 设置重连次数 if (this.reconnectNumber < 5) { this.reconnectTimer = setTimeout(() => { console.log( 'websocket重连......!' ) this.websocketInit() this.lockReconnect = false this.reconnectNumber += 1 }, 10 * 1000) } else { clearTimeout(this.reconnectTimer) } }
标签:websocket,log,private,kafka,topic,消息,双机,public From: https://www.cnblogs.com/axibug/p/16790628.html