首页 > 其他分享 >kafka的生产者学习

kafka的生产者学习

时间:2022-10-08 22:14:41浏览次数:53  
标签:发送 错误 生产者 分区 kafka 学习 消息 key

转自:https://www.cnblogs.com/cxuanBlog/p/11949238.html

1.流程介绍

图 1

大概流程:

  1. 创建一个ProducerRecord 对象,它是 Kafka 中的一个核心类,由记录要发送到的主题名称(Topic Name),可选的分区号(Partition Number)以及可选的键值对key/value构成。
  2. 在发送时,需要将ProducerRecord的key/value键值对由序列化器转换为字节数组,这样它们才能够在网络上传输。
  3. 然后消息到达了分区器。如果发送过程中指定了有效的分区号,那么在发送记录时将使用该分区。如果发送过程中未指定分区,则将使用key 的 hash 函数映射指定一个分区。如果发送的过程中既没有分区号也没有key,则将以循环的方式分配一个分区。选好分区后,生产者就知道向哪个主题的哪个分区发送数据了。
  4. 然后,这条消息被存放在一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。由一个独立的线程负责把它们发到 Kafka Broker 上。
  5. Kafka Broker 在收到消息时会返回一个响应,如果写入成功,会返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量,以及时间戳。如果写入失败,会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败的话,就返回错误消息。

2.配置

首先,需要创建一个生产者对象ProducerRecord,并设置一些属性。Kafka 生产者有3个必选的属性

  • bootstrap.servers:指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他的 broker 信息。不过建议至少要提供两个 broker 信息,一旦其中一个宕机,生产者仍然能够连接到集群上。
  • key.serializer:指定key/value的序列化方式。

例子,创建一个 Properties 对象,使用 StringSerializer 序列化器序列化 key / value 键值对,创建一个新的生产者对象,并为键值设置了恰当的类型,然后把 Properties 对象传递给它:

private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String,String>(properties);

//但这里没有设置key和value的值?之后应该可可以再设置?producer.setKey(),producer.setValue()??

3. 发送消息

3.1 简单消息发送

public ProducerRecord(String topic, K key, V value) {}

例子:

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");
producer.send(record);//直接发送 

 如上图,消息会先被写入分区中的缓冲区中,然后分批次发送给 Kafka Broker。send方法会返回RecordMetadata对象。

消息发送过程可能会发生错误和异常,如SerializationException(序列化失败)BufferedExhaustedException 或 TimeoutException(说明缓冲区已满),又或是 InterruptedException(说明发送线程被中断)。

3.2 同步发送消息

ProducerRecord<String,String> record =
                new ProducerRecord<String, String>("CustomerCountry","West","France");

try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}

如果出现错误则抛出异常。生产者(KafkaProducer)在发送的过程中会出现两类错误:

  • 重试错误,这类错误可以通过重发消息来解决。比如连接的错误,可以通过再次建立连接来解决;无错误则可以通过重新为分区选举首领来解决(意思是如果其中一个leader副本挂掉了?此时就不知道连哪个,就需要zookeeper来重新选主?)。KafkaProducer 被配置为自动重试,如果多次重试后仍无法解决问题,则会抛出重试异常。
  • 无法通过重试来解决的错误,比如消息过大对于这类错误,KafkaProducer 不会进行重试,直接抛出异常。

3.3 异步发送

无须等待,通过先注册回调函数。

4. 生产者分区机制

分区使得每个节点能够实现独立的数据写入和读取,通过分区部署在多个 Broker 来实现负载均衡的效果。但在发送消息时,如何知道它是存在topic的哪个分区呢?

4.1 顺序轮询 

消息是均匀的分配给每个 partition,即每个分区存储一次消息,是默认策略。

 //那这个是由图1中的分区器来完成的?它需要记录上一次这个topic被记录到了哪个partition?然后下一次来的时候再轮转?

4.2 随即轮询

可以自己实现, 先计算出该主题总的分区数(topic的配置中有设置),然后随机地返回一个小于它的正整数。

//有个小问题,如果现在有10个brokers,某个topic有3个partition,那么这些partition是需要人手动指定要存储的broker.id吗?还是kafka会负载均衡自己选择?

 效果不如顺序轮询好。如果追求数据的均匀分布,还是使用轮询策略比较好

4.3 按照key保存消息

也叫做 key-ordering 策略,按消息键保序策略。可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的。【同一分区内消息串形处理?】,实现例子:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

5. 发送参数配置 

  • acks:指定了要有多少个分区副本接收消息,生产者才认为消息是写入成功的。此参数对消息丢失的影响较大。值包括0、1、all。
  • buffer.memory: 用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send() 方法调用要么被阻塞,要么抛出异常。

  • compression.type:表示生产者启用何种压缩算法,默认情况下,消息发送时不会被压缩。
  • retries:生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领),在这种情况下,reteis 参数的值决定了生产者可以重发的消息次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者在每次重试之间等待 100ms,这个等待参数可以通过 retry.backoff.ms 进行修改。
  • batch.size:当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次被填满,批次里的所有消息会被发送出去。不过生产者井不一定都会等到批次被填满才发送,任意条数的消息都可能被发送。【这个和分区器里的一个批次大小有关系吗?】
  • max.in.flight.requests.per.connection:指定了生产者在收到服务器响应之前可以发送多少消息,它的值越高,就会占用越多的内存,不过也会提高吞吐量。把它设为1 可以保证消息是按照发送的顺序写入服务器。【有点像滑动窗口】
  • max.request.size:该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。
  • receive.buffer.bytes 和 send.buffer.bytes:Kafka 是基于 TCP 实现的,为了保证可靠的消息传输,这两个参数分别指定了 TCP Socket 接收和发送数据包的缓冲区的大小。如果它们被设置为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值。

 

标签:发送,错误,生产者,分区,kafka,学习,消息,key
From: https://www.cnblogs.com/BlueBlueSea/p/16770269.html

相关文章

  • Flask学习笔记(八)-Flask-Sqlalchemy基本使用详解
    一、环境的安装pipinstallflask-sqlalchemypipinstallpymysql二、基本使用1、最小型应用:对于Flask的应用来说,需要做的就是为Flask实例选择加载配置,然后把S......
  • 【10月】C语言学习第1天
    指针符号&和*&用于指向变量数据位置,用十六进制表示*用于指向变量内存储的值-----------------------------------------函数对变量进行操控:由于函数返回只有一个值,固......
  • OpenGL 学习系列---基本形状的绘制
    在之前的一篇博客中,讲述了​​OpenGL基础绘制流程​​ 及相关的代码,其中关于OpenGL程序编译部分都是可以在其他项目中接着复用的,接下来会讲到如何去绘制其他的基本图元......
  • ​OpenGL 学习系列---坐标系统
    在前面​​绘制基本图形​​中,遇到了很明显的问题,圆形不像圆形,正多边形不像正多边形?就像下面图形一样:不规则的形状好好的正五边形却东倒西歪的,这就是因为我们前面的绘制都是......
  • Markdown语法学习
    #Markdown语法学习##标题一级标题:#+空格+标题名称二级标题:##+空格+标题名称以此类推 ##字体样式**字体加粗***字体倾斜****字体加粗并倾斜***~~字体中间......
  • Markdown语法学习(完成后)
    Markdown语法学习标题一级标题:#+空格+标题名称二级标题:##+空格+标题名称以此类推 字体样式字体加粗字体倾斜字体加粗并倾斜字体中间加删除线 分割线 ......
  • DICOM:DICOM标准学习路线图(初稿)
    https://zssure.blog.csdn.net/article/details/49231303题记:DICOM医学图像处理专栏撰写已有两个年头,积累了近百篇文章。起初只是用于记录自己科研、工作中遇到的疑难问......
  • 2022-2023-1 20221425 《计算机基础与程序设计》第6周学习总结
    学期(如2022-2023-1)学号(如:20221300)《计算机基础与程序设计》第六周学习总结作业信息这个作业属于哪个课程<班级的链接>(如2022-2023-1-计算机基础与程序设计)这......
  • ENVI5.3安装教程(含软件,仅用于学习试用)
    ENVI5.3安装教程(含软件仅用于学习试用)​​ENVI5.3安装包​​​​ENVI5.3安装教程​​ENVI5.3安装包链接:https://pan.baidu.com/s/1sa9V1-R7cOAx0wnfyEMAAw?pwd=ip1d提取码:i......
  • 2022-2023-1 20221419 《计算机基础与程序设计》第6周学习总结
    2022-2023-120221419《计算机基础与程序设计》第6周学习总结作业信息班级:[2022-2023-1-计算机基础与程序设计]https://edu.cnblogs.com/campus/besti/2022-2023-1-CFAP......