首页 > 编程语言 >提高kafka消费速度之从源码去了解Spring-kafka的concurrency参数

提高kafka消费速度之从源码去了解Spring-kafka的concurrency参数

时间:2023-04-19 10:58:13浏览次数:49  
标签:Spring KafkaListener factory kafka 源码 线程 concurrency consumer

网上看到这篇文章,觉得很不错,这里转载记录一下。

转自:提高kafka消费速度之从源码去了解Spring-kafka的concurrency参数 - 简书

第一部分、引言

        在spring应用中,如果我们需要订阅kafka消息,通常情况下我们不会直接使用kafka-client,而是使用了更方便的一层封装spring-kafka。特别是在springboot微服务中,基于注解和配置的spring-kafka可以给我们带来更简单更便捷的开发方式。不过,它不仅仅只是简单的封装的kafka-client,仔细看他的源码会发现里面大有文章,不得不惊呼大神写这个框架的牛逼。

        本文介绍了常见的几种在spring-kafka中提高kafka消费速度的方式,并重点介绍了使用多线程并发消费的方式,通过解读源码真正理解该多线程消费模型。

         partition与consumer的限制:“一个partition只能对应一个消费线程”。

注意:本文涉及到的case是在partition数大于consumer实例数的情况。比如现在有5台部署了consumer实例机器,要去消费30个partition,是这种情况下的提升消费速度优化。

 

        如果你现在面临的情况是那种partition数很少,又受限于“一个partition只能对应一个消费线程”的限制。

        比如partition数只有1,这种情况就必须要“把拉取动作和处理动作分开” 可以参考以下模型 —— 消息拉下来之后就丢给线程池handle并自动提交offset,也可以让消息在handler完之后手动提交offset(无论哪种方式都是不安全的,建议参考类似于TCP的滑动窗口控制的方式来控制消费位移的提交)

        默认情况下, Spring-Kafka @KafkaListener 串行消费的(这个串行是指poll的线程只有一个)。缺点显而易见生产者生产的数据过多时,消费端容易导致消息积压的问题。

        当然了, 我们可以通过启动多个进程,实现 多进程的并发消费。 当然了也取决于你的TOPIC的 partition的数量。

        试想一下, 在单进程的情况下,能否实现多线程的并发消费呢? Spring Kafka 为我们提供了这个功能,而且使用起来相当简单。 重点是把握原理,灵活运用。

@KafkaListener 的 concurrecy属性 可以指定并发消费的线程数 。

        举个例子 : 如果设置 concurrency=2 时,Spring-Kafka 就会为该 @KafkaListener标注的方法消费的消息 创建 2个线程,进行并发消费。 当然了,这是有前置条件的。 不要超过 partitions 的大小

  • 当concurrency < partition 的数量,会出现消费不均的情况,一个消费者的线程可能消费多个partition 的数据
  • 当concurrency = partition 的数量,最佳状态,一个消费者的线程消费一个 partition 的数据
  • 当concurrency > partition 的数量,会出现有的消费者的线程没有可消费的partition, 造成资源的浪费

 分布式情况:总consumer线程数=concurrency*机器数量;

 

第二部分:提高消费速度的几种操作

        kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到Kafka消息队列拥堵的情况。遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic。因此只能额外启动一个或多个相同名称的consumer-group的消费者实例来加快消息消费(如果该topic只有一个partition,实际上再启动一个新的消费者,没有作用)。——这是最原始的提高消费速度的方式。
        然后我们介绍一下引入spring-kafka有哪些操作(其实都是可以用代码实现,只是你要自己做这层二次封装)

一、用多线程并发消费

        通过设置concurrency参数的方式。先看代码,我们可以使用两种不同的途径设置该参数:

第一种方式,直接在factory里面设置。

        我们给ConcurrentKafkaListenerContainerFactory设置了concurrency等于3,也可以通过在application.properties中添加spring.kafka.listener.concurrency=3的方式配置factory.

 
  1.   @Bean
  2.   KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  3.   ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  4.   factory.setConsumerFactory(consumerFactory());
  5.   factory.setConcurrency(3);
  6.   factory.setBatchListener(true);
  7.   factory.getContainerProperties().setPollTimeout(3000);
  8.   return factory;
  9.   }
 

      本人不推荐使用该方式,因为设置它的意思是给factory里面的每个listener都设置了3个线程,但其实有些listener监听的topic并没有那么多分区。推荐用第二种方式。

 

第二种方式,在@KafkaListener设置,支持SpEL表达式。

 
  1.   单独给某个listener配置并发线程数,这种方式在逻辑上更为合理。
  2.   @KafkaListener(topics = {"${kafka.calculate.topic}"}, concurrency = "3")
  3.   public void listen(ConsumerRecord<String, String> record) {
  4.   // doing
  5.   }
 

先介绍一下spring-kafka在工作的时候启动的一堆线程:

  • 分为两类线程,一类是Consumer线程,另一类是Listener线程
  • Consumer线程: 用来直接调用kafka-client的poll()方法获取消息。
  • Listener线程: 真正调用处理我们代码中标有@KafkaListener注解方法的线程。

        如果不使用spring-kafka,而是直接用kafka-client的话,那么正常我们会整一个while循环,在循环里面调用poll(),然后处理消息,这在kafka broker看来就是一个Consumer。如果你想用多个Consumer, 除了多启动几个进程以外,也可以在一个进程使用多个线程执行此while()循环。spring-kafka就是这么干的。

因此,先看结论:

        对于concurrency=3这个参数的值的设定来说,它设置的其实是每个@KafkaListener的并发数。spring-kafka在初始化的时候会启动concurrency个Consumer线程来执行@KafkaListener里面的方法。

        这里如果你是用第一种方式,在factory里面直接设置concurrency,那么每个加了@KafkaListener注解的都会新建concurrency个线程,这样如果listener订阅的topic没那么多分区,new那么多线程只会带来格外的性能开销,这样是我不推荐在factory指定concurrency的原因。

源代码

最后一部分将会带大家阅读源码证明以上这段话

另外:
看到这里可能会问,那我说的Listener线程没用到啊。不急,继续往下看,在源码中已经给出了:

 
  1.   protected void pollAndInvoke() {
  2.   if (!this.autoCommit && !this.isRecordAck) {
  3.   processCommits();
  4.   }
  5.   processSeeks();
  6.   checkPaused();
  7.   ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
  8.   this.lastPoll = System.currentTimeMillis();
  9.   checkResumed();
  10.   debugRecords(records);
  11.   if (records != null && records.count() > 0) {
  12.   if (this.containerProperties.getIdleEventInterval() != null) {
  13.   this.lastReceive = System.currentTimeMillis();
  14.   }
  15.   // 这里可以看到如果是自动提交offset,会直接把consumer poll下来的消息给到listener执行,
  16.   // 即kafka consumer所在线程会直接调用我们的@KafkaListener方法
  17.   invokeListener(records);
  18.   }
  19.   else {
  20.   checkIdle();
  21.   }
  22.   }
  23.   如果是手动提交offset,即enable-auto-commit设置为false,则是将消息投放到阻塞队列中,另一边由Listener线程取出执行。
 

所以,当concurrency=3,自动提交设置为false时,如果你程序里有两个方法标记了@KafkaListener,那么此时会启动 2 * 3 = 6 个Consumer线程,6个Listener线程。
这个信息在排查错误的时候非常重要,但官方文档居然没怎么提线程的事(不够详细),只是在介绍KafkaContainerListener

另外例如:

      对于spring.kafka.listener.concurrency=3这个参数来说,它设置的是每个@KafkaListener的并发个数。每添加一个@KafkaListener, spring-kafka都会启动concurrency条Consumer线程来监听这些topic(注解可以指定监听多个topic):

  • enable-auto-commit设为true时会直接在当前线程,即kafka consumer所在线程调用我们的@KafkaListener方法,
  • 如果enable-auto-commit设为false,则是将消息投放到阻塞队列中,另一边由Listener线程取出执行。

有源码为证:

 
  1.   // if the container is set to auto-commit, then execute in the
  2.   // same thread
  3.   // otherwise send to the buffering queue
  4.   if (this.autoCommit) {
  5.   invokeListener(records);
  6.   }
  7.   else {
  8.   if (sendToListener(records)) {
  9.   if (this.assignedPartitions != null) {
  10.   // avoid group management rebalance due to a slow
  11.   // consumer
  12.   this.consumer.pause(this.assignedPartitions);
  13.   this.paused = true;
  14.   this.unsent = records;
  15.   }
  16.   }
  17.   }
 

所以,当concurrency=3,自动提交设置为false时,如果你程序里有两个方法标记了@KafkaListener,那么此时会启动 2 * 3 = 6 个Consumer线程,6个Listener线程。
这个信息在排查错误的时候非常重要,但官方文档居然没怎么提线程的事(不够详细),只是在介绍KafkaContainerListener

 

二、批量消费

涉及到两个参数,

  • 一个是factory.setBatchListener(true) —— 启动批量消费,
  • 一个是propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50) —— 设置批量消费每次最多消费多少条记录。

官方文档对max.poll.records的定义是:

        这个参数定义了poll()方法最多可以返回多少条消息,默认值为500。注意这里的用词是"最多",也就是说如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,就只返回500。

        这个500默认值是比较坑人的,如果你的消息处理逻辑比较重,比如需要查数据库,调用接口,甚至是复杂计算,那么你很难保证能够在max.poll.interval.ms内处理完500条消息,也就是说,如果上游真的突然大爆发生产了成千上万条消息,而平摊到每个消费者身上的消息达到了500的又无法按时消费完成的话,会触发消费者实例的rebalance, 然后这批消息会被分配到另一个消费者中,还是会处理不完,又会触发rebalance, 这样这批消息就永远也处理不完,而且一直在重复处理。

        所以在配置的时候要避免出现上述问题,可以提前评估好处理一条消息最长需要多少时间,然后务必覆盖默认的max.poll.records参数。在spring-kafka中这个原生参数对应的参数项是max-poll-records。对于消息处理比较重的操作,建议把这个值改到50以下会保险一些。

配置代码如下:

 
  1.   @Bean
  2.   public Map<String, Object> consumerConfigs() {
  3.   Map<String, Object> propsMap = new HashMap<>();
  4.   propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
  5.   propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
  6.   propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  7.   propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
  8.   propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  9.   propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  10.   propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
  11.   propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
  12.   // 这里设置批量消费的消息个数
  13.   propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
  14.   return propsMap;
  15.   }
  16.   @Bean
  17.   public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  18.   ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  19.   factory.setConsumerFactory(consumerFactory());
  20.   factory.setConcurrency(4);
  21.   // 记得要开启批量消费,不然只是单次poll的消息增多而已
  22.   factory.setBatchListener(true);
  23.   factory.getContainerProperties().setPollTimeout(3000);
  24.   return factory;
  25.   }
 

三、分区消费

这种方式我个人觉得不是很适用分布式部署的情况,所以基本上不会考虑用它。。具体操作代码如下:

 
  1.   @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
  2.   public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
  3.   log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
  4.   log.info("Id0 records size " + records.size());
  5.    
  6.   for (ConsumerRecord<?, ?> record : records) {
  7.   // doing
  8.   }
  9.   }
  10.    
  11.   @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
  12.   public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
  13.   log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
  14.   log.info("Id1 records size " + records.size());
  15.    
  16.   for (ConsumerRecord<?, ?> record : records) {
  17.   // doing
  18.   }
 

第三部分:从源码去了解Spring-kafka的concurrency参数

直接找到我们的ConcurrentKafkaListenerContainerFactory类中,concurrency的使用:

 
  1.   @Override
  2.   protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance, KafkaListenerEndpoint endpoint) {
  3.   super.initializeContainer(instance, endpoint);
  4.   if (endpoint.getConcurrency() != null) {
  5.   instance.setConcurrency(endpoint.getConcurrency());
  6.   }
  7.   else if (this.concurrency != null) {
  8.   instance.setConcurrency(this.concurrency);
  9.   }
  10.   }
 

继续在ConcurrentMessageListenerContainer中我们找到了concurrency的使用:

 
  1.   @Override
  2.   protected void doStart() {
  3.   if (!isRunning()) {
  4.   checkTopics();
  5.   ContainerProperties containerProperties = getContainerProperties();
  6.   TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
  7.   // 这里你会发现如果你设置的concurrency 大于分区数,spring-kafka也会让它等于分区的个数,肯定不会超过分区数。
  8.   // 可这是在一个JVM里面的concurrency,如果你是多个消费者实例部署在多个服务器上(实际生产都是这么做),**那么你的concurrency 值记得还要除以机器数**。
  9.   if (topicPartitions != null && this.concurrency > topicPartitions.length) {
  10.   this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
  11.   + "equal to the number of partitions; reduced from " + this.concurrency + " to "
  12.   + topicPartitions.length);
  13.   this.concurrency = topicPartitions.length;
  14.   }
  15.   setRunning(true);
  16.   // 这里可以很清晰的看到,concurrency是指KafkaMessageListenerContainer的个数,即concurrency表明的是spring会创建多个KafkaMessageListenerContainer。
  17.   // 那么KafkaMessageListenerContainer类又干了什么?这里是for循环顺序创建了concurrency个KafkaMessageListenerContainer,那么spring又是如何并发的?这里会猜想是new了线程去处理
  18.   for (int i = 0; i < this.concurrency; i++) {
  19.   KafkaMessageListenerContainer<K, V> container;
  20.   if (topicPartitions == null) {
  21.   container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties);
  22.   }
  23.   else {
  24.   container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
  25.   containerProperties, partitionSubset(containerProperties, i));
  26.   }
  27.   String beanName = getBeanName();
  28.   container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
  29.   if (getApplicationEventPublisher() != null) {
  30.   container.setApplicationEventPublisher(getApplicationEventPublisher());
  31.   }
  32.   container.setClientIdSuffix("-" + i);
  33.   container.setGenericErrorHandler(getGenericErrorHandler());
  34.   container.setAfterRollbackProcessor(getAfterRollbackProcessor());
  35.   container.setEmergencyStop(() -> {
  36.   stop(() -> {
  37.   // NOSONAR
  38.   });
  39.   publishContainerStoppedEvent();
  40.   });
  41.   container.start();
  42.   this.containers.add(container);
  43.   }
  44.   }
  45.   }
 

继续往下看KafkaMessageListenerContainer干了什么?
进入41行的start方法:这是父类AbstractMessageListenerContainer的start方法

 
  1.   @Override
  2.   public final void start() {
  3.   checkGroupId();
  4.   synchronized (this.lifecycleMonitor) {
  5.   if (!isRunning()) {
  6.   Assert.isTrue(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
  7.   () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
  8.   doStart();
  9.   }
  10.   }
  11.   }
 

再进入第8行的doStart方法:
发现doStart是一个抽象方法,于是我们需要回到AbstractMessageListenerContainer的子类KafkaMessageListenerContainer中,查看doStart方法具体做了什么:

 
  1.   @Override
  2.   protected void doStart() {
  3.   if (isRunning()) {
  4.   return;
  5.   }
  6.   if (this.clientIdSuffix == null) { // stand-alone container
  7.   checkTopics();
  8.   }
  9.   ContainerProperties containerProperties = getContainerProperties();
  10.   // 这里是做一些参数校验
  11.   checkAckMode(containerProperties);
  12.    
  13.   Object messageListener = containerProperties.getMessageListener();
  14.   Assert.state(messageListener != null, "A MessageListener is required");
  15.   if (containerProperties.getConsumerTaskExecutor() == null) {
  16.   SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
  17.   (getBeanName() == null ? "" : getBeanName()) + "-C-");
  18.   containerProperties.setConsumerTaskExecutor(consumerExecutor);
  19.   }
  20.   Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
  21.   // 设置消息来到之后,需要回调的listener和listenerType类型
  22.   GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
  23.   ListenerType listenerType = deteremineListenerType(listener);
  24.   // 这里创建了一个ListenerConsumer,看到这里我们也明白了为什么顺序创建了concurrency个KafkaMessageListenerContainer,却可以做到并发?
  25.   // 理由很简单:因为在KafkaMessageListenerContainer类内部中,对于ListenerConsumer的处理逻辑,是新起线程执行,所以才可以做到并发。
  26.   // 这里又一次说明了设置好concurrency值的重要性,如果处理不好,会new过多闲置的ListenerConsumer,因为一个partition只对应一个消费线程。
  27.   this.listenerConsumer = new ListenerConsumer(listener, listenerType);
  28.   setRunning(true);
  29.   this.listenerConsumerFuture = containerProperties
  30.   .getConsumerTaskExecutor()
  31.   .submitListenable(this.listenerConsumer);
  32.   }
 

继续,我们进入ListenerConsumer的构造函数里面:

 
  1.   ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
  2.   Assert.state(!this.isAnyManualAck || !this.autoCommit,
  3.   () -> "Consumer cannot be configured for auto commit for ackMode "
  4.   + this.containerProperties.getAckMode());
  5.   // 这里可以看到每个ListenerConsumer首先创建了一个Kafka定义的Consumer,然后,设置了consumer的订阅的topic、分区偏移量信息以及重新分配分区的监听对象。
  6.   this.consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
  7.   this.consumerGroupId,
  8.   this.containerProperties.getClientId(),
  9.   KafkaMessageListenerContainer.this.clientIdSuffix,
  10.   this.containerProperties.getConsumerProperties());
  11.    
  12.   if (this.transactionManager != null) {
  13.   this.transactionTemplate = new TransactionTemplate(this.transactionManager);
  14.   }
  15.   else {
  16.   this.transactionTemplate = null;
  17.   }
  18.   subscribeOrAssignTopics(this.consumer);
  19.   GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
  20.   this.genericListener = listener;
  21.   if (listener instanceof BatchMessageListener) {
  22.   this.listener = null;
  23.   this.batchListener = (BatchMessageListener<K, V>) listener;
  24.   this.isBatchListener = true;
  25.   this.wantsFullRecords = this.batchListener.wantsPollResult();
  26.   }
  27.   else if (listener instanceof MessageListener) {
  28.   this.listener = (MessageListener<K, V>) listener;
  29.   this.batchListener = null;
  30.   this.isBatchListener = false;
  31.   this.wantsFullRecords = false;
  32.   }
  33.   else {
  34.   throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
  35.   + "'BatchMessageListener', or the variants that are consumer aware and/or "
  36.   + "Acknowledging"
  37.   + " not " + listener.getClass().getName());
  38.   }
  39.   this.listenerType = listenerType;
  40.   this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
  41.   || listenerType.equals(ListenerType.CONSUMER_AWARE);
  42.   if (this.isBatchListener) {
  43.   validateErrorHandler(true);
  44.   this.errorHandler = new LoggingErrorHandler();
  45.   this.batchErrorHandler = determineBatchErrorHandler(errHandler);
  46.   }
  47.   else {
  48.   validateErrorHandler(false);
  49.   this.errorHandler = determineErrorHandler(errHandler);
  50.   this.batchErrorHandler = new BatchLoggingErrorHandler();
  51.   }
  52.   Assert.state(!this.isBatchListener || !this.isRecordAck,
  53.   "Cannot use AckMode.RECORD with a batch listener");
  54.   if (this.containerProperties.getScheduler() != null) {
  55.   this.taskScheduler = this.containerProperties.getScheduler();
  56.   this.taskSchedulerExplicitlySet = true;
  57.   }
  58.   else {
  59.   ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
  60.   threadPoolTaskScheduler.initialize();
  61.   this.taskScheduler = threadPoolTaskScheduler;
  62.   }
  63.   this.monitorTask = this.taskScheduler.scheduleAtFixedRate(this::checkConsumer,
  64.   this.containerProperties.getMonitorInterval() * 1000); // NOSONAR magic #
  65.   if (this.containerProperties.isLogContainerConfig()) {
  66.   this.logger.info(this);
  67.   }
  68.   Map<String, Object> props =
  69.   KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
  70.   this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, false));
  71.   this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, true));
  72.   }
 

看到这里差不多真相大白了,在梳理一遍:

ConcurrentKafkaListenerContainerFactory类中的concurrency设置了KafkaMessageListenerContainer对象创建的个数,每个KafkaMessageListenerContainer对象创建了一个ListenerConsumer对象,ListenerConsumer对象有封装了一个kafkaConsumer对象。

 

所以concurrency最终设置的是kafkaConsumer对象的个数。这个也和ConcurrentKafkaListenerContainerFactory类中setConcurrency方法的注释是一致的。

配置说明

参考:【spring-kafka】@KafkaListener详解与使用 - 掘金

说明

  • 从2.2.4版开始,您可以直接在注释上指定Kafka使用者属性,这些属性将覆盖在使用者工厂中配置的具有相同名称的所有属性。您不能通过这种方式指定group.id和client.id属性。他们将被忽略;

  • 可以使用#{…​}或属性占位符(${…​})在SpEL上配置注释上的大多数属性。 比如:

   @KafkaListener(id = "consumer-id",topics = "SHI_TOPIC1",concurrency = "${listen.concurrency:3}",clientIdPrefix = "myClientId")

concurrency = "${listen.concurrency:3}", 属性concurrency将会从容器中获取listen.concurrency的值,如果不存在就默认用3

concurrency并发数

KafkaListener的concurrency会覆盖KafkaListenerContainerFactory消费者工厂中的concurrency,这里的并发数就是多线程消费;

比如说单机情况下,你设置了3,相当于就是启动了3个客户端来分配消费分区;

分布式情况:总线程数=concurrency*机器数量;

并不是设置越多越好,具体如何设置请看 属性concurrency的作用及配置(RoundRobinAssignor 、RangeAssignor)

 
  1.   /**
  2.   * 监听器工厂
  3.   * @return
  4.   */
  5.   @Bean
  6.   public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
  7.   ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
  8.   new ConcurrentKafkaListenerContainerFactory<>();
  9.   factory.setConsumerFactory(kafkaConsumerFactory());
  10.   factory.setConcurrency(6);
  11.   return factory;
  12.   }
 
    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1)

虽然使用的工厂是concurrencyFactory(concurrency配置了6); 但是他最终生成的监听器数量 是1;

 

总结

调大concurrency的值,真的会提高系统消息消费的能力吗?

       此时问题可以理解为:增加kafkaConsumer可以提高系统消息消费能力吗?答案很明显:不一定

       因为我们知道,kafka中一个consumer最多消费一个分区,在topic分区数量一定的情况下:

  1. 如果consumer的数量小于分区数量,那么增加consumer是可以提高系统消息消费能力的。
  2. 如果consumer的数量大于等于分区数量,那么此时一味提高consumer数量,系统的消息消费能力是不会提高的,反而还有可能下降。这是因为每次rebalance操作,所有的consumer都会参与,当consumer数量大时,rebalance的耗时会增加,因此系统的性能会有所下降。

标签:Spring,KafkaListener,factory,kafka,源码,线程,concurrency,consumer
From: https://www.cnblogs.com/silentdoer/p/17332537.html

相关文章

  • 5.数据交换格式与 SpringIOC 底层实现
    数据交换格式与SpringIOC底层实现一、课程目标XML和JSONJava反射机制手写SpringIOC二、什么是数据交换格式客户端与服务器常用数据交换格式xml、json、html三、数据交换格式用场景移动端(安卓、IOS)通讯方式采用http协议+JSON格式走restful风格。很多互......
  • 14.SpringAOP 编程
    SpringAOP编程课程目标代理设计模式Spring的环境搭建SpringIOC与AOPSpring事物与传播行为一、代理模式1.1概述代理(Proxy)是一种设计模式,提供了对目标对象另外的访问方式;即通过代理访问目标对象。这样好处:可以在目标对象实现的基础上,增强额外的功能操作。(扩......
  • springboot项目 宿舍管理系统 (源码+数据库文件+1w字论文+ppt)
    来了就点个赞再走呗,即将毕业的兄弟有福了文章底部获取源码springboot项目宿舍管理系统(源码+数据库文件+1w字论文+ppt)技术框架:java+springboot+vue+mysql后端框架:SpringBoot、SpringMVC、MyBatisPlus前端界面:vue、BootStrap、jQuery、ajxs系统共分为三种用户系统主要功......
  • 【Vue2.x源码系列06】计算属性computed原理
    上一章Vue2异步更新和nextTick原理,我们介绍了JavaScript执行机制是什么?nextTick源码是如何实现的?以及Vue是如何异步更新渲染的?本章目标计算属性是如何实现的?计算属性缓存原理-带有dirty属性的watcher洋葱模型的应用初始化在Vue初始化实例的过程中,如果用户options选......
  • springboot
    介绍springboot是spring项目中的一个子工程,前者的实现是基于spring的。springboot的特点:“开箱即用”和“约定大于配置”使用pom配置1)添加父工程依赖<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId>......
  • SpringBoot中配置Swagger2
    首先在pom.xml添加springfox-swagger2和springfox-swagger-ui两个依赖,并且spring-boot-starter-parent的版本不能太高,可以设置为2.1.6.RELEASE<!--https://mvnrepository.com/artifact/io.springfox/springfox-swagger2--><dependency> <groupId>io.springfox</groupId>......
  • java学习日记20230414-HashSet源码
    HashSetHashSet底层是HashMap添加一个元素时,先得到Hash值,会转化成索引值;找到存储数据表table,看这个索引位置是否存放元素;如果没有直接加入如果有,调用equals比较,如果相同放弃添加,如果不同,则添加到最后在java8中,如果一条链表的元素个数到达TREEIFY_THRESHOLD(默认是8)(table表......
  • Springboot整合Flowable6.x导出bpmn20
    项目源码仓库BPMN2.0(BusinessProcessModelandNotation)是一套业务流程模型与符号建模标准,以XML为载体,以符号可视化业务,支持精准的执行语义来描述元素的操作。Flowable诞生于Activiti,是一个使用Java编写的轻量级业务流程引擎。Flowable流程引擎可用于部署BPMN2.0流程定义,可以......
  • SpringMvc 响应数据传出
    SpringMVC输出模型数据概述提供了以下几种途径输出模型数据:ModelAndView:处理方法返回值类型为ModelAndView时,方法体即可通过该对象添加模型数据Map及Model:入参为org.springframework.ui.Model、org.springframework.ui.ModelMap或java.uti.Map时,处理方法返回时......
  • 源码共读|yocto-queue 队列 链表
    前言Yocto-queue是一种允许高效存储和检索数据的数据结构。它是一种队列类型,是一个元素集合,其中的项被添加到一端并从另一端移除。它被设计用来操作数据量很大的数组,在你需要使用大量的Array.push、Array.shift操作时,Yocto-queue有更好的性能表现。仓库地址:sindresorhus/yo......