一、地址
1、实时更新的思维导图
https://www.mubucm.com/doc/4uqlpedefuj
2、图片
二、具体内容
5.producer生产者
demo
发送pro.send(new ProducerRecord<String,String>("test","123"))
ProducerRecord的属性
重载方法234
只topic和内容
多key
hash(key)确定发送的分区
多key和partition
key不起作用,发到指定分区
topic
partiton
headers
K
用于hash计算分区
V
发送的消息值
timestamp
ack应答机制
用于保证数据发送可靠
必要参数
bootstrap.servers:整个集群的地址
发送模式
发后即忘
producer.send(rcd)
同步发送sync
send(rcd).get()
异步发送async
send(rcd,new CallBack(){onCompletion(){xxxx}})
6.consumer消费者
demo
订阅多个topic
poll(time超时时间)获取消息ConsumerRecord
record包含的内容.get
offset
key
value
必要参数
group id
bootstrap.servers
无需设置整个集群,只需设置单个
消息订阅
subscribe订阅
重载方法*4
ConsumerRebalanceListener再均衡监听器?
方式
指定集合方式
正则方式订阅
传参Pattern.compile("ods_*")
assign订阅
consumer.assign(Collection<TopicPartition> collection)
可以手动订阅多个topic的指定分区
单个:Arrays.asList(new TopicPartition("topic1",0))
区别
粒度
topic粒度(group管理)
topic-partition粒度(自己管理)
是否具有rebalance分区再均衡功能
取消订阅
consumer.unsubscribe()
subscribe(new ArrayList<T>())
String
TopicPatition
消费模式
拉取模式poll(time)
time为阻塞时间(多久拉一次)
设置为Long.MAX_VALUE,可以提高吞吐率
消息类型ConsumerRecord
ConsumerRecords<String,String> records=consumer.poll(100)
ConsumerRecord rd=records.record(new TopicPartition("topic1",0))
提交偏移量
指定位移消费
consumer.seek(TopicPartition,offset)
自动提交
两个配置
auto.commit
interval.ms
到点提交各分区最大位移
默认true,5000
存在问题
重复消费
消费者崩溃
丢失消息
拉取消息放入阻塞队列BlockingQueue
阻塞队列的处理线程异常,从上次提交的位移处消费
即实际消费到了3,已经提交了6
手动提交
调用API实现
关闭自动提交:auto.commit设为false
类型
同步提交
commitSync()-处理完提交(提交和拉取会阻塞)
含参commitAsync(Map<TopicPatition,OffsetAndMetaData>)
record获取offset,+1后作为Meta构造参数
提交的偏移量是消费完record的偏移量+1
异步提交
提交和拉取不会阻塞,提高消费者性能
重载的commitAsync
commitAsync()
commitAsync(OffsetCommitCallback)-带回调
commitAsync(Map<TopicPatition,OffsetAndMetaData>,OffsetCommitCallback)-指定分区&偏移量+回调
手动提交的时机
处理完成前提交
存在漏处理(数据丢失)
实现了at most once语义
处理完成后提交
存在重复处理/消费(数据重复)
原因:处理后提交前出bug
实现了at least once语义
理想语义:Exactly once(精确一次)
★通过kafka的事务机制实现
提交方式总结
全自动
半自动
全手动
提交的位置:__consumer_offset
重要参数介绍
一次拉取的最大最小数据量
拉取的最大等待时长
每个分区拉取的最大数据量
一次拉取的最大条数
等待请求响应、闲置、重试间隔
消息隔离级别:读未提交或已提交
read_uncommit:能消费到LSO
read_committed:可以消费到HW
超时时长,超过认为消费者已离开cg
7.topic管理
工具类KafkaAdminClient
用于集成内部系统,实现多功能的生态平台
功能:管理broker、配置、ACL、管理topic
创建方式:KafkaAdminClient.create(props)
列出主题
listTopics()
查看主题信息
describeTopics(Arrays.asList(xx,yy))
创建主题
createTopics(new NewTopic(name,replicaAssignment))
删除主题
deleteTopics(Arrays.asList(xx,yy))
其他管理
动态参数管理
修改配置:alterConfigs(Map<ConfigResource,Config>)
分区管理
新增分区:createPartitions(Map<String,NewPartitions> map)
以上内容整理于 幕布文档
作者:哥们要飞