首页 > 其他分享 >kafka入门(四):kafka生产者发送消息

kafka入门(四):kafka生产者发送消息

时间:2023-12-17 22:56:01浏览次数:26  
标签:入门 producer send kafka 发送 record properties

创建生产者实例和构建消息之后,就可以开始发送消息了。

发送消息主要有三种模式:发后即忘、同步、异步。

发后即忘:

就是直接调用 生产者的 send方法发送。

发后即完,只管往 kafka中发送消息,而不关心消息是否正确到达。

这种发送方式的性能最高,可靠性也最差。

producer.send(record);

具体代码如下:

public class KafkaDemoProducer {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";

    public static void main(String[] args) {
        //属性配置
        Properties properties = getProperties(BROKER_LIST);
        //生产者初始化
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "hello kafka");
        //发送消息
        try {
            producer.send(record);
            System.out.println("========>producer.send(record).");
        } catch (Exception e) {
            System.out.println("send error." + e);
        }
        producer.close();
    }


    private static Properties getProperties(String brokerList) {
        Properties properties = new Properties();
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", brokerList);
        return properties;
    }

}

同步发送:

try {
	producer.send(record).get();
} catch (ExecutionException | InterruptedException e) {
	log.error("send record get error", e);
}

同步发送的方式可靠性最高,要么消息发送成功,要么发生异常。如果发生异常,会catch并处理异常。

同步发送的性能会差一些,需要阻塞等待一条消息发送完,才能发送下一条。

异步发送:

异步发送,就是在 send 方法里指定一下 Callback 的回调函数。

消息发送成功后,会收到成功的回调。参数 metadata ,为发送成功的消息,相关的信息

如果发送失败,也会收到回调,包含失败的异常信息 exception。

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
    	if (exception != null) {
    		log.error("send onCompletion error." , exception);
   		} else {
  			log.info(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
  	     }
    }
});

参考资料:

《深入理解Kafka 核心设计与实践原理》

标签:入门,producer,send,kafka,发送,record,properties
From: https://www.cnblogs.com/expiator/p/17910050.html

相关文章

  • NodeRed入门案例,在控制台输出Hello World!
    1、打开NodeRed软件,将inject控件拖入到流程绘制区域,因为我们需要提供HelloWorld!的输入内容。2、将debug控件拖入到流程绘制区域,因为我们要将输入到Inject控件输入的内容输出到控制台(调试窗口)。3、将控件连接并部署。4、测试编写的流程。5、点击inject控件的左侧按钮触......
  • 2、SpringBoot2之入门案例
    2.1、创建Maven工程2.1.1、创建空项目2.1.2、设置项目名称和路径2.1.3、设置项目sdk2.1.4、项目初始状态注意:需要关闭项目再重新打开,才能看到SpringBoot-Part文件夹2.1.5、配置maven2.1.6、创建module右击SpringBoot-Part文件夹,创建新module选择maven配......
  • C# TcpClient异常 由于套接字没有连接并且(当使用一个 sendto 调用发送数据报套接字时
    //C#TcpClient抛出异常Exceptionthrown:'System.Net.Sockets.SocketException'inSystem.dll由于套接字没有连接并且(当使用一个sendto调用发送数据报套接字时)没有提供地址,发送或接收数据的请求没有被接受。经查,winsock错误号:10057网上看到一些文章,通常是收发的时候,错误......
  • 云原生基础入门概念知识学习
    云原生的概念当谈及现代软件开发和IT基础架构时,云原生成为了一个备受关注的话题。它代表了一种软件架构和开发方法,旨在充分利用云计算环境的优势,以提高应用程序的可靠性、可扩展性和灵活性。在本文中,我们将深入探讨云原生的基本概念、核心技术以及为何它成为现代应用开发的首选。......
  • HydroOJ 从入门到入土(7)Hydro自带数据生成器使用说明(>=4.10.1)
    Hydro更新了一个新功能,可以直接用自带的数据生成器,在线生成数据,简单记录一下使用方法目录1.文件准备2.使用步骤3.注意事项4.文件模版1.文件准备gen.py(数据生成器,后附模版)std.cpp(标准程序,后附模版)文件名随意,其他的类型应该也行,不过没试.2.使用步......
  • 【python入门之OS模块介绍】---OS模块介绍
    title:【python入门之OS模块介绍】---OS模块介绍date:2023-12-1615:54:06updated:2023-12-1616:20:00description:【python入门之OS模块介绍】---OS模块介绍cover:https://home.cnblogs.com/u/dream-ze/【一】OS模块的介绍os模块是Python编程语言中......
  • ctfshow:misc入门+buuctf:misc
    misc34. 根据题干,得知宽度要大于900,又看wp限制为1200,我就倒着试,试到了1123就可以得出flag了 misc35. 同理上面,改宽度就好misc36. 一样的misc37. 点开是动态的,但其实仔细看可以看见部分flag的,就放进gif里面,一帧一帧看,得出之后按顺序拼凑出来就可以了misc38. ......
  • helm部署Kafka集群
    1.准备文件1.1.创建命令空间kubectlcreatenskafka1.1.helm包拉取本地#添加bitnami仓库helmrepoaddbitnamihttps://charts.bitnami.com/bitnami#查询charthelmsearchrepobitnami#拉取zookeeperhelmpullbitnami/zookeeper#解压tarzxvfzookeeper-12.0.......
  • QT 入门之 搭建环境 2023年 踩坑记
    QT5.13.1-安装MSVC2017-Windows_msvc2017单独安装-CSDN博客不懂的可以参考以上文章 安装vs2017/2015toolkit是为了编译器(有些公司项目用vs2015写的)安装win10sdk是为了调试器 记得安装完win10sdk后切换到QT选项中的编译器autodetected 后重启一下q......
  • Kali Linux:从入门到掌握局域网攻击 (nmap)
    kaliLinux中使用nmap。使用nmap的之一步是登录KaliLinux,如果需要,就启动一个图形会话(本系列的之一篇文章安装了KaliLinux的Enlightenment桌面环境)。在烂毕安装过程中,安装程序将提示用户输入用来登录的“root”用户和密码。一旦登录到KaliLinux机器,使用慧历备命令s......