创建生产者实例和构建消息之后,就可以开始发送消息了。
发送消息主要有三种模式:发后即忘、同步、异步。
发后即忘:
就是直接调用 生产者的 send方法发送。
发后即完,只管往 kafka中发送消息,而不关心消息是否正确到达。
这种发送方式的性能最高,可靠性也最差。
producer.send(record);
具体代码如下:
public class KafkaDemoProducer {
public static final String BROKER_LIST = "localhost:9092";
public static final String TOPIC = "myTopic1";
public static void main(String[] args) {
//属性配置
Properties properties = getProperties(BROKER_LIST);
//生产者初始化
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello kafka");
//发送消息
try {
producer.send(record);
System.out.println("========>producer.send(record).");
} catch (Exception e) {
System.out.println("send error." + e);
}
producer.close();
}
private static Properties getProperties(String brokerList) {
Properties properties = new Properties();
properties.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
properties.put("bootstrap.servers", brokerList);
return properties;
}
}
同步发送:
try {
producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {
log.error("send record get error", e);
}
同步发送的方式可靠性最高,要么消息发送成功,要么发生异常。如果发生异常,会catch并处理异常。
同步发送的性能会差一些,需要阻塞等待一条消息发送完,才能发送下一条。
异步发送:
异步发送,就是在 send 方法里指定一下 Callback 的回调函数。
消息发送成功后,会收到成功的回调。参数 metadata ,为发送成功的消息,相关的信息
如果发送失败,也会收到回调,包含失败的异常信息 exception。
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
log.error("send onCompletion error." , exception);
} else {
log.info(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
}
}
});
参考资料:
《深入理解Kafka 核心设计与实践原理》
标签:入门,producer,send,kafka,发送,record,properties From: https://www.cnblogs.com/expiator/p/17910050.html