首页 > 其他分享 >Kafka笔记

Kafka笔记

时间:2024-02-05 15:22:51浏览次数:30  
标签:消费者 producer spring 偏移量 kafka 笔记 Kafka consumer

参考博客:
https://www.cnblogs.com/qingyunzong/category/1212387.html
https://www.cnblogs.com/haolujun/p/9632835.html (kafka与rabbitmq区别)
https://www.cnblogs.com/alvinscript/p/17407980.html (kafka核心机制,有图)

一、概念

1.1 Broker

  • Kafka 集群包含一个或多个服务器,服务器节点成为 Broker
  • Broker 存储 Topic ,如果某 Topic 有 N个 Partiton,集群有N个 Broker,那么每个 Broker 存储该 Topic的一个 Partition

1.2 Topic(略)

:主题

1.3 Partition

:每个topic至少有一个partition,当生产者产生数据的时候,根据分配策略(Hash取模),选择分区,然后将消息追加到指定的分区的末尾。每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。


Partition数据路由规则

  1. 指定了partition,则直接使用
  2. 未指定partition,但指定key,通过对key的value进行hash,选出一个partition
  3. partition和key都未指定,使用轮询选出一个partition

image

leader与follower同步机制具体逻辑:

  1. producer先从zookeeper的 "/brokers/../.../state" 节点找到该 partition 的leader
  2. producer将消息发送给该leader
  3. followers从leader pull消息,写入本地log后leader发送ACK
  4. leader收到所有ISR(in sync replicas)的ACK后,增加HW(hight watermark,最后commit的offset)并向 producer 发送ACK

1.4 Producer(略)

1.5 Consumer(略)

1.6 Consumer Group

  • 每个Consumer属于一个特定的 Consumer Group(可为每个Consumer指定group name,若不指定group name则数据默认的group)
  • 将多个消费者集中到一起去处理某一个Topic的数据
  • 整个消费者组共享一组偏移量(防止数据被重复读取),因为一个Topic有多个分区

1.6 Offset

image

  • 可以唯一的标识一条消息
  • HW(High Watermark): 也被称为高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。对于每个分区,HW是已知的,并且对消费者而言,只能消费HW之前的消息。
  • LEO(Log End Offset): 是当前日志文件中下一条待写入消息的偏移量。对于每个分区,LEO都会维护在ISR集合中的每个副本中,而ISR集合中最小的LEO即为分区的HW。一般来说,HW的值不大于LEO的值。

二、数据安全

2.1 ISR机制

  • AR((assigner replicas):用来标识副本的全集,AR = ISR + OSR
  • ISR(in-sync replicas): 加入同步队列的副本;ISR = Leader + 没有落后太多的副本
  • OSR(out-sync replicas): 离开同步队列的副本

2.2 传输保证

类型 说明
At most once 消息可能会丢,但绝不会重复传输
At least one 消息绝不会丢,但可能会重复传输
Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的

Kafka默认保证 At least once,并且允许通过设置Producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。

2.3 生产者方面

Producers可以选择是否为数据的写入接收ack,有以下几种ack的选项:
request.required.acks

  • acks=0:producer在ISR中leader已成功收到数据并得到确认后,发送下一条message
  • acks=1:producer会等待leader的ack(这意味着producer无需等待来自Broker的确认而继续发送下一批消息)
  • acks=2:leader与replicas的ack(producer需要等待ISR中所有的Follower都确认接收到数据后才算一次发送完成)

2.4 消费者方面

  • 如果将consumer设置为autocommit,consumer一旦读到数据立即自动commit (Exactly once)
  • 读完消息先commit,再处理消息(At most once)
  • 读完消息先处理,再commit(At least one)

三、参数配置

kafka中producer中的配置参数linger.ms的含义是什么,一直不太理解 ? - 知乎 (zhihu.com)
spring boot kafak 配置 - 布咚嘞 - 博客园 (cnblogs.com)

kafka:  
	# 指定kafka 代理地址,可以多个  
	bootstrap-servers: xxx
	# 主题分区  
	partition: 3  
	# 主题副本,副本数量不允许超过broker的数量  
	factor: 3  
	producer:  
		retries: 3  
		# 每次批量发送消息的数量  (字节数,16KB)
		batch-size: 16384  
		# 缓存容量  (32MB)
		buffer-memory: 33554432  
		# 如果消息迟迟没有达到`batch.size`,那么将尝试等待`linger.ms`时间发送。默认等待时间为0,也就是当消息到达之后立即发送
		linger: 5  
		# 指定消息key和消息体的编解码方式  
		key-serializer: org.apache.kafka.common.serialization.StringSerializer  
		value-serializer: org.apache.kafka.common.serialization.StringSerializer  
	consumer:  
		# 指定默认消费者group id  
		group-id: consumer-tutorial  
		# 当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办
			# _earliest_:自动将偏移量重置为最早的偏移量;
			# _latest_:自动将偏移量重置为最迟的偏移量;
			# _none_:如果未找到消费者组的先前偏移量,则将异常抛出给消费者;
			# _exception_:向消费者抛出异常;
		auto-offset-reset: earliest  
		# 消费者的消费记录offset是否后台自动提交
		enable-auto-commit: false  
		# 当消费者的消费记录offset是否后台自动提交时,多长时间自动提交一次(ms)
		auto-commit-interval: 100  
		# 指定消息key和消息体的编解码方式  
		key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  
		value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  
		# 批量一次最大拉取数据量  
		max-poll-records: 1000  
		# 指定listener 容器中的线程数,用于提高并发量  
	listener:  
		# 是否开启批量消费,true表示批量消费  
		batch-listener: true  
		concurrency: 3  
	streams:  
		application-id: MessageCenter

# kafka topic  
topic:  
	sms-mac: RXEnv{RX_MC_TOPIC_SMS_MAC}  
	sms-general: RXEnv{RX_MC_TOPIC_SMS_GENERAL}  
	im: RXEnv{RX_MC_TOPIC_IM}  
	mail: RXEnv{RX_MC_TOPIC_MAIL}  
  
# kafka group  
group:  
	sms-mac: RXEnv{RX_MC_GROUP_SMS_MAC}  
	sms-general: RXEnv{RX_MC_GROUP_SMS_GENERAL}  
	im: RXEnv{RX_MC_GROUP_IM}  
	mail: RXEnv{RX_MC_GROUP_MAIL}
key value 备注
spring.kafka.bootstrap-servers 配置kafak的broker地址 格式为 host:port,可以配置多个,用 ',' 隔开
spring.kafka.client-id 发出请求时传递给服务器的ID,用于服务器端日志记录
消费者配置 spring.kafka.consumer.bootstrap-servers 配置kafak customer 的broker地址 格式为 host:port,可以配置多个,用 ',' 隔开;优先级比spring.kafka.bootstrap-servers高,当不设置时,使用spring.kafka.bootstrap-servers的值
spring.kafka.consumer.client-id 发出请求时传递给服务器的ID,用于服务器端日志记录,不设置时,系统会自动生成
spring.kafka.consumer.group-id 设置当前消费者所在组的名称
spring.kafka.consumer.enable-auto-commit 消费者的消费记录offset是否后台自动提交
spring.kafka.consumer.auto-commit-interval 当消费者的消费记录offset是否后台自动提交时,多长时间自动提交一次
spring.kafka.consumer.auto-offset-reset 当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办 earliest:自动将偏移量重置为最早的偏移量
latest:自动将偏移量重置为最迟的偏移量
none:如果未找到消费者组的先前偏移量,则将异常抛出给消费者
exception:向消费者抛出异常
spring.kafka.consumer.max-poll-records 一次调用 poll() 返回的最大记录数,默认是500
spring.kafka.consumer.fetch-max-wait 当没有足够的数据(数据的大小不小于 fetch.min.bytes)返回给客户端时,服务器最大阻塞时间
spring.kafka.consumer.fetch-min-size 服务器应为获取请求返回的最小数据量(以字节为单位)
spring.kafka.consumer.heartbeat-interval 消费者协调员之间心跳的预期时间(单位是毫秒)
spring.kafka.consumer.key-deserializer 消费者所有keys的序列化类 默认是 org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer 消费者所有values的序列化类 默认是 org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.properties 消费者其他属性配置 类型 java.util.Map<java.lang.String,java.lang.String>
生产者配置 spring.kafka.producer.bootstrap-servers 配置kafak produce的broker地址 格式为 host:port,可以配置多个,用 ',' 隔开;优先级比spring.kafka.bootstrap-servers高,当不设置时,使用spring.kafka.bootstrap-servers的值
spring.kafka.producer.client-id 发出请求时传递给服务器的ID,用于服务器端日志记录,不设置时,系统会自动生成
spring.kafka.producer.acks 生产者要求数据有多少个副本接收到数据才算发送成功(类型是java.lang.String) '0' :表示生产者数据发送到 leader 就算写入成功,但是如果 leader 在把数据写到本地磁盘时报错,就会数据丢失,akcs 设置为0时,kafka 可以达到最大的吞吐量;
'1' :表示生产者数据发送到 leader 并写入到磁盘才算写入成功,但是如果数据在同步到其他副本时,leader 挂了,其他副本被选举为新 leader,那么就会有数据丢失;
“-1” / “all“:表示生产者把数据发送到 leader,并同步到其他副本,才算数据写入成功,这种模式一般不会产生数据丢失,但是 kafka 的吞吐量会很低;
spring.kafka.producer.batch-size 默认批处理大小(以字节为单位) 小批量将使分批变得不那么普遍,并且可能会降低吞吐量(零批量将完全禁用批处理)
spring.kafka.producer.buffer-memory 生产者可以用来缓冲等待发送到服务器的记录的内存总字节数
spring.kafka.producer.compression-type 生产者生成的所有数据的压缩类型
spring.kafka.producer.key-deserializer 生产者所有keys的序列化类 默认是 org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.value-deserializer 生产者所有values的序列化类 默认是 org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.retries 当数据发送失败时,可以重试发送的次数
spring.kafka.producer.transaction-id-prefix 不为空时,为生产者启用事务支持
spring.kafka.producer.properties 生产者其他属性配置 类型 java.util.Map<java.lang.String,java.lang.String>
消费者监听器 spring.kafka.listener.type 监听类型,类型 Listener.Type Type.SINGLE:一次调用一个ConsumerRecord的端点(默认)
Type.BATCH:用一批ConsumerRecords调用端点
spring.kafka.listener.ack-mode 当 auto.commit.enable 设置为false时,表示kafak的offset由customer手动维护,spring-kafka提供了通过ackMode的值表示不同的手动提交方式 AckMode.RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交

AckMode.BATCH:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交

AckMode.TIME:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交

AckMode.COUNT:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交

AckMode.COUNT_TIME:上述 TIM 或 COUNT 有一个条件满足时提交

AckMode.MANUAL:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交

AckMode.MANUAL_IMMEDIATE :手动调用Acknowledgment.acknowledge()后立即提交
spring.kafka.listener.client-id 监听器的使用者的client.id属性的前缀
spring.kafka.listener.concurrency 在监听器容器中运行的线程数,表示启动多少个并发的消费者,这个值不能大于实际消费的主题的分区数
spring.kafka.listener.poll-timeout 消费者一次poll方法的超时时间,当在一次poll方法中,如果一次请求不到数据或者请求的数据小于设定的值,那么poll方法会继续执行请求,直到超时或者满足设置的条件
spring.kafka.listener.no-poll-threshold
spring.kafka.listener.ack-count 表示当一个poll数据消费后,处理的记录数大于多少时,触发提交 当 spring.kafka.listener.ack-mode 设置为 AckMode.COUNT 或者 AckMode.COUNT_TIME 时生效
spring.kafka.listener.ack-time 表示当一个poll数据消费后,距离上次提交时间大于 ack-time 时提交 当 spring.kafka.listener.ack-mode 设置为 AckMode.TIME 或者 AckMode.COUNT_TIME 时生效
spring.kafka.listener.idle-event-interval 发布空闲的消费者事件之间的时间(未接收到数据)
spring.kafka.listener.monitor-interval 无反应的消费者检查之间的时间。 如果未指定持续时间后缀,则将使用秒

标签:消费者,producer,spring,偏移量,kafka,笔记,Kafka,consumer
From: https://www.cnblogs.com/lhxBlogs/p/18008244

相关文章

  • C# 微信公众号token 认证笔记
     因公司需要,开通了微信公众号。在开发对接中摸索了2天,写下此记,备忘。 服务器地址(URL):https://www.findtechgroup.net/Handler1.ashx因http80端口已被其他业务占用,只能用https(443)协议,需路由映射服务器的443端口。 IIS中需要添加SSL证书,这个证书在阿里云中免费申请,......
  • Eralng 学习笔记第五天, 异常,宏,头文件,预处理器,模式匹配
    Erlang异常在Erlang中,有3种例外类型-Error−调用将终止当前进程的执行,并在捕获到最后一个函数及其参数时包含堆栈跟踪。这些是引发上述运行时错误的异常。erlang:error(Reason)Exists −有两种Exists:内部退出和外部退出。内部退出通过调用函数exit/1来触发,并使当前进......
  • 云原生学习第4天笔记
    云原生技术栈容器技术:Docker、containerd容器编排:Kubernetes、Swarm、Mesos微服务架构:SpringCloud、Dubbo服务发现与负载均衡:Consul、Nginx配置管理:Consul、Etcd存储技术:Ceph、MinIO监控与日志:Prometheus、Grafana、ELK云原生数据库:Cassandra、MongoDB、PostgreSQL云原生应用架构......
  • 【System Design Interview】笔记
    2024/02/04c1-c4 Chapter1:scalefromzerotomillionsofuserssingleserversetupDNSdomainnamesystemis3rdpartyservicethatparsesdomainnametointernetprotocalipaddress.OncetheIPaddressisobtained,HTTPrequestsaresentdirectly......
  • 学习笔记——割点与桥
    一、割点、桥基本概念给定无向图\(G=(V,E)\)。对于一个点\(u\inG\),删除一个节点\(u\)与该节点所有相连的边后,该图不连通,则称点\(u\)为割点。对于一条边\(\{U,W\}\inG\),删除一条边\(\{U,W\}\)后,该图不连通,则称边\(\{U,W\}\)为桥。二、暴力算法对于割点,枚举每个......
  • c++11的左值 右值的笔记
    在C++11的程序中,所有的值必须属于左值,将亡值,纯右值之一。将忘值则是c++11新增的跟右值引用相关的表达式,这样表达式通常是将要被移动的对象(以为他用),比如返回右值引用T&&的函数返回值,std::move的返回值,或者转换为T&&的类型的转换函数的返回值。而剩余的,可以标识函数、对象的值都属......
  • service命令使用笔记
    一、简介#service--helpUsage:service[-h|-?]servicelistservicecheckSERVICEservicecallSERVICECODE[i32N|i64N|fN|dN|s16STR|null|fdf|nfdn|afdf]...Options:i32:Writethe32-bitintegerNintothes......
  • [解决办法]笔记本win11 win10系统亮度自动降低 关闭自动对比度自动亮度自适应
    https://www.bilibili.com/video/BV18K411k7AJ解决办法整理:控制面板:控制面板\所有控制面板项\电源选项\编辑计划设置这里的显示里面有的电脑有自动降低亮度相关设置英特尔显卡管理面板-功率菜单,节能功能关闭。(微软商店可以装这个软件)首先,他节约不少多少点能服务-......
  • 【学习笔记】网络流与二分图初步
    网络流与二分图初步我们约定,对于有向图\(G=(V,E)\),分析复杂度时\(m=|E|,n=|V|\)。在分析时间复杂度时,网络流的实际表现基本都优于其理论上的时间复杂度表现。I概念(1)网络流:在一个有向带权图上(不考虑自环和重边),与最短路类似,我们定义一个源点\(s\)和一个汇点\(t\)......
  • 读千脑智能笔记04_参考系
    1.      大脑中的参考系1.1.        人类出色的认知功能是区分我们与灵长目动物的最显著的特点1.1.1.          只有人类才能使用复杂的语言,制造诸如计算机等复杂的工具,并且能够论证进化、遗传学和民主等概念1.2.        人类大脑新皮质......