首页 > 其他分享 >kafka进行websocket消息分发,解决双机/集群情况下客户端与服务器的连接问题

kafka进行websocket消息分发,解决双机/集群情况下客户端与服务器的连接问题

时间:2022-10-14 09:57:20浏览次数:57  
标签:websocket log private kafka topic 消息 双机 public

 

/**
 * websocket消息订阅处理
 */
@Slf4j
@Component
@ServerEndpoint(value = "/socket/subscribe")
@ToString
public class SocketSubscribeEndPoint {

    /**
     * websocket连接数目
     */
    private static AtomicInteger WEBSOCKET_CONNECTION_NUM = new AtomicInteger();

    /**
     * socket路径携带认证码参数名称
     */
    private static final String TOKEN_KEY = "token";

    @Getter
    private Session session;

    @Getter
    private String username;
    /**
     * 订阅的topic主题集合
     */
    @Getter
    private List<String> socketTopics;

    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    @Getter
    private static final CopyOnWriteArraySet<SocketSubscribeEndPoint> WEB_SOCKET_SET = new CopyOnWriteArraySet<>();


    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        try {
            // 如果Session携带token,验证token获取用户名
            String token = session.getPathParameters().get(TOKEN_KEY);
            if (StringUtils.isNotBlank(token)) {
                UserAuthenService userAuthenService = SpringUtil.getBean(UserAuthenService.class);
                this.username = userAuthenService.getUsername(token);
            }
            this.session = session;
            // 连接数+1
            int num = WEBSOCKET_CONNECTION_NUM.addAndGet(1);
            log.info(username +"开开心心进入了直播间,当前人数:{}", num);
        } catch (Exception e) {
            log.error("websocket连接处理错误", e);
        }

    }

    /**
     * 连接成功后收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message) {
        if(StringUtils.isBlank(message)){
            log.info("客户端发送空字符串消息");
            return;
        }
        Object obj = JSONObject.parse(message);
        try {
            // 多个topic
            if (obj instanceof JSONArray) {
                log.debug("接收客户端订阅消息:{}", message);
                // 设置多个topic集合
                this.socketTopics = JSONObject.parseArray(message, String.class);
                //将此webSocket对象放入set集合
                WEB_SOCKET_SET.add(this);
                //回复客户端消息订阅成功
                this.session.getAsyncRemote()
                        .sendText(JSONObject.toJSONString(
                                SubscribePushEntity.builder()
                                        .type(SubscribePushEntity.SUB_RESULT_MESSAGE_TYPE)
                                        .content(ResponseData.success("订阅成功"))
                                        .build()));
                log.info("订阅成功:{}", this);
                return;
            }
            // 单个topic 不做处理 此场景可用多个topic兼容
            if(obj instanceof JSONObject){
                log.info("客户端发送JSON对象消息:{}",message);
                // WEB_SOCKET_SET.add(this);
                return;
            }
            log.info("客户端发送其它消息:{}",message);
        } catch (Exception e) {
            log.error("订阅websocket消息处理错误", e);
        }

    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        //移除当前连接用户所有订阅信息
        WEB_SOCKET_SET.remove(this);
        //在线数减1
        int num = WEBSOCKET_CONNECTION_NUM.decrementAndGet();
        log.info(username + "骂骂咧咧离开了直播间,当前人数:{}", num);
    }

    /**
     * 发生错误时调用
     *
     * @param error 错误对象
     */
    @OnError
    public void one rror(Throwable error) {
        log.error("websocket处理发生错误", error);
    }


    @Override
    public int hashCode() {
        return super.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if(!(obj instanceof SocketSubscribeEndPoint)){
            return false;
        }
        SocketSubscribeEndPoint socketSubscribeEndPoint = (SocketSubscribeEndPoint)obj;
        return this.session == socketSubscribeEndPoint.session;
    }
}

 

 

/**
 * 订阅消息回复消息协议实体类
 * 
 */
@Data
@Builder
@ToString
public class SubscribePushEntity implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     *     消息类型:1、订阅过程消息、2、业务消息
     */
    public static final int SUB_RESULT_MESSAGE_TYPE = 1;
    public static final int BUSINESS_MESSAGE_TYPE = 2;

    /**
     * 回复消息类型
     */
    private int type ;
    /**
     * 订阅的topic
     */
    private String topic;

    /**
     * 消息内容
     */
    private Object content;
}

 

 

/**
 * 订阅消息处理逻辑
 */
@Service
@Slf4j
public class SubscribeServiceImpl implements SocketSubscribeService {


    @Override
    public void sendMessage(String topic, Object content) {
            CopyOnWriteArraySet<SocketSubscribeEndPoint> socketSubscribeEndPoints =  SocketSubscribeEndPoint.getWEB_SOCKET_SET();
            for (SocketSubscribeEndPoint socketSubscribeEndPoint : socketSubscribeEndPoints) {
                // 匹配topic 组装订阅消息回复消息协议实体类 发送消息
                Pattern pattern = Pattern.compile("^"+topic.replace("+","[0-9a-zA-Z]+")+"$");
                List<String> sendTopics = socketSubscribeEndPoint.getSocketTopics().stream().filter(s->pattern.matcher(s).matches()).collect(Collectors.toList());
                if(!CollectionUtils.isEmpty(sendTopics) && sendTopics.size() > 0 ){
                    try {
                        socketSubscribeEndPoint.getSession()
                                .getAsyncRemote().sendText(
                                        JSONObject.toJSONString(
                                        SubscribePushEntity
                                        .builder()
                                        .topic(sendTopics.get(0))
                                        .type(SubscribePushEntity.BUSINESS_MESSAGE_TYPE)
                                        .content(content)
                                        .build()));
                    }catch (Exception e){
                        log.error("发送socket消息失败",e);
                    }
                }
            }
    }
}

 

 

/**
 * kafka进行websocket消息分发,确保双机情况下客户端订阅任意一台服务器的socket都能收到消息
 */
@Slf4j
@Component
public class KafkaWebSocketPublisher {


    @Value(value = "${kafka.websocket.topic}")
    private String topic;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private SocketSubscribeService socketSubscribeService;

    private static final String TOPIC = "topic";
    private static final String CONTENT = "content";



    /**
     * 服务器websocket消息发送到kafka
     *
     * @param websocketTopic
     * @param content
     */
    @Async
    public void sendMessage(String websocketTopic, MessageContent content) {
        JSONObject map = new JSONObject();
        map.put(TOPIC, websocketTopic);
        map.put(CONTENT, content);
        kafkaTemplate.send(topic, map.toJSONString());
    }

    /**
     * 监听kafka中的websocket消息 发送到对应的socket
     * @param record
     */
    public void sendWebsocket(ConsumerRecord<?, ?> record) {
        JSONObject map = JSONObject.parseObject((String) record.value());
        socketSubscribeService.sendMessage((String) map.get(TOPIC), map.get(CONTENT));
    }
}

 

 

/**
 * kafka监听消费
 */
@Slf4j
@Component
@AllArgsConstructor
public class KafkaConsumer {

    private KafkaStrategyContext kafkaStrategyContext;
    private KafkaWebSocketPublisher kafkaWebSocketPublisher;
    private SocketTransferService socketTransferService;
    private DailyExportThreadManager dailyExportThreadManager;

    /**
     * 消费监听
     */
    @KafkaListener(topics = "#{'${kafka.topics}'.split(',')}")
    public void onMessage1(List<ConsumerRecord<?, ?>> record) {
        log.info("本次批量处理kafka消息数:{}", record.size());
        kafkaStrategyContext.converterData(record);
    }

    /**
     * websocket监听
     */
    @KafkaListener(topics = "#{'${kafka.websocket.topic}'}", groupId = "#{'${kafka.websocket.consumer}'}")
    public void websocket(List<ConsumerRecord<?, ?>> record) {
        log.info("本次批量处理kafka消息数:{}", record.size());
        for (ConsumerRecord<?, ?> consumerRecord : record) {
            // 消费的哪个topic、partition的消息,打印出消息内容
            log.info("kafka websocket消息:" + consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.value());
            // 监听kafka中的websocket消息 发送到对应的socket
            kafkaWebSocketPublisher.sendWebsocket(consumerRecord);
        }
    }

   
}

 

 

/**
 * 用户订阅websocket消息格式,前端后台socket消息协议
 */
@Data
@Builder
@ToString
public class MessageContent {

    @JsonFormat(pattern = "HH:mm:ss", timezone = "GMT+8")
    private Date time;

    /**
     * 发送消息体
     */
    private Object message;
    /**
     * 消息类型
     */
    private String messageType;

}

 

 

// 实际业务处理,发送websocket消息
public class XXXServiceServiceImpl extends ServiceImpl<XXXMapper, XXXEntity> implements XXXService {
    @Override
    public void sendWebSocketMessage(String xxxMessage) {
            kafkaWebSocketPublisher.sendMessage("xxxtopic", MessageContent.builder()
                    .time(new Date())
                    .messageType("xxxType")
                    .message(xxxMessage)
                    .build());
    }

 

//前端

    // websocket 初始化
    websocketInit () {
      try {
        this.wsUrl =
          process.env.NODE_ENV === 'development'
            ? process.env.VUE_APP_BASE_API.replace('http', 'ws') + '/api/socket/subscribe' // 本地开发
            : 'ws://' + location.host + '/api/socket/subscribe' // 线上部署
        this.ws = new WebSocket(this.wsUrl)
        // 连接成功
        this.ws.onopen = () => {
          console.log(
            'socket连接成功'
          )
          this.ws.send(JSON.stringify([`xxxtopic`]))
          this.refreshTime = setInterval(() => {
            this.ws.send(JSON.stringify([`xxxtopic`]))
          }, 30 * 1000)
        }
        // 报错
        this.ws.onerror = err => {
          console.log(
            '%c err',
            err
          )
          // ws 重连
          this.reconnect()
        }
        // 接收消息
        this.ws.onmessage = msg => {
          const message = JSON.parse(msg.data)
          // console.log(message)
          switch (+message.type) {
            case 1:
              break
            case 2:
              const obj = {}
              // 替换
              message.content.message.replace(
                /([^(, ]+)=([^, )]+)/g,
                (_, k, v) => (obj[k] = v)
              )
              const { progress, describe } = obj || {}
              this.progress = progress * 100
              this.describe = describe
          }
        }
      } catch (error) {
        console.log(error)
      }
    },
    // 重连
    reconnect () {
      if (this.lockReconnect) return
      this.lockReconnect = true
      // 设置重连次数
      if (this.reconnectNumber < 5) {
        this.reconnectTimer = setTimeout(() => {
          console.log(
            'websocket重连......!'
          )
          this.websocketInit()
          this.lockReconnect = false
          this.reconnectNumber += 1
        }, 10 * 1000)
      } else {
        clearTimeout(this.reconnectTimer)
      }
    }

 

标签:websocket,log,private,kafka,topic,消息,双机,public
From: https://www.cnblogs.com/axibug/p/16790628.html

相关文章

  • 一个有意思的问题:Kafka的消费Offset会溢出吗
    最近在项目上接入公司APP产品的用户点击日志数据时,发现消费者组的Offset值非常大,才一天的时间,已提交的Offset值就有千亿级别了。于是不禁想了一个问题:假设一个Topic就只......
  • websocket中获取客户端通信的真实IP
    一些场景中,我们要对websocket客户端的ip进行校验,如果是黑名单,或者不被允许的则不应该让他访问业务系统。笔者本地使用了两个Websocket技术原型,一个基于Netty封装的Websocket......
  • MySQL双机热备
     1.MySQL双机热备原理双机热备就是使用MySQL提供的一种主从备份机制实现。所谓双机热备其实是一个复制的过程,复制过程中一个服务器充当主服务器,一个或多个服务器充当从服......
  • kafka实战最佳经验(文末福利)
    Kafka由于高吞吐量、可持久化、分布式、支持流数据处理等特性而被广泛应用。但当前关于Kafka原理及应用的相关资料较少,在我打算编写本文时,还没有见到中文版本的Kafka相关书......
  • Keeping Multiple Databases in Sync Using Kafka Connect and CDC
    SEP20,2022Microservicesarchitectureshavenowbeenwidelyadoptedamongdevelopers,andwithagreatdegreeofsuccess.However,drawbacksdoexist.Data......
  • Kafka效率详解
    前言:讲解透彻,从存储到系统,硬件,网络细节综合考虑问:讲讲kafka为什么这么快?答:kafka的快是从底层设计,到充分利用硬件,系统,压缩等等特性,综合产生的结果。要理解kafka为什么......
  • CentOS 7.9 安装 kafka_2.13
    一、CentOS7.9安装kafka_2.13地址https://kafka.apache.org/downloads.html 二、安装准备1安装JDK在安装kafka之前必须先安装JDK和zookeeper,如何安装JDK,可......
  • 03、如何理解Kafka和Zookeeper的关系
    001、Kafka简介ApacheKafka最早是由Linkedin公司开发,后来捐献给了Apack基金会。Kafka被官方定义为分布式流式处理平台,因为具备高吞吐、可持久化、可水平扩展等特性而被......
  • 02、kafka介绍
    001、kafka简介kafka消息队列有两种消费模式,分别是点对点模式和订阅/发布模式。具体比较可以参考Kafka基础–消息队列与消费模式。下图是一个点对点的Kafka结构示意图p......
  • 01、kafka常用命令
    001、kafka版本kafka_2.13-3.0.0kafka_2.12-2.8.0002、模拟给topic名称是yikuang的发一条数据(helloworld)./kafka-console-producer.sh--broker-listIP:9092-......