首页 > 其他分享 >提高消费速度的几种操作

提高消费速度的几种操作

时间:2023-04-04 09:24:00浏览次数:38  
标签:spring factory kafka 线程 concurrency 操作 速度 几种 Consumer

https://blog.csdn.net/yanluandai1985/article/details/122317238

 

第二部分:提高消费速度的几种操作

        kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到Kafka消息队列拥堵的情况。遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic。因此只能额外启动一个或多个相同名称的consumer-group的消费者实例来加快消息消费(如果该topic只有一个partition,实际上再启动一个新的消费者,没有作用)。——这是最原始的提高消费速度的方式。
        然后我们介绍一下引入spring-kafka有哪些操作(其实都是可以用代码实现,只是你要自己做这层二次封装)

一、用多线程并发消费

        通过设置concurrency参数的方式。先看代码,我们可以使用两种不同的途径设置该参数:

第一种方式,直接在factory里面设置。

        我们给ConcurrentKafkaListenerContainerFactory设置了concurrency等于3,也可以通过在application.properties中添加spring.kafka.listener.concurrency=3的方式配置factory.

 

 

本人不推荐使用该方式,因为设置它的意思是给factory里面的每个listener都设置了3个线程,但其实有些listener监听的topic并没有那么多分区。推荐用第二种方式。

 

第二种方式,在@KafkaListener设置,支持SpEL表达式。

 

 

先介绍一下spring-kafka在工作的时候启动的一堆线程:

  • 分为两类线程,一类是Consumer线程,另一类是Listener线程。
  • Consumer线程: 用来直接调用kafka-client的poll()方法获取消息。
  • Listener线程: 真正调用处理我们代码中标有@KafkaListener注解方法的线程。

        如果不使用spring-kafka,而是直接用kafka-client的话,那么正常我们会整一个while循环,在循环里面调用poll(),然后处理消息,这在kafka broker看来就是一个Consumer。如果你想用多个Consumer, 除了多启动几个进程以外,也可以在一个进程使用多个线程执行此while()循环。spring-kafka就是这么干的。

因此,先看结论:

        对于concurrency=3这个参数的值的设定来说,它设置的其实是每个@KafkaListener的并发数。spring-kafka在初始化的时候会启动concurrency个Consumer线程来执行@KafkaListener里面的方法。

        这里如果你是用第一种方式,在factory里面直接设置concurrency,那么每个加了@KafkaListener注解的都会新建concurrency个线程,这样如果listener订阅的topic没那么多分区,new那么多线程只会带来格外的性能开销,这样是我不推荐在factory指定concurrency的原因。

 

标签:spring,factory,kafka,线程,concurrency,操作,速度,几种,Consumer
From: https://www.cnblogs.com/jiangzishun/p/17285264.html

相关文章

  • 操作系统-文件结构划分
    1.文件的逻辑结构(文件内部的逻辑结构)索引文件索引顺序文件2.文件目录结构(文件外部的逻辑结构:文件与文件之间的逻辑结构)FCB和索引节点的关系是怎样的在检索目录文件的过程中,只用到了文件名,文件的其他描述信息用不到,也不需要调入内存,所以文件描述信息就形成了一个叫索引结......
  • Django笔记十五之in查询及date日期相关过滤操作
    这一篇介绍关于范围,日期的筛选inrangedateyearweekweekdayquarterhour1、inin对应于MySQL中的in操作,可以接受数组、元组等类型数据作为参数:Blog.objects.filter(id__in=[1,2,3])对应的SQL是:select*fromblog_blogwhereidin(1,2,3);字符串也可以作......
  • python+playwright 学习-45 drag_to 拖拽操作
    前言按住元素从页面的一个位置拖动到另外一个位置,可以用drag_to()方法实现拖拽操作场景目标元素拖动到指定位置drag_to拖拽操作您可以使用locator.drag_to()执行拖放操作。此方法将:将鼠标悬停在要拖动的元素上。按鼠标左键。将鼠标移动到将接收放置的元素。松开鼠......
  • 1006-HBase操作实战(JAVA API模式)
    一、准备阶段开发环境:hadoop: hadoop -2.4.0hbase: hbase -0.94.11-securityeclipse:JunoServiceRelease2二、创建 hbasedemo项目1、通过Eclipse创建一个新Java工程2、右击项目根目录,选择“Propertiesà>JavaBuildPathà>Libraryà> Add Ext......
  • 1005--HBase操作实战(HBase Shell命令行模式)
    通过HBase命令行,创建一张表,用户存储用户信息,其中包括基本信息和额外信息HBaseshell下所有命令可以使用:help“cmd”进行了解1、创建表create't_person',{NAME=>'basic_info'},{NAME=>'extra_info'}2、表中存储数据put't_person','g201425001','ba......
  • 1004-HBase的基本操作
    1、连接HBase./bin/hbaseshell2、创建一个表使用create命令创建一个表,必须给出特定的表名(tablename)和列族(theColumnFamilyname)hbase(main):001:0>create'test','cf'3、列出表信息hbase(main):002:0>list'test'4、put数据到指定的表使用put命令,并指定表,行建,列族......
  • 第八章:文件操作
    第八章文件操作目录第八章文件操作1文件读取1将文件整个读取内存2按字节读取文件1文件读取1将文件整个读取内存类似于python的withopen(filename,mode='rt',encoding='utf-8')asf:res=f.read()go中的书写方式:方式一:packagemainimport( "fmt" ......
  • 第九章:json操作
    第十章json操作目录第十章json操作一、Marshal序列化二、Unmarshal反序列化1已知数据解析2未知数据解析3json测试一、Marshal序列化packagemainimport( "encoding/json" "fmt")typeAnimalstruct{ Namestring`json:"name"` Orderstring`json:"order"......
  • LINUX 放开端口,防火墙操作
    防火墙操作:查看防火墙状态systemctlstatusfirewalld、firewall-cmd--state暂时关闭防火墙systemctlstopfirewalld永久关闭防火墙(慎用)systemctldisablefirewalld开启防火墙systemctlstartfirewalld开放指定端口firewall-cmd--zone=public--add-port=8080/tcp--perman......
  • 从零开始USRP 02 一些基本的GNU Radio操作
    继续学习:https://blog.csdn.net/YOUNGAAAAA/article/details/128098154我们可以先简单创建一个USRP图:  但是这个图目前是跑不了的,因为:RuntimeError:LookupError:KeyError:Nodevicesfoundfor----->EmptyDeviceAddress>>>Done(returncode1)这里我暂时先不......