首页 > 其他分享 >kafka

kafka

时间:2023-09-17 23:22:45浏览次数:45  
标签:消费者 分区 partition kafka topic offset

kafka的partiton在实际的消息生产消费过程中是如何使用的。

安装

zookeeper安装

jdk安装~
#zookeeper 默认端口2181
数据缓存位置:zoo.cfg => dataDir=xxx
1.启动方式
./zkServer.sh stop
./zkServer.sh start
./zkServer.sh status
2.连接方式
bin/zkCli.sh
#指定端口
bin/zkCli.sh -server ip:port
3.集群角色
领导者leader 负责进行投票的发起和决议,更新系统状态
学习者Learner	(跟随者Follower、观察者Observer)
跟随者Follower	用于接受客户请求,并返回客户端结果,在选主过程中参与投票
观察者Observer 接受客户请求,转发leader节点,不参与投票,之同步Leader状态
客户端Client


java连接

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;

/**
 * 操作ZooKeeper的Znode
 */
public class ZnodeDemo implements Watcher {
    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        //创建连接zookeeper对象
        ZooKeeper zooKeeper = new ZooKeeper("192.168.3.130:2181,192.168.3.130:2182,192.168.3.130:2183",150000,new ZnodeDemo());
        //
//        String path=zooKeeper.create("/bjsxt/test","oldlu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);
//        System.out.println(path);
//        byte[] result=zooKeeper.getData("/bjsxt/test0000000000",new ZnodeDemo(),new Stat());
//        System.out.println(new String(result));
        //遍历所有的znode节点
//        List<String> list=zooKeeper.getChildren("/bjsxt",new ZnodeDemo());
//        for (String path:list){
//            byte[] data=zooKeeper.getData("/bjsxt/"+path,new ZnodeDemo(),null);
//            System.out.println(new String(data));
//        }
        //设置Znode的值
//        Stat stat=zooKeeper.setData("/bjsxt/test0000000000","modifyvalue1".getBytes(),-1);
//        System.out.println(stat);
          zooKeeper.delete("/bjsxt/test0000000000",-1);
    }

    /**
     * 事件通知回调方法
     * @param event
     */
    @Override
    public void process(WatchedEvent event) {
        if(event.getState() == Event.KeeperState.SyncConnected){
            System.out.println("连接成功");
        }
    }
}

kafka安装

#config/server.properties
broker.id	节点唯一标识

#配置Listeners配置的ip为内网ip advertised.listeners公网ip
listeners(ٖ内网Ip)
advertised.listeners(公网ip)

#修改zookeeper地址,默认地址
zookeeper.connection=localhost:2181

#消息存储的目录
log.dirs=

#启动命令
./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh -daemon ../config/server.properties &
./kafka-server-stop.sh

./kafka-topics.sh --create --zookeeper 192.168.3.128:2181 --replication-factor 1 --partitions 1 --topic xdclass-topic
#查看topic
./kafka-topics.sh --list --zookeeper 192.168.3.128:2181

#生产者
./kafka-console-producer.sh --broker-list 192.168.3.128:9092 --topic xdclass-topic

#消费者  --from-beginning 会将主题中以往的数据读出来
./kafka-console-consumer.sh --bootstrap-server 192.168.3.128:9092 --from-beginning --topic xdclass-topic

#删除topic
./kafka-topics.sh --zookeeper 192.168.3.128:2181 --delete --topic t1

#查看broker状态
./kafka-topics.sh --describe --zookeeper 192.168.3.128:2181 --topic xdclass-topic

基本概念

  • LEO(LogEndOffset)

    • 表示每个partition的log最后一条Message的位置
  • HW (HighWatermark)

    • 各个replicas数据同步且一致的offset位置
  • Broker

    • Kafka服务端程序,一个mq节点
    • broker存储topic数据
  • Producer生产者

  • Consumer消费者

  • ConsumerGroup消费者组

    一个组只有一个消费者可以消费同一个topic中的一个消息

  • ReplicationLeader、ReplicationFollower

    • Partition有多个副本,但只有一个RelicationLeader负责该Partition和生产者消费
    • ReplicationFollower只是作为一个备份,从RelicationLeader进行同步
  • LEO(LogEndOffset)

    • 表示每个partition的log最后一条Message的位置
  • HW(HighWaterMark)

    • 表示partition各个replicas数据间同步且一致的offset位置,即表示all replicas已经commit的位置
    • HW之前的数据才是commit后的,对消费者才可见
    • ISR集合里面最小leo
  • offset

    • 每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中
    • partition中的每个消息都有一个联系的序列号叫做offset,用于partition唯一表示一条消息
    • 可以认为offset是partition中message的id
  • Segment:每个partition又由多个segment file组成

    • segment file 由两个部分组成,分别是index file 和data file (log file)
    • 两个文件一一对应,后缀.index和.log分别表示索引文件和数据文件
    • 命名规则:partition的第一个segment从0开始i,后续每个segment文件名为上一个segment文件最后一条消息的offset+1
  • Kafka高效文件存储设计特点:

    • topic中一个partition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用
    • 通过索引信息可以快速定位message
    • producer生产数据,要写入到log文件中,写的过程中一直追加到文件末尾,为顺序写,官网数据表明。同样的磁盘,顺序写能到600m/s,而随机写只有100k/s
  • kafka的producer生产着发送到broker流程

    • 如果指定Partition ID,则PR被发送到指定partition(ProducerRecord)
    • 如果未指定Partition ID,但指定了key,PR会按照hash(key)发送至指定Partition
    • 如果未指定Partition ID 也没有指定Key,PR会按照round-Robin轮训模式发送到每个Partition
      • 消费者消费partition分区默认是range模式
    • 如果同时指定了Partition ID和Key,PR只会发送到指定的Partition(Key不起作用,代码逻辑决定)
    • Partition由多个副本,只有一个replicationLeader负责改Partition和生产者消费者交互

点对点和订阅发布模型

点对点

#创建topic
./kafka-topics.sh --create --zookeeper 192.168.3.128:2181 --replication-factor 1 --partitions 2 --topic t1

./kafka-console-producer.sh --broker-list 192.168.3.128:9092 --topic t1

#指定配置文件启动两个消费者
./kafka-console-consumer.sh --bootstrap-server 192.168.3.128:9092 --from-beginning --topic t1 --consumer.config ../config/consumer.properties


发布订阅

修改config/comsumer.properties中的group.id确保不一样

整合springboot

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

KafkaAdmin

import org.apache.kafka.clients.admin.*;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.*;
import java.util.concurrent.ExecutionException;

@SpringBootTest
public class KafkaAdminTest {

    private static final String TOPIC_NAME = "my-topic";

    /**
     * 设置admin客户端
     * @return
     */
    public static AdminClient initAdminClient(){
        Properties properties = new Properties();

        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.128:9092");
        AdminClient adminClient =
                AdminClient.create(properties);
        return adminClient;
    }

    @Test
    public void createTopic() {
        AdminClient adminClient = initAdminClient();
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 2 , (short) 1);
        CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
        try {
            createTopicsResult.all().get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("创建新的topic");
    }

    //获取topic集合
    @Test
    public void listTopic() throws ExecutionException, InterruptedException {
        AdminClient adminClient = initAdminClient();
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);
        ListTopicsResult listTopics = adminClient.listTopics(options);
        Set<String> topics = listTopics.names().get();
        for (String topic : topics) {
            System.err.println(topic);
        }
    }

    //删除
    @Test
    public void delTopicTest() {
        AdminClient adminClient = initAdminClient();
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("xdclasssp11-topic"));
        try {
            deleteTopicsResult.all().get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //查看topic详细信息
    @Test
    public void getTopicInfo() throws Exception {
        AdminClient adminClient = initAdminClient();
        DescribeTopicsResult describeTopicsResult =
                adminClient.describeTopics(Arrays.asList(TOPIC_NAME)
                );
        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
        Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
        entries.stream().forEach((entry)-> System.out.println("name:"+entry.getKey()+" , desc:"+ entry.getValue()));
    }

    @Test
    public void incrPartitionsTest() throws Exception{
        Map<String, NewPartitions> infoMap = new HashMap<>();
        NewPartitions newPartitions = NewPartitions.increaseTo(5);
        AdminClient adminClient = initAdminClient();
        infoMap.put(TOPIC_NAME, newPartitions);
        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
        createPartitionsResult.all().get();
    }
}

生产者

基础配置

生产者常见配置:https://kafka.apache.org/documentation/#producerconfigs

#kafka地址,即broker地址
bootstrap.servers  

#当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。
acks

#请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
retries

#每个分区未发送消息总字节大小,单位:字节,超过设置的值就会提交数据到服务端,默认值是16KB
batch.size

# 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满,如果想减少请求的数量,可以设置 linger.ms 大于#0,即消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
# 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送 减少请求
#如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送
linger.ms

# buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
# 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到Kafka服务器
# 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
# buffer.memory要大于batch.size,否则会报申请内存不足的错误,不要超过物理内存,根据实际情况调整
buffer.memory

# key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使
#消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将#key序列化成字节数组。
key.serializer
value.serializer  

注意!!!!
key默认是null,大多数应用程序会用到key
	如果key为空,kafka使用默认的partitioner,使用RoundRobin算法将消息均衡的分布在各个partition上
	如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息被写到Topic的哪个partition,拥有相同key消息会被写到同一个partition,实现顺序消息
	

样例

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;

import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerTest {

    private static final String TOPIC_NAME = "xdclasssp-topic";

    public static Properties getProperties(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.3.128:9092");
        //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.74.55.160:9092");

        // 当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别,分别是0, 1,all。
        props.put("acks", "all");
        //props.put(ProducerConfig.ACKS_CONFIG, "all");

        // 请求失败,生产者会自动重试,指定是0次,如果启用重试,则会有重复消息的可能性
        props.put("retries", 0);
        //props.put(ProducerConfig.RETRIES_CONFIG, 0);

        // 生产者缓存每个分区未发送的消息,缓存的大小是通过 batch.size 配置指定的,默认值是16KB
        props.put("batch.size", 16384);

        /**
         * 默认值就是0,消息是立刻发送的,即便batch.size缓冲空间还没有满
         * 如果想减少请求的数量,可以设置 linger.ms 大于0,即消息在缓冲区保留的时间,超过设置的值就会被提交到          服务端
         * 通俗解释是,本该早就发出去的消息被迫至少等待了linger.ms时间,相对于这时间内积累了更多消息,批量发送           减少请求
         * 如果batch被填满或者linger.ms达到上限,满足其中一个就会被发送
         */
        props.put("linger.ms", 1);

        /**
         * buffer.memory的用来约束Kafka Producer能够使用的内存缓冲的大小的,默认值32MB。
         * 如果buffer.memory设置的太小,可能导致消息快速的写入内存缓冲里,但Sender线程来不及把消息发送到             Kafka服务器
         * 会造成内存缓冲很快就被写满,而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了
         * buffer.memory要大于batch.size,否则会报申请内存不#足的错误,不要超过物理内存,根据实际情况调整
         * 需要结合实际业务情况压测进行配置
         */
        props.put("buffer.memory", 33554432);

        /**
         * key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被          设置,
         * 即使消息中没有指定key,序列化器必须是一个实
         org.apache.kafka.common.serialization.Serializer接口的类,
         * 将key序列化成字节数组。
         */
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        return props;
    }


    /**
     * send()方法是异步的,添加消息到缓冲区等待发送,并立即返回
     * 生产者将单个的消息批量在一起发送来提高效率,即 batch.size和linger.ms结合
     *
     * 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回 ack
     * 发送消息后返回的一个 Future 对象,调用get即可
     *
     * 消息发送主要是两个线程:一个是Main用户主线程,一个是Sender线程
     *  1)main线程发送消息到RecordAccumulator即返回
     *  2)sender线程从RecordAccumulator拉取信息发送到broker
     *  3) batch.size和linger.ms两个参数可以影响 sender 线程发送次数
     *
     *
     */
    @Test
    public void testSend(){

        Properties props = getProperties();
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 1; i < 3; i++){
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>("my-topic", "xdclass-key"+i, "xdclass-value"+i));
            try {
                RecordMetadata recordMetadata = future.get();//不关心是否发送成功,则不需要这行
                System.out.println("发送状态:"+recordMetadata.toString());

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println(i+"发送:"+ LocalDateTime.now().toString());
        }
        producer.close();
    }
    
    


}

自定义partition分区规则

  • 默认分区器
org.apache.kafka.clients.producer.internals.DefaultPartitioner
  • 自定义分区规则

    • 创建类,实现Partitioner接口,重写方法
    • 配置partitioner.class指定类
    public class XdclassPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if("xdclass".equals(key)) {
                return 0;
            }
            //使用hash值取模,确定分区(默认的也是这个方式)
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
        @Override
        public void close() {
        }
        @Override
        public void configure(Map<String, ?> configs) {
        }
    }
    
      @Test
        public void testSend(){
            Properties props = getProperties();
            props.put("partitioner.class", "net.xdclass.xdclassredis.XdclassPartitioner");
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 3; i++){
                Future<RecordMetadata>  future = producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass", "xdclass-value"+i));
                try {
                    RecordMetadata recordMetadata = future.get();
                    System.out.println("发送状态:"+recordMetadata.toString());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                System.out.println(i+"发送:"+LocalDateTime.now().toString());
            }
            producer.close();
        }
    

消费者

  • 消费者根据什么模式从broker获取数据的?
    • 为什么是pull模式,而不是broker主动push?
      • 消费者采用 pull 拉取方式,从broker的partition获取数据
      • pull 模式则可以根据 consumer 的消费能力进行自己调整,不同的消费者性能不一样
        • 如果broker没有数据,consumer可以配置 timeout 时间,阻塞等待一段时间之后再返回
      • 如果是broker主动push,优点是可以快速处理消息,但是容易造成消费者处理不过来,消息堆积和延迟。

消费者从哪个分区进行消费?

  • 一个 topic 有多个 partition,一个消费者组里面有多个消费者,那是怎么分配过
    • 一个主题topic可以有多个消费者,因为里面有多个partition分区 ( leader分区)
    • 一个partition leader可以由一个消费者组中的一个消费者进行消费
    • 一个 topic 有多个 partition,所以有多个partition leader,给多个消费者消费,那分配策略如何?

顶层接口

org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
  • round-robin (RoundRobinAssignor非默认策略)轮训
    • 【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的 partition 和所有的 consumer 都列出来, 所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题,例如7个分区,同组内2个消费者
    • topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6
    • c-1: topic-p0/topic-p2/topic-p4/topic-p6
    • c-2:topic-p1/topic-p3/topic-p5
    • 弊端
      • 如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分区分配的不均匀
      • 有3个消费者C0、C1和C2,他们共订阅了 3 个主题:t0、t1 和 t2
      • t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2))
      • 消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2
image-20230917225856747
  • range (RangeAssignor默认策略)范围
    • 【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区, 一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者
    • topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5//topic-p6
    • c-1: topic-p0/topic-p1/topic-p2/topic-p3
    • c-2:topic-p4/topic-p5/topic-p6
    • 弊端
      • 只是针对 1 个 topic 而言,c-1多消费一个分区影响不大
      • 如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic越多则消费的分区也越多,则性能有所下降

面试

  • 什么是Rebalance操作

    • kafka 怎么均匀地分配某个 topic 下的所有 partition 到各个消费者,从而使得消息的消费速度达到最快,这就是平衡(balance),前面讲了 Range 范围分区 和 RoundRobin 轮询分区,也支持自定义分区策略。

    • 而 rebalance(重平衡)其实就是重新进行 partition 的分配,从而使得 partition 的分配重新达到平衡状态

  • 面试

    • 例如70个分区,10个消费者,但是先启动一个消费者,后续再启动一个消费者,这个会怎么分配?
      • Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作 ,下面都会发生rebalance操作
        • 当消费者组内的消费者数量发生变化(增加或者减少),就会产生重新分配patition
        • 分区数量发生变化时(即 topic 的分区数量发生变化时)
  • 面试:当消费者在消费过程突然宕机了,重新恢复后是从哪里消费,会有什么问题?

    • 消费者会记录offset,故障恢复后从这里继续消费,这个offset记录在哪里?
    • 记录在zk里面和本地,新版默认将offset保证在kafka的内置topic中,名称是 __consumer_offsets
      • 该Topic默认有50个Partition,每个Partition有3个副本,分区数量由参数offset.topic.num.partition配置
      • 通过groupId的哈希值和该参数取模的方式来确定某个消费者组已消费的offset保存到__consumer_offsets主题的哪个分区中
      • 由 消费者组名+主题+分区,确定唯一的offset的key,从而获取对应的值
      • 三元组:group.id+topic+分区号,而 value 就是 offset 的值

消费者配置

#消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
group.id
​
#为true则自动提交偏移量
enable.auto.commit
​
#自动提交offset周期
auto.commit.interval.ms
​
#重置消费偏移量策略,消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理,
#默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更 才可以
auto.offset.reset
​
#序列化器
key.deserializer

消费者机制和分区策略

消费者从哪个分区进行消费?两个策略

  • 顶层接口
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
  • round-robin (RoundRobinAssignor非默认策略)轮训

    • 【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的 partition 和所有的 consumer 都列出来, 所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题,例如7个分区,同组内2个消费者
    • topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6
    • c-1: topic-p0/topic-p2/topic-p4/topic-p6
    • c-2:topic-p1/topic-p3/topic-p5
    • 弊端
      • 如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分区分配的不均匀
      • 有3个消费者C0、C1和C2,他们共订阅了 3 个主题:t0、t1 和 t2
      • t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2))
      • 消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2
  • range (RangeAssignor默认策略)范围

    • 【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区, 一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者
    • topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5//topic-p6
    • c-1: topic-p0/topic-p1/topic-p2/topic-p3
    • c-2:topic-p4/topic-p5/topic-p6
    • 弊端
      • 只是针对 1 个 topic 而言,c-1多消费一个分区影响不大
      • 如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic越多则消费的分区也越多,则性能有所下降

reblance操作

  • 什么是Rebalance操作

    • kafka 怎么均匀地分配某个 topic 下的所有 partition 到各个消费者,从而使得消息的消费速度达到最快,这就是平衡(balance),前面讲了 Range 范围分区 和 RoundRobin 轮询分区,也支持自定义分区策略。
    • 而 rebalance(重平衡)其实就是重新进行 partition 的分配,从而使得 partition 的分配重新达到平衡状态
  • 面试

    • 例如70个分区,10个消费者,但是先启动一个消费者,后续再启动一个消费者,这个会怎么分配?
      • Kafka 会进行一次分区分配操作,即 Kafka 消费者端的 Rebalance 操作 ,下面都会发生rebalance操作
        • 当消费者组内的消费者数量发生变化(增加或者减少),就会产生重新分配patition
        • 分区数量发生变化时(即 topic 的分区数量发生变化时)
  • 面试:当消费者在消费过程突然宕机了,重新恢复后是从哪里消费,会有什么问题?

    • 消费者会记录offset,故障恢复后从这里继续消费,这个offset记录在哪里?
    • 记录在zk里面和本地,新版默认将offset保证在kafka的内置topic中,名称是 __consumer_offsets
      • 该Topic默认有50个Partition,每个Partition有3个副本,分区数量由参数offset.topic.num.partition配置
      • 通过groupId的哈希值和该参数取模的方式来确定某个消费者组已消费的offset保存到__consumer_offsets主题的哪个分区中
      • 由 消费者组名+主题+分区,确定唯一的offset的key,从而获取对应的值
      • 三元组:group.id+topic+分区号,而 value 就是 offset 的值

Consumer配置讲解和Kafka调试日志配置

  • springboot关闭kafka调试日志
#yml配置文件修改
logging:
  config: classpath:logback.xml
  
  
#logback.xml内容
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 -->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
  • 消费者配置
#消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
group.id

#为true则自动提交偏移量
enable.auto.commit

#自动提交offset周期
auto.commit.interval.ms

#重置消费偏移量策略,消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理,
#默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更 才可以
auto.offset.reset

#序列化器
key.deserializer

Kafka消费者Consumer消费消息配置实战

  • 配置
 public static Properties getProperties() {
        Properties props = new Properties();

        //broker地址
        props.put("bootstrap.servers", "112.74.55.160:9092");

        //消费者分组ID,分组内的消费者只能消费该消息一次,不同分组内的消费者可以重复消费该消息
        props.put("group.id", "xdclass-g1");

        //开启自动提交offset
        props.put("enable.auto.commit", "true");

        //自动提交offset延迟时间
        props.put("auto.commit.interval.ms", "1000");

        //反序列化
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }
  • 消费订阅
@Test
    public void simpleConsumerTest(){
        Properties props = getProperties();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //订阅topic主题
        consumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME));

       while (true) {
            //拉取时间控制,阻塞超时时间
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());
            }
        }
    }

Consumer手工提交offset配置和从头消费配置

  • 如果需要从头消费partition消息,怎操作?

    • auto.offset.reset 配置策略即可
    • 默认是latest,需要改为 earliest 且消费者组名变更 ,即可实现从头消费
    //默认是latest,如果需要从头消费partition消息,需要改为 earliest 且消费者组名变更,才生效 props.put("auto.offset.reset","earliest");
    
  • 自动提交offset问题

    • 没法控制消息是否正常被消费
    • 适合非严谨的场景,比如日志收集发送
  • 手工提交offset配置和测试

    • 初次启动消费者会请求broker获取当前消费的offset值
  • 手工提交offset

    • 同步 commitSync 阻塞当前线程 (自动失败重试)
    • 异步 commitAsync 不会阻塞当前线程 (没有失败重试,回调callback函数获取提交信息,记录日志)

标签:消费者,分区,partition,kafka,topic,offset
From: https://www.cnblogs.com/dzystudy/p/17710169.html

相关文章

  • Kafka的零拷贝技术Zero-Copy
    传统的拷贝过程流程步骤:(1)操作系统将数据从磁盘文件中读取到内核空间的页面缓存;(2)应用程序将数据从内核空间读入用户空间缓冲区;(3)应用程序将读到数据写回内核空间并放入socket缓冲区;(4)操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送。此过程涉及到4次上......
  • kafka
    Kafka学习笔记_day01适用场景:大数据场景消息队列模式点对点模式消费者主动拉取数据,消息收到以后清除消息发布/订阅模式可以存在多个Topic主题消费者消费完数据以后,不删除数据每个消费者相互独立,都可以消费到数据基础架构内部将一个Topic(主题)分为了多个partition(分区),并配合分区......
  • KafKa概述
    概述KafKa就是一个消息队列:作用概况为:解耦、异步、削峰https://juejin.cn/post/6996826368512098317使用消息队列的好处解耦(类似Spring的IOC)允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。可恢复性系统的一部分组件失效时,不会影响到整个......
  • RabbitMQ、RocketMQ和Kafka的不同之处
    RabbitMQ、RocketMQ和Kafka是三种常见的消息队列系统,它们在设计和使用方面有一些不同之处:架构设计:RabbitMQ:RabbitMQ是一个基于AMQP(高级消息队列协议)的开源消息队列系统,采用的是传统的Broker架构模式,其中包括生产者、消费者和中间件(Broker)。RocketMQ:RocketMQ是一个基于分布式......
  • OGG-Postgres实时同步到Kafka
    (一)数据同步信息名称源端名称目标端数据库类型Postgresql12.4组件类型KafkaIP地址20.2.127.23Broker地址20.2.125.52:9092,20.2.127.23:9092,20.2.127.24:9092端口5432端口9092数据库testpdbZookeeperHa......
  • SpringBoot-Learning系列之Kafka整合
    SpringBoot-Learning系列之Kafka整合本系列是一个独立的SpringBoot学习系列,本着WhatWhyHow的思想去整合Java开发领域各种组件。消息系统主要应用场景流量消峰(秒杀抢购)、应用解耦(核心业务与非核心业务之间的解耦)异步处理、顺序处理实时数据传输管道异构语言架构......
  • kafka3.x 简单使用
    ***保证kafka和zookeeper已经在linux上进行了安装,目录需要改为自己的目录 ***kafka2.8之后引入了kraft机制,不用zookeeper也能启动参数介绍 --create创建一个topic --topic[your_topic_name]创建的topic的信息 --describe描述信息 --bootstrap-server[host_url......
  • 【Kafka】ZooKeeper启动失败报错java.net.BindException_ Address already in use_ bi
    问题描述Kafka2.8.1ZooKeeper启动失败。zookeeper-server-start.bat../../config/zookeeper.properties[2023-09-0418:21:49,497]INFObindingtoport0.0.0.0/0.0.0.0:2181(org.apache.zookeeper.server.NIOServerCnxnFactory)[2023-09-0418:21:49,498]ERRORUnexpected......
  • Strimzi从入门到精通系列之三:部署Kafka Connect
    Strimzi从入门到精通系列之三:部署KafkaConnect一、概述二、将KafkaConnect部署到Kubernetes集群三、KafkaConnect配置四、为多个实例配置KafkaConnect五、添加连接器六、自动使用连接器插件构建新的容器映像七、使用KafkaConnect基础镜像中的连接器插件构建新的容器镜......
  • Strimzi从入门到精通系列之二:部署Kafka
    Strimzi从入门到精通系列之二:部署Kafka一、认识Strimzi二、Strimzi的核心知识点三、Kafka集群、TopicOperator、UserOperator四、部署Kafka集群五、使用ClusterOperator部署TopicOperator六、使用ClusterOperator部署UserOperator一、认识StrimziStrimzi是一款用于在......