首页 > 其他分享 >Kafka:Producer异步发送与回调

Kafka:Producer异步发送与回调

时间:2022-11-09 19:02:31浏览次数:32  
标签:异步 ProducerRecord Producer producer kafka apache org Kafka


​pom.xml​​:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.kaven</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>

测试代码:

package com.kaven.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerTest {

private static final String[] MESSAGE = new String[]{"你好啊", "现在正在测试Kafka的Producer", "先溜了"};

public static void main(String[] args) throws ExecutionException, InterruptedException {
send("new-topic-user");
}

public static void send(String name) throws ExecutionException, InterruptedException {
Producer<String, String> producer = ProducerTest.createProducer();
for (int i = 0; i < MESSAGE.length; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
name,
String.valueOf(System.currentTimeMillis()),
MESSAGE[i]
);
// 异步发送
producer.send(producerRecord);
}
// 要关闭Producer实例
producer.close();
}

public static Producer<String, String> createProducer() {
// Producer的配置
Properties properties = new Properties();
// 服务地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.7:9092");
// KEY的序列化器类
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// VALUE的序列化器类
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

return new KafkaProducer<>(properties);
}
}

使用​​kafka-console-consumer.sh​​​脚本来获取​​Producer​​发送的消息。

[root@192 kafka_2.13-3.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092  --from-beginning --topic new-topic-user
你好啊
现在正在测试Kafka的Producer
先溜了

​KEY​​​和​​VALUE​​​的序列化器类可以在如下图所示的包中找到,​​Kafka​​提供了常用的序列化器。

Kafka:Producer异步发送与回调_maven


​Producer​​​异步发送消息,可以通过​​get​​方法来阻塞它。

// 异步发送并阻塞
producer.send(producerRecord).get();

使用回调。

public static void send(String name) throws ExecutionException, InterruptedException {
Producer<String, String> producer = ProducerTest.createProducer();
for (int i = 0; i < MESSAGE.length; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
name,
String.valueOf(System.currentTimeMillis()),
MESSAGE[i]
);
// 异步发送并回调
producer.send(producerRecord, (metadata, exception) -> {
if(exception == null) {
System.out.println("partition: " + metadata.partition() + " offset: " + metadata.offset());
}
else {
exception.printStackTrace();
}
});
}
// 要关闭Producer实例
producer.close();
}

输出:

partition: 1 offset: 28
partition: 2 offset: 21
partition: 0 offset: 22

​Topic​​​有三个分区,可见每个分区存储了一条消息。​​ProducerRecord​​​封装了要发送到​​Kafka​​​的消息,包括消息需要发送到的​​Topic​​名称、可选的分区号、可选的键、值等。

指定分区号(​​ProducerRecord​​​构造函数中的​​1​​):

ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
name,
1,
String.valueOf(System.currentTimeMillis()),
MESSAGE[i]
);

输出:

partition: 1 offset: 29
partition: 1 offset: 30
partition: 1 offset: 31

​Producer​​异步发送与回调就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。


标签:异步,ProducerRecord,Producer,producer,kafka,apache,org,Kafka
From: https://blog.51cto.com/u_15870611/5838298

相关文章

  • Kafka Java客户端Stream API
    KafkaStream概念及初识高层架构图KafkaStream是ApacheKafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,KafkaStre......
  • kafka 客户端之producer API发送消息(自定义负载均衡实现)与负载均衡调用源码分析
    背景:​​kafka客户端之producerAPI发送消息以及简单源码分析​​已经介绍了producer的异步发送和异步回调发送消息的基本使用,但是都是使用内置的负载均衡策略。kafka的负......
  • kafka 客户端之producer API发送消息以及简单源码分析
    背景:我使用docker-compose搭建的kafka服务​kafka的简单介绍以及docker-compose部署单主机Kafka集群​​KafkaAPI简单介绍kafka除了用于管理和管理任务的命令行工具,Kafka......
  • kafka Java客户端之 consumer API 消费消息
    背景:我使用docker-compose搭建的kafka服务​kafka的简单介绍以及docker-compose部署单主机Kafka集群​​使用consumerAPI消费指定Topic里面的消息首先我们需要使用AdminA......
  • 同步与异步;阻塞与非阻塞
    阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态,是对客户端说的同步和异步是通信机制,是对服务端说的拿餐厅吃饭举例:同步:客人(客户端)去餐厅(服务端)吃饭,点了一杯饮料......
  • 5种常见的异步编程的方法
    1、回调函数/*利用回调函数执行异步操作*/getCallBackData(callback){//把函数作为参数传递进去setTimeout(()=>{letdata='thisiscallbackdata';......
  • php kafka的使用
    生产者<?php$conf=newRdKafka\Conf();$conf->setDrMsgCb(function($kafka,$message){file_put_contents("./dr_cb.log",var_export($message,true).PHP_EO......
  • 微信小程序中使用Async-await方法异步请求变为同步请求方法
    微信小程序中有些Api是异步的,无法直接进行同步处理。例如:wx.request、wx.showToast、wx.showLoading等。如果需要同步处理,可以使用如下方法:注意:Async-await方法属于ES7......
  • 纯手写线程池+async注解实现异步任务
    spring整合多线程---@Async注解基本配置启动添加注解@SpringBootApplication@EnableAsyncpublic class Demo000Application {    public static void m......
  • Springboot Async异步扩展使用 结合 CompletableFuture
    前言很早前,出过一篇介绍springboot怎么使用异步线程的文章(如果你还未了解异步的使用,可以先看看这篇)《SpringBoot最简单的使用异步线程案例@Async》:然后近期有些小伙伴使用......