首页 > 其他分享 >Kafka 上手实战

Kafka 上手实战

时间:2023-02-24 00:33:07浏览次数:32  
标签:实战 重试 send Kafka 发送 Future 异常 KafkaProducer

参数配置

  • bootstrap.servers:该参数用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的 broker 地址,因为生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要设置两个以上的 broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka 集群上。

  • key.serializervalue.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。生产者使用的 KafkaProducer<String, String>和 ProducerRecord<String, String> 中的泛型 <String, String> 对应的就是消息中 key 和 value 的类型,生产者客户端使用这种方式可以让代码具有良好的可读性,不过在发往 broker 之前需要将消息中对应的 key 和 value 做相应的序列化操作来转换成字节数组。key.serializer 和 value.serializer 这两个参数分别用来指定 key 和 value 序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,如org.apache.kafka.common.serialization.StringSerializer,单单指定 StringSerializer 是错误的。

KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 实例进行池化来供其他线程调用。

消息的发送

针对不同的消息,需要构建不同的 ProducerRecord 对象,在实际应用中创建 ProducerRecord 对象是一个非常频繁的动作。

创建生产者实例和构建消息之后,就可以开始发送消息了。发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。

发后即忘,它只管往 Kafka 中发送消息而并不关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。

KafkaProducer 的 send()方法并非是 void 类型,而是 Future类型,send()方法有2个重载方法,具体定义如下:

public Future<RecordMetadata> send(ProducerRecord<K, V> record)
public Future<RecordMetadata> send(ProducerRecord<K, V> record, 
                                   Callback callback)

要实现同步的发送方式,可以利用返回的 Future 对象实现,示例如下:

try {
    producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {
    e.printStackTrace();
}

实际上 send() 方法本身就是异步的,send() 方法返回的 Future 对象可以使调用方稍后获得发送的结果。示例中在执行 send() 方法之后直接链式调用了 get() 方法来阻塞等待 Kafka 的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。

也可以在执行完 send() 方法之后不直接调用 get() 方法,比如下面的一种同步发送方式的实现:

try {
    Future<RecordMetadata> future = producer.send(record);
    RecordMetadata metadata = future.get();
    System.out.println(metadata.topic() + "-" +
            metadata.partition() + ":" + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    e.printStackTrace();
}

这样可以获取一个 RecordMetadata 对象,在 RecordMetadata 对象里包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。如果在应用代码中需要这些信息,则可以使用这个方式。如果不需要,则直接采用 producer.send(record).get() 的方式更省事。

Future 表示一个任务的生命周期,并提供了相应的方法来判断任务是否已经完成或取消,以及获取任务的结果和取消任务等。既然 KafkaProducer.send() 方法的返回值是一个 Future 类型的对象,那么完全可以用 Java 语言层面的技巧来丰富应用的实现,比如使用 Future 中的 get(long timeout, TimeUnit unit) 方法实现可超时的阻塞。

KafkaProducer 中一般会发生两种类型的异常:可重试的异常和不可重试的异常。常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如 NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如 LeaderNotAvailableException 表示分区的 leader 副本不可用,这个异常通常发生在 leader 副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复。不可重试的异常,比如 RecordTooLargeException 异常,暗示了所发送的消息太大,KafkaProducer 对此不会进行任何重试,直接抛出异常。

对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。retries 参数的默认值为0,配置方式参考如下:

props.put(ProducerConfig.RETRIES_CONFIG, 10);

示例中配置了10次重试。如果重试了10次之后还没有恢复,那么仍会抛出异常,进而发送的外层逻辑就要处理这些异常了。

同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条

我们再来了解一下异步发送的方式,一般是在 send() 方法里指定一个 Callback 的回调函数,Kafka 在返回响应时调用该函数来实现异步的发送确认。

标签:实战,重试,send,Kafka,发送,Future,异常,KafkaProducer
From: https://www.cnblogs.com/fxh0707/p/17149969.html

相关文章

  • React从入门到实战- 企业级实战项目-宜居三
    2019年最新React从入门到实战(带React企业级实战项目-宜居)(ReactHook新特性&)ReactHook新特性&(1.React新特性StateHook&)1.React新特性StateHook&App.jsimportR......
  • 想完全弄懂kafka?看这篇就够了
    有人说世界上有三个伟大的发明:火,轮子,以及Kafka。发展到现在,ApacheKafka无疑是很成功的,Confluent公司曾表示世界五百强中有三分之一的企业在使用Kafka。今天便和大家分......
  • 实战项目-美多商城(二)用户注册,登录
    图片验证码依赖Pillow库,先安装pipinstallPillowscripts(项目根目录),新建captcha包里面再新建fonts目录,用来存放字体文件-actionj.ttf-Arial.ttf-Geor......
  • kafka常用操作
    如果把一个项目/微服务当成一个消费组,那么一个topic可能在多个消费组【一个topic被多个项目订阅】,一个消费组可能有多个topic。【一个项目订阅了多个topic】。一个消费组内......
  • chatGPT帮助开发实战解答问题和反思
     问题来自.Net开发群友 问题我想做一个自动生成单据号的功能,但是在EFCORE里没有行级锁,请有什么等价方案吗? ChatGPT回答在EFCore中确实没有提供行级锁(row-le......
  • 基于SpringBoot实现操作GaussDB(DWS)的项目实战
    摘要:本文就使用springboot结合mybatisplus在项目中实现对GaussDB(DWS)的增删改查操作。本文分享自华为云社区《基于SpringBoot实现操作GaussDB(DWS)的项目实战【玩转PB级......
  • 实战100-黑衣路人:普通人的出路在哪里?再不懂点实战技術就晚了!
    实战100-黑衣路人:普通人的出路在哪里?再不懂点实战技術就晚了!  普通人的出路在哪里?人人都想拥有数不完的銀子,于是都在忙着找出路,可是出路在哪里?普通人怎么突破限制?大部......
  • RabbitMQ 延迟消息实战
    RabbitMQ延迟消息实战现实生活中有一些场景需要延迟或在特定时间发送消息,例如智能热水器需要30分钟后打开,未支付的订单或发送短信、电子邮件和推送通知下午2:00开始......
  • 读Java实战(第二版)笔记18_基于Lambda的领域特定语言
    1. 编程语言1.1. 仍然是一门语言1.1.1. 以最清晰、最容易理解的方式传递信息1.2. 代码的易读性和易理解性在软件中的重要性甚至更胜一筹2. 领域特定语言DSL2.1.......
  • Kafka事务
    producer可能给多个topic,多个partition发送消息,这些消息组成一个事务,这些消息需要对consumer同时可见或者同时不可见。Kafka事务需要在producer端处理,consumer端不需要做特......