首页 > 其他分享 >kafka 保证消息有序性

kafka 保证消息有序性

时间:2024-05-28 11:10:51浏览次数:26  
标签:顺序 消费者 生产者 分区 kafka 保证 消息 key 有序性

具体需要从 生产者和消费者两个方面来讲:

生产者:

1. 分区机制:

Kafka的核心机制之一是分区(Partition)。每个主题(Topic)可以被分割成多个分区,而消息在发布时会被追加到特定的分区中。在每个分区内部,消息是按照它们被追加的顺序来存储的,因此保证了分区内的消息顺序性。

 

2. 分区器:

生产者(Producer)在发送消息时可以指定一个分区器(Partitioner)来决定消息应该发送到哪个分区。分区器通常基于消息的某个属性(如key的哈希值)来决定分区。这样,具有相同key值的消息会始终被发送到同一个分区,从而确保了这些消息的顺序性。

3. 消息key:

生产者可以通过为消息设置特定的key来确保消息的顺序。例如,如果业务逻辑要求相同用户的消息保持顺序,那么生产者可以使用用户ID作为消息的key。这样,所有来自同一用户的消息都会被发送到同一个分区,并按顺序存储和消费。

 

备注:有可能发送的时候生产者 1 2  3三个消息其中 第二个消息因为网络或者其他原因党搁了一会,导致 发到 kafka里面 变成了 132   ,为了避免这个情况我们可以开启kafak的生产端消息幂等性(需要在生产端配置参数enable.idempotence = true,当幂等性开启的时候acks必须即为all。)

什么是幂等性,为什么要实现幂等性?
分布式系统中,一些不可控因素有很多,比如网络、OOM、FullGC等。在Kafka Broker确认Ack前,有可能出现网络异常、FullGC、OOM等问题时导致Ack超时,Producer会进行重复发送。注,在未达到最大重试次数前,会自动重试(非应用程序代码写的重试) 总结:生产者可以通过将消息指定分区,指定特定的key 加上开启消息幂等性 保证消息的有序性。

消费者:

4. 消费者组配置:

在消费者组(Consumer Group)中,每个分区通常只会被一个消费者实例消费。这意味着,如果生产者确保了消息在分区内的顺序性,那么消费者也将按照相同的顺序消费这些消息。这要求消费者组配置得当,确保每个分区只被一个消费者消费。

 

总结:消费者可以一个 topic,一个 partition,一个 consumer,内部单线程消费。保证消息的有序性。

OK 。生产消息端基于这种情况基本可以保证顺序性。但是问题如下:

问题

比如说我们建了一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。
消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。

 

解决方案如下:

消费者使用内存队列,将拿到的具有相同 key 的数据都存到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性

 

标签:顺序,消费者,生产者,分区,kafka,保证,消息,key,有序性
From: https://www.cnblogs.com/paimianbaobao/p/18217511

相关文章

  • springboot整合Kafka的快速使用教程
        目录一、引入Kafka的依赖二、配置Kafka三、创建主题1、自动创建(不推荐)2、手动动创建四、生产者代码五、消费者代码 六、常用的KafKa的命令    Kafka是一个高性能、分布式的消息发布-订阅系统,被广泛应用于大数据处理、实时日志分析等场景。S......
  • SpringBoot-kafka集成
    代码运行版本springboot.version=2.7.7spring-kafka.version=2.8.111POM<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <!--springboot依赖管理中有kafka的预管理版本,这里不用写版本就可以--> <v......
  • [AIGC] flink sql 消费kafka消息,然后写到mysql中的demo
    这是一个使用FlinkSQL从Kafka中消费数据并写入MySQL的示例。在这个示例中,我们将假设有一个Kafka主题“input_topic”,它产生格式为(user_id:int,item_id:int,behavior:string,timestamp:long)的数据,我们需要把这些数据写入名为"output_table"的MySQL表......
  • ELK+kafka+filebeat企业内部日志分析系统
    1、组件介绍1、Elasticsearch:  是一个基于Lucene的搜索服务器。提供搜集、分析、存储数据三大功能。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTfulweb接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计......
  • 第二篇:深入剖析Kafka生产者的架构和原理
    大家好!今天我们来深入探讨一下Kafka生产者的架构和原理。Kafka生产者是数据流入Kafka集群的起点,其设计和实现直接影响消息传输的可靠性和性能。本文将通过示例代码和源码剖析,带大家全面了解Kafka生产者的参数、整体架构、元数据更新过程等内容。准备好了吗?让我们开始吧!文......
  • 动态地控制kafka的消费速度,从而满足业务要求
    kafka是一个分布式流媒体平台,它可以处理大规模的数据流,并允许实时消费该数据流。在实际应用中,我们需要动态控制kafka消费速度,以便处理数据流的速率能够满足系统和业务的需求。本文将介绍如何在kafka中实现动态控制消费速度的方法。1.消费者配置在Kafka中,消费者可以使用以下参......
  • go go-redis 使用lua保证操作的原子性
      Redis是应对高并发的常用工具,在常用缓存技巧中讲过相关技巧。但有些业务场景,使用Redis会遇到问题,如电商里的秒杀、扣减库存等。拿减库存举例,一般需要两步:先扣减库存,获取扣减后的库存值V如果V小于0,说明库存不够,需要将扣减的值再加回去;如果V大于等于0,则执行后续操作......
  • kafka调优参考建议 —— 筑梦之路
    这里主要是从不同使用场景来调优,仅供参考。吞吐量优先吞吐量优先使用场景如采集日志。1. broker配置调优num.partitions:分区个数,设置为与消费者的线程数基本相等2. producer配置调优 batch.size批量提交消息的字节数,发送消息累计大小达到该值时才会发送(或者达到......
  • 如何保证消息不被重复消费?
    面试题如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?面试官心理分析其实这是很常见的一个问题,这俩问题基本可以连起来问。既然是消费消息,那肯定要考虑会不会重复消费?能不能避免重复消费?或者重复消费了也别造成系统异常可以吗?这个是MQ领域的基本问题,其实本质上......
  • springboot集成kafka解决集群模式下分组ID不同问题
    背景:在集群模式下,每个实例需要分组ID不同,共同消费某个topic,集群下的实例是动态扩展的,无法确认实例的个数,每次项目启动的时候,需要动态的给定kakfa的分组ID,但是分组ID整体是一样的,不能改变。方式1:CURRENT_INSTANCE_GROUP_ID=KafkaConstant.SSE_GROUP.concat(String.valueOf(Sys......