首页 > 其他分享 >Kafka教程(二)API开发-生产者、消费者、topic

Kafka教程(二)API开发-生产者、消费者、topic

时间:2022-10-08 13:01:15浏览次数:76  
标签:分区 Kafka topic API 提交 new 拉取 consumer

一、地址

1、实时更新的思维导图

​https://www.mubucm.com/doc/4uqlpedefuj​

2、图片

Kafka教程(二)API开发-生产者、消费者、topic_偏移量

二、具体内容


  • 5.producer生产者

  • demo

  • 发送pro.send(new ProducerRecord<String,String>("test","123"))

  • ProducerRecord的属性

  • 重载方法234

  • 只topic和内容

  • 多key

  • hash(key)确定发送的分区

  • 多key和partition

  • key不起作用,发到指定分区

  • topic

  • partiton

  • headers

  • K

  • 用于hash计算分区

  • V

  • 发送的消息值

  • timestamp

  • ack应答机制

  • 用于保证数据发送可靠

  • 必要参数

  • bootstrap.servers:整个集群的地址

  • 发送模式

  • 发后即忘

  • producer.send(rcd)

  • 同步发送sync

  • send(rcd).get()

  • 异步发送async

  • send(rcd,new CallBack(){onCompletion(){xxxx}})

  • 6.consumer消费者

  • demo

  • 订阅多个topic

  • poll(time超时时间)获取消息ConsumerRecord

  • record包含的内容.get

  • offset

  • key

  • value

  • 必要参数

  • group id

  • bootstrap.servers

  • 无需设置整个集群,只需设置单个

  • 消息订阅

  • subscribe订阅

  • 重载方法*4

  • ConsumerRebalanceListener再均衡监听器?

  • 方式

  • 指定集合方式

  • 正则方式订阅

  • 传参Pattern.compile("ods_*")

  • assign订阅

  • consumer.assign(Collection<TopicPartition> collection)

  • 可以手动订阅多个topic的指定分区

  • 单个:Arrays.asList(new TopicPartition("topic1",0))

  • 区别

  • 粒度

  • topic粒度(group管理)

  • topic-partition粒度(自己管理)

  • 是否具有rebalance分区再均衡功能

  • 取消订阅

  • consumer.unsubscribe()

  • subscribe(new ArrayList<T>())

  • String

  • TopicPatition

  • 消费模式

  • 拉取模式poll(time)

  • time为阻塞时间(多久拉一次)

  • 设置为Long.MAX_VALUE,可以提高吞吐率

  • 消息类型ConsumerRecord

  • ConsumerRecords<String,String> records=consumer.poll(100)

  • ConsumerRecord rd=records.record(new TopicPartition("topic1",0))

  • 提交偏移量

  • 指定位移消费

  • consumer.seek(TopicPartition,offset)

  • 自动提交

  • 两个配置

  • auto.commit

  • interval.ms

  • 到点提交各分区最大位移

  • 默认true,5000

  • 存在问题

  • 重复消费

  • 消费者崩溃

  • 丢失消息

  • 拉取消息放入阻塞队列BlockingQueue

  • 阻塞队列的处理线程异常,从上次提交的位移处消费

  • 即实际消费到了3,已经提交了6

  • 手动提交

  • 调用API实现

  • 关闭自动提交:auto.commit设为false

  • 类型

  • 同步提交

  • commitSync()-处理完提交(提交和拉取会阻塞)

  • 含参commitAsync(Map<TopicPatition,OffsetAndMetaData>)

  • record获取offset,+1后作为Meta构造参数

  • 提交的偏移量是消费完record的偏移量+1

  • 异步提交

  • 提交和拉取不会阻塞,提高消费者性能

  • 重载的commitAsync

  • commitAsync()

  • commitAsync(OffsetCommitCallback)-带回调

  • commitAsync(Map<TopicPatition,OffsetAndMetaData>,OffsetCommitCallback)-指定分区&偏移量+回调

  • 手动提交的时机

  • 处理完成前提交

  • 存在漏处理(数据丢失)

  • 实现了at most once语义

  • 处理完成后提交

  • 存在重复处理/消费(数据重复)

  • 原因:处理后提交前出bug

  • 实现了at least once语义

  • 理想语义:Exactly once(精确一次)

  • ★通过kafka的事务机制实现

  • 提交方式总结

  • 全自动

  • 半自动

  • 全手动

  • 提交的位置:__consumer_offset

  • 重要参数介绍

  • 一次拉取的最大最小数据量

  • 拉取的最大等待时长

  • 每个分区拉取的最大数据量

  • 一次拉取的最大条数

  • 等待请求响应、闲置、重试间隔

  • 消息隔离级别:读未提交或已提交

  • read_uncommit:能消费到LSO

  • read_committed:可以消费到HW

  • 超时时长,超过认为消费者已离开cg

  • 7.topic管理

  • 工具类KafkaAdminClient

  • 用于集成内部系统,实现多功能的生态平台

  • 功能:管理broker、配置、ACL、管理topic

  • 创建方式:KafkaAdminClient.create(props)

  • 列出主题

  • listTopics()

  • 查看主题信息

  • describeTopics(Arrays.asList(xx,yy))

  • 创建主题

  • createTopics(new NewTopic(name,replicaAssignment))

  • 删除主题

  • deleteTopics(Arrays.asList(xx,yy))

  • 其他管理

  • 动态参数管理

  • 修改配置:alterConfigs(Map<ConfigResource,Config>)

  • 分区管理

  • 新增分区:createPartitions(Map<String,NewPartitions> map)

以上内容整理于​​ 幕布文档 ​

作者:​​哥们要飞​


标签:分区,Kafka,topic,API,提交,new,拉取,consumer
From: https://blog.51cto.com/liujinhui/5737232

相关文章

  • Consul的HTTP API和使用方法
    Consul支持基础结构的服务注册和发现(称为内部服务),也支持外部服务(第三方SAAS服务以及无法直接运行Consul代理的其它环境,例如redis)。直接使用sudoapt-getinstallc......
  • docker搭建yapi接口文档系统、Idea中上传接口、在线调用
    一、前言在我们后端开发中,必不可少的是接口的交接,有很多种方式,常见的就是swagger,不过这个侵入性太强了。还有就是接口文档的框架,比如今天小编带大家一起搭建的yapi,在公司......
  • 谷歌翻译无法使用,google翻译失效,googleapis失效的解决方案
    我们日常工作和学习中,都无可避免的参考一些英文网站或者一些优秀的外国文献,这时候谷歌翻译就显得尤为重要。现在忽然不能使用了,经过几番探索,可通过修改dns解析即可恢复,希望......
  • ES API基本操作
    创建索引,user为索引名称PUThttp://192.168.0.110:9200/user查询索引基本信息GEThttp://192.168.0.110:9200/user查询所有索引GEThttp://19......
  • vueuse 核心api
     供自己学习使用。代码来源于elementplus。后续看情况是否增加说明  isClient  useStorageconstuserPrefer=useStorage<boolean|string>(......
  • go-zero微服务实战系列(三、API定义和表结构设计)
    前两篇文章分别介绍了本系列文章的背景以及根据业务职能对商城系统做了服务的拆分,其中每个服务又可分为如下三类:api服务-BFF层,对外提供HTTP接口rpc服务-内部依赖的微服......
  • 数维图API文档:SovitJS编辑器开放API调用方法
    SovitChart、Sovit2D、Sovit3D已经在众多行业领域被使用,也受到了大家的一致好评,为了更好的二次开发,不少用户想把我们的编辑器集成在自己的系统中,强烈要求我们开放API接口,经......
  • .NetCore WebAPI 导入、导出Excel文件
    .NetCoreWebAPI导入、导出Excel文件导入思路:上传Excel文件,使用MemoryStream在内存中加载,使用NPOI读取内容到Model类中。///<summary>///导入Excel文件///</su......
  • .Net WebApi 中的 FromBody FromForm FromQuery FromHeader FromRoute
    在日常后端Api开发中,我们跟前端的沟通中,通常需要协商好入参的数据类型,和参数是通过什么方式存在于请求中的,是表单(form)、请求体(body)、地址栏参数(query)、还是说通过请求......
  • 若依使用postman登录进行api测试
    最近无聊研究一下若依的架构,但是不想在前端上有过多的纠结,因此想使用postman来操作后端的api进行测试。关闭验证码我们使用postman时传递验证码的base64非常繁琐,因此关闭......