首页 > 系统相关 >Linux 或 Windows 安装 Kafka,示例实现生产与消费消息(一)

Linux 或 Windows 安装 Kafka,示例实现生产与消费消息(一)

时间:2023-01-01 00:22:59浏览次数:46  
标签:示例 Windows zookeeper server kafka topic -- Kafka properties

下载:wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz

  注意:kafka正常运行,必须配置zookeeper,kafka安装包已经包括zookeeper服务

解压:tar -zxvf  kafka_2.12-3.3.1.tgz

修改config 目录下 server.properties文件

修改config 目录下 zookeeper.properties 文件

启动Kafka,kafka2.12目录下运行 zookeeper 和 kafka

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &
   
    ./bin/zookeeper-server-start.sh -daemon  config/zookeeper.properties 
    ./bin/kafka-server-start.sh -daemon  config/server.properties 

检查是否启动成功:

  ps -ef|grep kafka  或  lsof -i:2181

创建一个 topic 命名为:my-topic
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-topic

查看已创建的topic信息:
    bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

命令行的工具生产消息:
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
    This is a message

消费工具消费消息:
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

使用Ctrl-C停止生产者和消费者客户端。

【啰嗦一下】:

  若要模拟分布式可以复制多个server.properties 文件,并修改 broker.id,port,log.dirs

  再一一启动时指定不同的server.properties文件启动即可

API地址:https://kafka.apache.org/documentation/#api

Windos 环境启动kafka:

与linux相同的压缩包,解压后修改server.properties 文件,zookeeper.properties文件
    到bin/windows目录下执行cmd 打开两个窗口,再分别执行以下两个命令,具体目录根据自己的文件目录修改
    zookeeper-server-start.bat  D:/ProgramFiles/kafka2.12/config/zookeeper.properties
    kafka-server-start.bat  D:/ProgramFiles/kafka2.12/config/server.properties
    其创建生产消费等命令与linux类似,把sh文件换成bat文件执行命令

生产消息代码:

public class KafkaMyProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);
            //生产数据
        for (int i = 0; i < 5; i++) {
//            topic: 消息队列的名称,可以先行在kafka服务中进行创建。如果kafka中并未创建该topic,那么便会自动创建!
//            key:键值,value:要发送的数据,数据格式为String类型的。
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("mytopic", Integer.toString(i), "Hello kafka A" + i);
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (Objects.nonNull(e)) {
                        System.out.println("发送消息失败:" + e.getMessage());
                    }
                    if (Objects.nonNull(metadata)) {
                        System.out.println("同步发送消息结果:topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
                    }
                }
            });
        }
        producer.close();
    }
}

消费消息代码:

public class KafkaMyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group1");//组名 不同组名可以重复消费
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("mytopic")); //需要先订阅一个topic,也就是指定消费哪一个topic
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(20));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

 

标签:示例,Windows,zookeeper,server,kafka,topic,--,Kafka,properties
From: https://www.cnblogs.com/zhey/p/17016134.html

相关文章

  • Windows 安装以及配置Nginx
    1、windows下安装Nginx1.1从nginx官网下载相应的安装包:​​http://nginx.org/​​ 1.2建议下载 下载稳定版1.3解压到相应的目录,比如我是e盘然后修改目录名字为nginx......
  • 【好软推荐】Scoop - Windows快速软件安装指南
    在平常生活中如果要安装像git、java、node这些环境的时都需要先去官网下载安装程序,点击安装,之后还需要配置,不仅过程麻烦,而且工具多了之后整理起来也相当不容易,配置也很杂,整......
  • windows 安装mysql数据库
    本文推荐使用 windows安装zip包方式安装mysql数据库。5.7之后的版本没有data目录,懒得找解决方案,直接推荐使用5.6的社区版本。1.下载安装mysql官网,找到对应的目录,下载......
  • 软件安装——Windows gcc安装教程
    Windowsgcc安装教程1、下载MinGWhttps://sourceforge.net/projects/mingw/files/Installer/mingw-get-setup.exe/download2、安装点击继续一直下一步下载完后会打开......
  • lazarus linux/windows简易三层
    Lazarus下特别是linuxaarch64下使用简单的三层控件特别难找,综合已有的代码,在2022年最后一天,写了个简单的三层,其中服务端使用了UNIDAC和RealThinClientSDK,客户端简单封装......
  • 软件安装——Windows7 VmWare Tools
    VMwareWin7    这里有个坑,在安装VMtools的时候,是失败的,我原来以为是镜像的问题,Windows7镜像版本如果比较低,是安装不了VMtools的。在尝试更换高版本Windows7后,我......
  • VS2017: cannot open source file “Windows.h“
    在VS2017中打开一个VC++项目,#include行提示cannotopensourcefile"Windows.h"解决方法:右击Project->Properties->General->WindowsSDKVersion,选择10.0.xxxx......
  • windows本地安装git,并下载开源项目代码到本地
    1下载git安装包打开百度首页,输入git进行搜索,然后点击进入git官网下载对应系统的安装包。官网地址如下:https://git-scm.com/然后呢,你会发现,从官网下载真的很慢,可以复制下载......
  • 第05章 Windows版IntelliJ IDEA内置工具的使用
    第05章Windows版IntelliJIDEA内置工具的使用IntelliJIDEAIntelliJIDEA中内置了非常多的实用工具,日常开发中掌握了这些工具的使用会让开发变得更加顺畅。1.Wind......
  • windows server 2019 2012 server 2022 无线网卡驱动安装报错,无线网卡驱动不能安装, i
    windowsserver2019无线网络服务安装段落无效windowsserver2019无线网卡驱动安装报错,无线网卡驱动不能安装,inf服务安装段落无效 indowsserver2019安装无线驱......