首页 > 其他分享 >Kafka入门学习

Kafka入门学习

时间:2023-06-15 17:13:19浏览次数:53  
标签:入门 -- kafka 学习 消息 Kafka CONFIG properties

kafka概述
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:http://kafka.apache.org/

名词解释

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

kafka安装配置
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
Docker安装zookeeper

  1. docker pull zookeeper:3.4.14
  2. docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
    Docker安装kafka
  3. docker pull wurstmeister/kafka:2.12-2.3.1
  4. docker run -d --name kafka \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \ --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \ --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \ --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \ --net=host wurstmeister/kafka:2.12-2.3.1

kafka入门

  • 生产者发送消息,多个消费者只能有一个消费者接收到消息
  • 生产者发送消息,多个消费者都可以接收到消息
  1. 创建kafka-demo项目,导入依赖
点击查看代码
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
2. 生产者发送消息
点击查看代码
public class ProducerQuickStart {

    public static void main(String[] args) {
        //1.kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2.生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        //封装发送的消息
        ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");

        //3.发送消息
        producer.send(record);

        //4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();
    }
}
3. 消费者接收消息
点击查看代码
public class ConsumerQuickStart {

    public static void main(String[] args) {
        //1.添加kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2.消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //3.订阅主题
        consumer.subscribe(Collections.singletonList("itheima-topic"));

        //当前线程一直处于监听状态
        while (true) {
            //4.获取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }
    }
}
总结
  • 生产者发送消息,多个消费者订阅同一个主题,只能有一个消费者收到消息(一对一)
  • 生产者发送消息,多个消费者订阅同一个主题,所有消费者都能收到消息(一对多)

标签:入门,--,kafka,学习,消息,Kafka,CONFIG,properties
From: https://www.cnblogs.com/wzh-Official/p/17483435.html

相关文章

  • 碎片化学习前端之JavaScript(JS 压缩图片)
    前言图片压缩是前端开发中常见的需求,目前前端主流的解决方案有:Canvas手动实现压缩以及第三方库压缩两种方案。Canvas手动实现压缩Canvas实现压缩主要原理是:将图片绘制到canvas上,然后通过调整canvas的宽高来实现压缩。functioncompressImage(file,maxWidth,maxHeight......
  • [转][Java]入门设置
    1、JDK使用1.8_3712、下载公司的settings.xml文件,覆盖到X:\maven\conf目录下3、修改settings.xml中的localRepository配置为本机资源位置4、在IDEA里设置JDK版本5、运行项目,会使用1XXX端口,公司自有应用端口范围:10000~199996、通过http://localhost:1XX......
  • springboot集成kafka
    导入spring-kafka依赖信息点击查看代码<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--kafkfa-->......
  • echarts飞行航线图学习
    第一次接触这个理解可能不一定正确后面如果我发现有问题会更正1.npm安装echarts npminstallecharts--saveimport*asechartsfrom'echarts'//这里我在data里定义了一个也可不定义根据使用方法灵活调整this.myChart=echarts.init(this.$refs......
  • ctfpwn-堆入门之uaf(新手向)
    例题:程序保护全开,ida打开int__cdeclmain(intargc,constchar**argv,constchar**envp){init(argc,argv,envp);while(1){menu();switch((unsignedint)read_int()){case1u:new_book();break;case2u:......
  • 解析 Postman Newman:从入门到精通
    PostmanNewman是什么?PostmanNewman是一个CLI(命令行界面)工具,可以使用它来运行Postman中的集合(Collection)和环境(Environment)进行自动化测试。它是Postman的命令行CollectionRunner,能够直接从命令行运行Postman集合。使用Newman可以测试API的功能、性能、可靠性和安......
  • Rust学习笔记——基于官网和Rust语言圣经
    安装rust安装1、官网https://www.rust-lang.org/zh-CN/learn/get-started2、运行后选择1会下载VisualStdio;选择2表示你是高级用户或企业用户;选择3即是采用MinGW编译选择1选1表示默认当前配置安装,选2表示自定义这些配置,选3表示取消安装更新与卸载Rust更新Rustrustupup......
  • kafka消费模式
    消费者消费方式:订阅与分配1、KafkaConsumer.subscribe():为consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给相同group下的不同consumer。2、KafkaConsumer.assign():为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制,相当与......
  • Kafka概述
    定义Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。Kafka最新定义:Kafka是一个开源的分布式事件......
  • Kafka安装
    环境说明在安装Kafka之前,请确保已经安装了JDK和Zookeeper。运行Kafka,首先保证Java环境能正常使用,可执java-version查看。安装JDK环境下载jdk安装包curlhttps://download.oracle.com/java/20/latest/jdk-20_linux-x64_bin.tar.gz-ojdk-20_linux-x64_bin.tar.gz新建JDK......