https://blog.csdn.net/yanluandai1985/article/details/122317238
第二部分:提高消费速度的几种操作
kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到Kafka消息队列拥堵的情况。遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic。因此只能额外启动一个或多个相同名称的consumer-group的消费者实例来加快消息消费(如果该topic只有一个partition,实际上再启动一个新的消费者,没有作用)。——这是最原始的提高消费速度的方式。
然后我们介绍一下引入spring-kafka有哪些操作(其实都是可以用代码实现,只是你要自己做这层二次封装)
一、用多线程并发消费
通过设置concurrency参数的方式。先看代码,我们可以使用两种不同的途径设置该参数:
第一种方式,直接在factory里面设置。
我们给ConcurrentKafkaListenerContainerFactory设置了concurrency等于3,也可以通过在application.properties中添加spring.kafka.listener.concurrency=3的方式配置factory.
本人不推荐使用该方式,因为设置它的意思是给factory里面的每个listener都设置了3个线程,但其实有些listener监听的topic并没有那么多分区。推荐用第二种方式。
第二种方式,在@KafkaListener
设置,支持SpEL表达式。
先介绍一下spring-kafka在工作的时候启动的一堆线程:
- 分为两类线程,一类是Consumer线程,另一类是Listener线程。
- Consumer线程: 用来直接调用kafka-client的poll()方法获取消息。
- Listener线程: 真正调用处理我们代码中标有
@KafkaListener
注解方法的线程。
如果不使用spring-kafka,而是直接用kafka-client的话,那么正常我们会整一个while循环,在循环里面调用poll(),然后处理消息,这在kafka broker看来就是一个Consumer。如果你想用多个Consumer, 除了多启动几个进程以外,也可以在一个进程使用多个线程执行此while()循环。spring-kafka就是这么干的。
因此,先看结论:
对于concurrency=3
这个参数的值的设定来说,它设置的其实是每个@KafkaListener
的并发数。spring-kafka在初始化的时候会启动concurrency
个Consumer线程来执行@KafkaListener
里面的方法。
这里如果你是用第一种方式,在factory里面直接设置concurrency,那么每个加了@KafkaListener
注解的都会新建concurrency
个线程,这样如果listener订阅的topic没那么多分区,new那么多线程只会带来格外的性能开销,这样是我不推荐在factory指定concurrency的原因。
标签:spring,factory,kafka,线程,concurrency,操作,速度,几种,Consumer From: https://www.cnblogs.com/jiangzishun/p/17285264.html