首页 > 其他分享 >怎么使用Kafka?收藏这篇短文就可以了

怎么使用Kafka?收藏这篇短文就可以了

时间:2023-08-26 15:31:41浏览次数:38  
标签:短文 -- 这篇 kafka 消息 Kafka CONFIG properties

〇、前言

便于大家对本章内容的理解,我重新整理了一下Kafka中的部分重要概念,以表格的方式呈现出来,请见下表所示:

名词 解释
Broker 节点 一个Kafka节点就是一个Broker,一个和多个Broker可以组成一个Kafka集群
Topic 主题 Kafka根据topic对消息进行归类,发布到kafka集群的每套消息都需要指定一个topic,topic是一个逻辑概念,物理上是不存在的
Producer 生产者 用于向Kafka中发送消息
Consumer 消费者 从Kafka中获取消息
Consumer Group 消费组 每个Consumer都会归属于一个消费组,一条消息可以同时被多个不同的消费组消费,但是只能被一个消费组中的消费者消费
Partition 分片 物理上的概念,可以将一个topic上的数据拆分为多分放到Partition中,每个Patition内部的消息是有序的。

本篇文章的主要目的就是操作一下Kafka,从直观感受上面使用一下它,而不是让它仅仅存在于我们理论和想象中的认知上。

那么对于这种中间件的操作,我们一般来说普遍会采用两种方式:

方式1】通过bin路径下的脚本指令,在控制台端进行使用操作;<br> 【方式2】通过对jar包的引用,在代码层面上进行使用操作;

在下面章节中,我们就分别针对控制台层面操作代码层面操作这两个方面,对Kafka进行第一次亲密的接触。

一、控制台层面操作

对于Kafka支持多少控制台指令,在其官网(https://kafka.apache.org/documentation/#quickstart)中就已经详细的列举出来了,我们可以很方面的从官网中获得对某个指令的解释和使用说明,如下所示:

同样,在我们的安装了Kafka的bin目录下,也存在着对应这些指令的sh脚本文件, 也是它构成了我们可以非常方便的在控制台这一层面操作Kafka的可能性,如下所示:

虽然指令和脚本文件挺多的,但是我们没有必要从头学一遍,毕竟是第一次操作Kafka,我们只做3件事:创建Topic、通过Producer发送消息和通过Consumer接受消息。

1.1> 创建/查看主题(kafka-topics.sh)

当我们发送消息的时候,主题topic是我们重要的参数之一,我们可以针对不同业务创建不同的topic,从而达到对消息的隔离性。在bin目录下,负责管理Topic的脚本是kafka-topics.sh,我们可以通过对它操作来实现topic的管理。下面,我们就来尝试创建一个名称为“muse”的Topic。

kafka_2.13-3.0.0> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic muse --partitions 1 --replication-factor 1

Created topic muse.

--bootstrap-server 】待链接到的Kafka服务地址,此处我们指定localhost:9092;<br> 【 --create 】执行创建Topic主题指令;<br> 【 --topic 】指定待创建的主题名称,此处我们指定创建名称为“muse”的topic;<br> 【 --partitions 】指定分区个数,由于我们采用单机模式,即只有1个Broker,所以指定创建1个分区;<br> 【 --replication-factor 】指定创建副本的个数,此处我们指定创建1个副本,即主副本;

创建完主题Topic之后,我们也可以通过 --list指令 查看Kafka下所有主题列表,如下所示:

kafka_2.13-3.0.0> bin/kafka-topics.sh --list --bootstrap-server localhost:9092                

muse

同样的,我们也可以通过 --describe指令 查看刚刚创建的那个名称为“muse”的Topic内的具体描述信息:

kafka_2.13-3.0.0> bin/kafka-topics.sh --describe --topic muse --bootstrap-server localhost:9092                            
Topic: muse	TopicId: iDQpnERjSI2IvaB2kaB2aQ	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
Topic: muse	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

1.2> 生产端(kafka-console-producer.sh)

在上面,我们已经创建好了名称为“muse”的主题Topic了,那么我们就可以尝试向这个Topic发送消息了。此时,我们可以通过使用kafka-console-producer.sh来发送消息,它可以从本地文件中读取内容,或者我们也可以从命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一行都会被当做一个独立的消息。具体发送语句如下所示:

kafka_2.13-3.0.0> bin/kafka-console-producer.sh --topic muse --bootstrap-server localhost:9092                
>message1
>message2
>

其中,通过使用--bootstrap-server来指定Kafka服务地址;如果配置了Kafka集群,用逗号分割即可。

1.3> 消费端(kafka-console-consumer.sh)

上面我们虽然向Kafka中发送了两条消息——message1message2,但是由于此时并没有任何消费者Consumer,所以这两个消息也无法被读取。那么,我们可以利用kafka-console-consumer.sh来执行消息消费操作,具体消费指令如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092                

我们发现执行了上面的指令,控制台没有输入任何内容,那么,我们切换到Producer段,再发送两条消息——message3message4,我们发现,此时Consumer端有消息输出了,如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092                

message3
message4

发生上面情况的原因就是,在默认情况下,消费者是从最后一条消息的偏移量+1开始消费,即:Consumer客户端启动之前的消息是不会被消费的。那如果我们想要把Consumer客户端启动之前的消息也获取到,则可以添加--from-beginning参数即可,如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --from-beginning

message1
message2
message3
message4

二、代码层面操作

项目中引入kafka-clients的依赖(也可以直接引入spring-kafka的依赖,里面内嵌了kafka-clients)

2.1> 编写生产者端

2.1.1> 初始化配置

创建配置对象Properties

Properties properties = new Properties();

配置kafka的Broker列表

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");

发出消息持久化机制参数

properties.put(ProducerConfig.ACKS_CONFIG, "1");

ACKS_CONFIG的类型有如下3种:

【acks=0】表示producer不需要等待任何broker确认收到消息的ACK回复,就可以继续发送下一条消息。性能最高,但是最容易丢失消息。<br> 【acks=1】表示至少等待leader已经成功将数据写入本地log,但是不需要等待所有follower都写入成功,就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息就会丢失。<br> 【acks=-1/all】需要等待所有min.insync.replicas(默认为1,推荐配置>=2)这个参数配置的副本个数都成功写入日志。这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。

配置失败重试机制

properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 失败重试3次<br> properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 重试间隔300ms

配置缓存相关信息

Producer的消息会先发送到本地缓冲区(BUFFER_MEMORY_CONFIG),而不是发送一次消息连接一次kafka。<br> kafka本地线程会从缓冲区去取数据(BATCH_SIZE_CONFIG),然后批量发送到Broker,即:一个批次满足16KB就会发送出去。<br> LINGER_MS_CONFIG的默认值为0,表示消息必须立即被发送,但这样会影响性能。 设置10ms也就是说Producer消息发送完后会进入本地的batch中;如果10ms内,这个batch满足了16KB,那么就会随着batch一起被发送出去。如果10ms内,batch没满,那么也必须要把消息发送出去,不能让消息的发送延迟时间太长。

配置key和value的序列化实现类

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

2.1.2> 同步消息发送

同步消息发送代码如下所示

2.1.3> 异步消息发送

2.2> 编写消费者端

2.2.1> 初始化配置

创建配置对象Properties

Properties properties = new Properties();

配置kafka的Broker列表

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");

配置消费组

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "museGroup");

offset的重置策略

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");<br> 【解释】offset的重置策略——例如:创建一个新的消费组,offset是不存在的,如何对offset赋值消费。<br> latest:默认值,只消费自己启动之后发送到主题的消息。<br> earliest:第一次从头开始消费,以后按照消费offset记录继续消费。

心跳相关配置

/** Consumer给Broker发送心跳的时间间隔 */
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

/** 如果超过10秒没有接收到消费者的心跳,则会把消费者踢出消费组,然后重新进行rebalance操作,把分区分配给其他消费者 */
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);

poll相关配置

/** 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置 */
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

/** 如果两次poll的时间超出了30秒的时间间隔,kafka会认为整个Consumer的消费能力太弱,会将它踢出消费组。将分区分配给其他消费者 */
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);

配置key和value的反序列化实现类

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

2.2.2> 自动提交offset

自动提交offset

当消费者向Broker的log中poll到消息后,默认情况下,会向broker中名称为“__consumer_offsets”的Topic发送offset偏移量。<br> 自动提交会出现丢失消息的情况。 <br> 因为如果Consumer还没消费完poll下来的消息就自动提交了偏移量,那么此时如果Consumer挂掉了,那么下一个消费者会从已经提交的offset的下一个位置开始消费消息。那么之前没有被消费的消息就丢失了。

2.2.3> 手动提交offset

手动提交offset

当消费者从kafka的Broker日志文件中poll到消息并且消费完毕之后。再手动提交当前的offset。

今天的文章内容就这些了:

写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享

更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」

标签:短文,--,这篇,kafka,消息,Kafka,CONFIG,properties
From: https://blog.51cto.com/u_15003301/7244824

相关文章

  • 20,000+ 字,彻底搞懂 Kafka!
    1、为什么有消息系统1、解耦合2、异步处理例如电商平台,秒杀活动。一般流程会分为:风险控制库存锁定生成订单短信通知更新数据通过消息系统将秒杀活动业务拆分开,将不急需处理的业务放在后面慢慢处理;流程改为:风险控制库存锁定消息系统生成订单短信通知更新数据......
  • Kafka生产问题总结及性能优化实践
    Kafka可视化管理工具kafka-manager安装及基本使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html 线上环境规划 JVM参数设置kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置,参看JVM调优专题修改bin/kafka-start-server.sh中的jvm设置,假设机器是32G内......
  • kafka名词解释
    Apachekafka是消息中间件的一种。举个例子:生产者消费者,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,......
  • 网易一面:单节点2000Wtps,Kafka怎么做的?
    文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+涨薪必备免费赠送:《尼恩技术圣经+高并发系列PDF》,帮你实现技术自由,完成职业升级,薪......
  • 【知识整理】基于Springboot的Kafka消费者动态操作
    基于Springboot的Kafka消费者动态操作1.问题​ 在基于Springboot开发Kafka相关业务时,遇到如下业务场景:执行部分操作时,如停止服务替换镜像、执行特殊业务处理等,需要先停止Consumer接收Kafka消息,待处理完成后再开启Consumer接续接收Kafka消息为并发消费Kafka消息,可通过配置sp......
  • kafka设计原理详解
      Kafka核心总控制器Controller在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(KafkaController),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集......
  • kafka发送超大消息
    kafka发送超大消息设置 最近开发一cdc框架,为了测试极端情况,需要kafka传递100万条数据过去,1个G左右,由于其他环节限制,不便进行拆包(注:测下来,大包走kafka不一定性能更好,甚至可能更低)。测试百万以上的变更数据时,报消息超过kafkabroker允许的最大值,因此需要修改如下参数,......
  • Kafka快速实战以及基本原理详解
     这一部分主要是接触Kafka,并熟悉Kafka的使用方式。快速熟练的搭建kafka服务,对于快速验证一些基于Kafka的解决方案,也是非常有用的。一、Kafka介绍​ChatGPT对于ApacheKafka的介绍:ApacheKafka是一个分布式流处理平台,最初由LinkedIn开发并于2011年开源。它主要用于解决大规模......
  • Kafka入门到精通学习路线图 技术文章
    Kafka入门到精通学习路线图技术文章Kafka是一个分布式流式处理平台,被广泛应用于大规模数据处理和实时数据流分析的场景中。以下是一个从入门到精通的学习路线图,帮助你系统地学习和掌握Kafka的相关技术。1.学习Kafka的概念和基础知识:-了解Kafka的起源和背景,掌握Kafka的基本概......
  • 初识kafka,先了解这些就够了
    一、了解Kafka中的相关概念MQ作为消息中间件,对于我们来说,已经并不陌生了,那么,由于Kafka它在众多的MQ间是非常火热的,那么必然也是我们需要着重关注的中间件之一了,为了更加清晰的了解Kafka,我们先从Kafka的体系结构入手,看看大体上都包含哪些东西。具体请见下图所示:其中有一些我们很......