首页 > 其他分享 >怎么使用Kafka?收藏这篇短文就可以了

怎么使用Kafka?收藏这篇短文就可以了

时间:2023-11-10 16:04:05浏览次数:36  
标签:短文 -- 这篇 kafka 消息 Kafka CONFIG properties

〇、前言

便于大家对本章内容的理解,我重新整理了一下Kafka中的部分重要概念,以表格的方式呈现出来,请见下表所示:

名词

解释

Broker 节点

一个Kafka节点就是一个Broker,一个和多个Broker可以组成一个Kafka集群

Topic 主题

Kafka根据topic对消息进行归类,发布到kafka集群的每套消息都需要指定一个topic,topic是一个逻辑概念,物理上是不存在的

Producer 生产者

用于向Kafka中发送消息

Consumer 消费者

从Kafka中获取消息

Consumer Group 消费组

每个Consumer都会归属于一个消费组,一条消息可以同时被多个不同的消费组消费,但是只能被一个消费组中的消费者消费

Partition 分片

物理上的概念,可以将一个topic上的数据拆分为多分放到Partition中,每个Patition内部的消息是有序的。

本篇文章的主要目的就是操作一下Kafka,从直观感受上面使用一下它,而不是让它仅仅存在于我们理论和想象中的认知上。

那么对于这种中间件的操作,我们一般来说普遍会采用两种方式:

方式1】通过bin路径下的脚本指令,在控制台端进行使用操作;
方式2】通过对jar包的引用,在代码层面上进行使用操作;

在下面章节中,我们就分别针对控制台层面操作代码层面操作这两个方面,对Kafka进行第一次亲密的接触。

一、控制台层面操作

对于Kafka支持多少控制台指令,在其官网(https://kafka.apache.org/documentation/#quickstart)中就已经详细的列举出来了,我们可以很方面的从官网中获得对某个指令的解释和使用说明,如下所示:

怎么使用Kafka?收藏这篇短文就可以了_java

同样,在我们的安装了Kafka的bin目录下,也存在着对应这些指令的sh脚本文件, 也是它构成了我们可以非常方便的在控制台这一层面操作Kafka的可能性,如下所示:

怎么使用Kafka?收藏这篇短文就可以了_java_02

虽然指令和脚本文件挺多的,但是我们没有必要从头学一遍,毕竟是第一次操作Kafka,我们只做3件事:创建Topic、通过Producer发送消息和通过Consumer接受消息。

1.1> 创建/查看主题(kafka-topics.sh)

当我们发送消息的时候,主题topic是我们重要的参数之一,我们可以针对不同业务创建不同的topic,从而达到对消息的隔离性。在bin目录下,负责管理Topic的脚本是kafka-topics.sh,我们可以通过对它操作来实现topic的管理。下面,我们就来尝试创建一个名称为“muse”的Topic。

kafka_2.13-3.0.0> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic muse --partitions 1 --replication-factor 1

Created topic muse.

--bootstrap-server 】待链接到的Kafka服务地址,此处我们指定localhost:9092
--create 】执行创建Topic主题指令;
--topic 】指定待创建的主题名称,此处我们指定创建名称为“muse”的topic;
--partitions 】指定分区个数,由于我们采用单机模式,即只有1个Broker,所以指定创建1个分区
--replication-factor 】指定创建副本的个数,此处我们指定创建1个副本,即主副本;

创建完主题Topic之后,我们也可以通过 --list指令 查看Kafka下所有主题列表,如下所示:

kafka_2.13-3.0.0> bin/kafka-topics.sh --list --bootstrap-server localhost:9092                

muse

同样的,我们也可以通过 --describe指令 查看刚刚创建的那个名称为“muse”的Topic内的具体描述信息:

kafka_2.13-3.0.0> bin/kafka-topics.sh --describe --topic muse --bootstrap-server localhost:9092                            
Topic: muse	TopicId: iDQpnERjSI2IvaB2kaB2aQ	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824
Topic: muse	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

1.2> 生产端(kafka-console-producer.sh)

在上面,我们已经创建好了名称为“muse”的主题Topic了,那么我们就可以尝试向这个Topic发送消息了。此时,我们可以通过使用kafka-console-producer.sh来发送消息,它可以从本地文件中读取内容,或者我们也可以从命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一行都会被当做一个独立的消息。具体发送语句如下所示:

kafka_2.13-3.0.0> bin/kafka-console-producer.sh --topic muse --bootstrap-server localhost:9092                
>message1
>message2
>

其中,通过使用--bootstrap-server来指定Kafka服务地址;如果配置了Kafka集群,用逗号分割即可。

1.3> 消费端(kafka-console-consumer.sh)

上面我们虽然向Kafka中发送了两条消息——message1message2,但是由于此时并没有任何消费者Consumer,所以这两个消息也无法被读取。那么,我们可以利用kafka-console-consumer.sh来执行消息消费操作,具体消费指令如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092

我们发现执行了上面的指令,控制台没有输入任何内容,那么,我们切换到Producer段,再发送两条消息——message3message4,我们发现,此时Consumer端有消息输出了,如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092                

message3
message4

发生上面情况的原因就是,在默认情况下,消费者是从最后一条消息的偏移量+1开始消费,即:Consumer客户端启动之前的消息是不会被消费的。那如果我们想要把Consumer客户端启动之前的消息也获取到,则可以添加--from-beginning参数即可,如下所示:

kafka_2.13-3.0.0> bin/kafka-console-consumer.sh --topic muse --bootstrap-server localhost:9092 --from-beginning

message1
message2
message3
message4

二、代码层面操作

项目中引入kafka-clients的依赖(也可以直接引入spring-kafka的依赖,里面内嵌了kafka-clients)

怎么使用Kafka?收藏这篇短文就可以了_kafka_03

2.1> 编写生产者端

2.1.1> 初始化配置

创建配置对象Properties

Properties properties = new Properties();

配置kafka的Broker列表

properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");

发出消息持久化机制参数

properties.put(ProducerConfig.ACKS_CONFIG, "1");

ACKS_CONFIG的类型有如下3种:

【acks=0】表示producer不需要等待任何broker确认收到消息的ACK回复,就可以继续发送下一条消息。性能最高,但是最容易丢失消息。
【acks=1】表示至少等待leader已经成功将数据写入本地log,但是不需要等待所有follower都写入成功,就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息就会丢失。
【acks=-1/all】需要等待所有min.insync.replicas(默认为1,推荐配置>=2)这个参数配置的副本个数都成功写入日志。这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。

配置失败重试机制

properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 失败重试3次
properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300); // 重试间隔300ms

配置缓存相关信息

Producer的消息会先发送到本地缓冲区(BUFFER_MEMORY_CONFIG),而不是发送一次消息连接一次kafka。
kafka本地线程会从缓冲区去取数据(BATCH_SIZE_CONFIG),然后批量发送到Broker,即:一个批次满足16KB就会发送出去。
LINGER_MS_CONFIG的默认值为0,表示消息必须立即被发送,但这样会影响性能。 设置10ms也就是说Producer消息发送完后会进入本地的batch中;如果10ms内,这个batch满足了16KB,那么就会随着batch一起被发送出去。如果10ms内,batch没满,那么也必须要把消息发送出去,不能让消息的发送延迟时间太长。

配置key和value的序列化实现类

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

2.1.2> 同步消息发送

怎么使用Kafka?收藏这篇短文就可以了_java_04

同步消息发送代码如下所示

怎么使用Kafka?收藏这篇短文就可以了_Kafka_05

2.1.3> 异步消息发送

怎么使用Kafka?收藏这篇短文就可以了_面试_06

2.2> 编写消费者端

2.2.1> 初始化配置

创建配置对象Properties

Properties properties = new Properties();

配置kafka的Broker列表

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");

配置消费组

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "museGroup");

offset的重置策略

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
【解释】offset的重置策略——例如:创建一个新的消费组,offset是不存在的,如何对offset赋值消费。
latest:默认值,只消费自己启动之后发送到主题的消息。
earliest:第一次从头开始消费,以后按照消费offset记录继续消费。

心跳相关配置

/** Consumer给Broker发送心跳的时间间隔 */
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);

/** 如果超过10秒没有接收到消费者的心跳,则会把消费者踢出消费组,然后重新进行rebalance操作,把分区分配给其他消费者 */
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10*1000);

poll相关配置

/** 一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置 */
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);

/** 如果两次poll的时间超出了30秒的时间间隔,kafka会认为整个Consumer的消费能力太弱,会将它踢出消费组。将分区分配给其他消费者 */
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30*1000);

配置key和value的反序列化实现类

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

2.2.2> 自动提交offset

怎么使用Kafka?收藏这篇短文就可以了_java_07

自动提交offset

当消费者向Broker的log中poll到消息后,默认情况下,会向broker中名称为“__consumer_offsets”的Topic发送offset偏移量。
自动提交会出现丢失消息的情况
因为如果Consumer还没消费完poll下来的消息就自动提交了偏移量,那么此时如果Consumer挂掉了,那么下一个消费者会从已经提交的offset的下一个位置开始消费消息。那么之前没有被消费的消息就丢失了。

怎么使用Kafka?收藏这篇短文就可以了_Kafka_08

2.2.3> 手动提交offset

手动提交offset

当消费者从kafka的Broker日志文件中poll到消息并且消费完毕之后。再手动提交当前的offset。

怎么使用Kafka?收藏这篇短文就可以了_java_09

今天的文章内容就这些了:

写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享

更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」

标签:短文,--,这篇,kafka,消息,Kafka,CONFIG,properties
From: https://blog.51cto.com/muse/8303744

相关文章

  • kafka复习:(2)客户端发送消息并异步处理返回结果
    packagecom.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;importorg.apache.kafka.clients.producer.*;importorg.apache.kafka.common.serialization.StringSerializer;importjava.time.Duration;importjava.util.Properties;importjava.util.concurrent.E......
  • Kafka入门
    Kafka定义Kafka是一个分布式的流处理平台,它具有以下特性:磁盘保存数据伸缩性术语生产者生产者是消息的创造者,可以指定Topic,Partion,Key,Value,并将消息发送到Kafka集群中的Broker。消费者消费者负责从Kafka集群中读取消息。需要设置偏移量一个分区里面,每个消息的偏移......
  • Kafka JNDI 注入分析(CVE-2023-25194)
    ApacheKafkaClientsJndiInjection漏洞描述ApacheKafka是一个分布式数据流处理平台,可以实时发布、订阅、存储和处理数据流。KafkaConnect是一种用于在kafka和其他系统之间可扩展、可靠的流式传输数据的工具。攻击者可以利用基于SASLJAAS配置和SASL协议的任意Kafka......
  • Kafka队列
    ......
  • Introducing the core concepts of Kafka
    IntroductionI havelearntthekafkasince5years,IbelieveIlearndsomthing,Itisontimeforimprovingenglish.SoIdecidedtopickupmyblogs,towritingsomeconceptsofkafkaforconsolidatingmemory.Bytheway, makingmyenglishbetter.How......
  • kafka第三天学习笔记
    在第三天学习Kafka中,你可能会遇到一些关于Kafka的核心概念和特性的深入讨论。以下是一些可能的学习点:Kafka的设计理念:Kafka的设计理念是“发布-订阅”模型,允许消费者根据其需求从多个生产者那里接收消息。这种模型使得Kafka能够以高吞吐量和可扩展的方式处理实时数据流。Ka......
  • kafka第二天学习笔记
    第二天学习Kafka,我们继续深入了解这个分布式流处理平台的核心概念和功能。以下是一些重要的知识点和概念:Kafka的消费者组:消费者组是多个消费者实例的组合,可以共同消费一个topic中的消息。消费者组中的每个消费者会均匀分配topic中的消息,实现负载均衡和高可用性。Kafka的分区策略:当......
  • Spring Kafka: UnknownHostException: 34bcfcc207e0
    参考:https://stackoverflow.com/questions/69527813/spring-kafka-unknownhostexception-34bcfcc207e0我遇到的问题和@AdánEscobar是一样的。在SpringBoot整合kafka的时候日志报了SpringKafka:UnknownHostException:34bcfcc207e0,34bcfcc207e0经过排查是容器的ID。解决......
  • kafka配置-代码配置篇
    KafkaProducerConfig@Configuration@EnableKafkapublicclassKafkaProducerConfig{/***ProducerTemplate配置*/@Bean(name="kafkaTemplate")publicKafkaTemplate<String,String>kafkaTemplate(){returnne......
  • kafka配置-yml篇
    spring:kafka:template:#当使用kafkaTemplate的sendDefault方法的时候,使用的是这里配置的topicdefaultTopic:topic-1#partition-num和replication-numKafkaProperties没有提供配置的地方bootstrap-servers:127.0.0.1:9092produ......