首页 > 其他分享 >kafka入门(一):kafka消息消费

kafka入门(一):kafka消息消费

时间:2023-11-18 22:33:26浏览次数:36  
标签:消费 入门 properties static org kafka public String

安装kafka,创建 topic:

Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

添加依赖包:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.10.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

kafka代码示例(一):

主要按照以下步骤:

  • 设置 broker服务器的ip和端口, 设置 消费者群组id

  • 初始化消费者

  • 消费者订阅主题

  • 消费者批量拉取消息

public class KafkaDemo1 {
    public static final String BROKER_LIST = "localhost:9092";
    public static final String TOPIC = "myTopic1";
    public static final String GROUP_ID = "group.demo";

    public static void main(String[] args) {
        consumerRecord();
    }

    public static void consumerRecord() {
        //属性配置
        Properties properties = getProperties(BROKER_LIST, GROUP_ID);
        //消费者初始化
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //消息者订阅主题
        consumer.subscribe(Collections.singletonList(TOPIC));
        //循环
        while (true) {
            //每次拉取 1千条消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("=============> 消费kafka消息:"+ record.value());
            }
        }
    }

    public static Properties getProperties(String brokerList, String groupId) {
        Properties properties = new Properties();
        //序列化
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        //broker服务器的ip和端口,多个用逗号隔开
        properties.put("bootstrap.servers", brokerList);
        //消费者群组id
        properties.put("group.id", groupId);
        return properties;
    }

}


使用文章开头安装好的 kafka,并按文章中的步骤,创建 topic ,打开一个 生产者 producer,并发送消息。
在这里插入图片描述

观察idea 控制台,可以看到 成功消费了消息:

=============> 消费kafka消息:hello kafka

参考资料:

《深入理解kafka 核心设计与实践原理》

标签:消费,入门,properties,static,org,kafka,public,String
From: https://www.cnblogs.com/expiator/p/17841251.html

相关文章

  • Elasticsearch入门
    1、什么是Elasticsearch?Elasticsearch是基于Lucene的Restful的分布式实时全文搜索引擎,每个字段都被索引并可被搜索,可以快速存储、搜索、分析海量的数据。全文检索是指对每一个词建立一个索引,指明该词在文章中出现的次数和位置。当查询时,根据事先建立的索引进行查找,并将查找......
  • rocketmq适合做消费耗时长的任务吗?
    RocketMQ是一个分布式消息队列系统,适用于高吞吐量、可靠性要求高的消息传递场景。虽然RocketMQ不是专门设计为处理消费耗时长的任务的,但在某些情况下也可以用于这样的场景,具体取决于任务的性质和要求。下面我将解释一些相关因素,以帮助你更好地评估RocketMQ是否适合处理消费耗......
  • 大白话说Python+Flask入门(二)
    写在前面笔者技术真的很一般,也许只靠着笨鸟先飞的这种傻瓜坚持,才能在互联网行业侥幸的生存下来吧!为什么这么说?我曾不止一次在某群,看到说我写的东西一点技术含量都没有,而且很没营养,换作一年前的我,也许会怼回去,现在的话,我只是看到了,完事忘记了。早期写文章是为了当笔记用,不会随......
  • 入门c语言--基于c库函数strstr的实现
    #include<stdio.h>#include<assert.h>char*my_strstr(constchar*p1,constchar*p2){ assert(p1&&p2);//检查p1和p2是否为空指针//创建s1,s2来在p1,p2中进行移动,创建指针tmp来保存开始移动时的s1的位置 char*s1=NULL; char*s2=NULL; char*tmp=(char*)p1;//对p1......
  • 路由简单入门
     1地址和视图函数的映射关系---urls.py文件#urlpatterns列表,列表内放了url函数的执行结果#使用很简单,复制一行,改一下第一个参数(正则表达式),第二个参数是视图函数内存地址urlpatterns=[url(r'^admin/',admin.site.urls),#在内部,请求来了,路径匹配成功,内部自动调用......
  • 视图简单入门
    1视图函数views.py函数(可以不放在views中,但是通常放在里面)2视图函数之请求对象 -#1请求方式(GET,POST)浏览器地址栏中发出的请求都是get请求 print(request.method)-#2请求参数get请求这种形式:http://127.0.0.1/index?name=lqz&age=18print(request.G......
  • Python全栈开发从入门到入土【新版】
    【Python初级】【一】计算机基础【补充】计算机五大组成部分【二】编程语言和Python语言介绍【三】Python解释器和Pycharm的安装【补充】Python相关补充【补充】Pycharm相关补充【四】Python语法入门之常量和变量【五】Python基础之垃圾回收机制【六】Python基础之基本......
  • Java零基础入门-字符串
    Java零基础入门-字符串前言Java是一门非常强大的编程语言,在计算机领域拥有广泛的应用。作为Java的入门阶段,掌握字符串的基本概念以及使用方法是非常重要的,它也是Java程序中经常使用的一种数据类型。本文将从Java字符串的定义、创建、内容操作等方面进行详细阐述,给读者带来更加全......
  • kafka安装教程
    检查java8没有就安装java-version安装jdk1.8yum-yinstalljava-1.8.0-openjdk下载kafka(网速很慢)wgethttps://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz解压缩tar-xzfkafka_2.13-3.5.0.tgzcdkafka_2.13-3.5.0后台启动ZooKeeper服务(这里使用kafka里......
  • 数据库入门:掌握MySQL数据库的五大基本操作,轻松驾驭数据世界!
    对数据库进行查询和修改操作的语言叫做SQL(StructuredQueryLanguage,结构化查询语言)。SQL语言是目前广泛使用的关系数据库标准语言,是各种数据库交互方式的基础。在之前的文章中,我们已经掌握了SQL语言的基本概念以及常用的DDL(数据定义)和DML(数据操作)语句。接下来,我们将探讨如何......