首页 > 其他分享 >Kafka 生产者

Kafka 生产者

时间:2024-02-13 13:11:05浏览次数:26  
标签:生产者 broker 默认 Kafka 发送 该值 线程

1.生产者消息发送流程

1.发送原理

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

2.生产者重要参数列表

参数名称 描述
bootstrap.servers 生产者连接集群所需的 broker 地 址 清 单 。 例 如192.168.58.130:9092,192.168.58.131:9092,192.168.58.132:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息
key.serializer 和 value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写全类名
buffer.memory RecordAccumulator 缓冲区总大小,默认 32m
batch.size 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加
linger.ms 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间
acks 0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。
默认值是-1,-1 和all 是等价的
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字
retries 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms
enable.idempotence 是否开启幂等性,默认 true,开启幂等性
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。
支持压缩类型:none、gzip、snappy、lz4 和 zstd。

2.异步发送 API

标签:生产者,broker,默认,Kafka,发送,该值,线程
From: https://www.cnblogs.com/fanqisoft/p/18014525

相关文章

  • Kafka 命令行操作
    1.Topic(主题)命令行操作1.查看Topic所有命令bin/kafka-topics.sh以下展示为最常使用的参数描述--bootstrap-server<String:servertoconnectto>连接的KafkaBroker主机名称和端口号--topic<String:topic>操作的topic名称--create创建Topic(主题)......
  • Kafka 3.6.1 集群安装与部署
    1.集群规划hadoop02(192.168.58.130)hadoop03(192.168.58.131)hadoop04(192.168.58.132)zookeeperzookeeperzookeeperkafkakafkakafka2.集群部署1.下载kafka二进制包https://kafka.apache.org/downloads2.解压mkdir/usr/kafkatar-zxvf/home/kafka_2......
  • 简单的斐波那契数列通过chan实现生产者消费者模型
    1.实现斐波拉契数列写一个函数返回长度为n的斐波拉契slice数组funcfi(nint)[]int{ ifn<=0{ return[]int{} } fibs:=make([]int,n) fibs[0]=0 ifn>1{ fibs[1]=1 fori:=2;i<n;i++{ fibs[i]=fibs[i-1]+fibs[i-2] } } returnfibs}......
  • Flink CDC实时同步PG数据库到Kafka
    一、安装规划操作系统服务器IP主机名硬件配置CentOS7.6192.168.80.131hadoop01内存:2GB,CPU:2核,硬盘:100GBCentOS7.6192.168.80.132hadoop02内存:2GB,CPU:2核,硬盘:100GBCentOS7.6192.168.80.133hadoop03内存:2GB,CPU:2核,硬盘:100GB......
  • SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka
    (之前写了一个flink-cdc同步数据的博客,发布在某N,最近代码开源了,直接复制过来了,懒得重新写了,将就着看下吧)最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC采集PostgreSQL变更数据发布到Kafka。 一、业务......
  • kafka-oti
    尚硅谷大数据技术之Kafka(作者:尚硅谷研究院)版本:V4.0第1章Kafka概述1.1定义1.2消息队列目前企业中比较常见的消息队列产品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。在大数据场景主 要采用Kafka作为消息队列。在JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ。......
  • kafka系列(一)【消息队列、Kafka的基本概念、Kafka的工作机制、Kafka可满足的需求、Kafk
    (kafka系列一)一、消息队列1.消息队列的来源在高并发的应用场景中,由于来不及同步处理请求,接收到的请求往往会发生阻塞。例如,大量的插入、更新请求同时到达数据库,这会导致行或表被锁住,最后会因为请求堆积过多而触发“连接数过多的异常”(TooManyConnections)错误。因此,在高......
  • Kafka笔记
    参考博客:https://www.cnblogs.com/qingyunzong/category/1212387.htmlhttps://www.cnblogs.com/haolujun/p/9632835.html(kafka与rabbitmq区别)https://www.cnblogs.com/alvinscript/p/17407980.html(kafka核心机制,有图)一、概念1.1BrokerKafka集群包含一个或多个服务器,......
  • Kafka-常用命令行命令
    第一章Kafka常用命令1. Topic(主题)1.1. 创建Topicbin/kafka-topics.sh--create--bootstrap-serverhadoop01:9092 --replication-factor2 --partitions1 --topictest 1.2. 查询Topic列表1.2.1. 查询所有Topic列表bin/kafka-topics.sh--list--bootstrap-ser......
  • Kafka-如何重设消费者位移(OFFSET)
    1.为什么要重设消费者组位移?我们知道,Kafka和传统的消息引擎在设计上是有很大区别的,其中一个比较显著的区别就是,Kafka的消费者读取消息是可以重演的(replayable)。像RabbitMQ或ActiveMQ这样的传统消息中间件,它们处理和响应消息的方式是破坏性的(destructive),即一旦消息被成功......