1.kafka简介
kafka官网地址:https://kafka.apache.org/
kafka的本质 是一个数据存储平台,流平台 , 只是他在做消息发布,消息消费的时候我们可以把他当做消息中间件来用。
Kafka提供了一个Kafka Broker、一个Kafka Producer和一个Kafka Consumer。
以下介绍源自:文心一言。
Kafka Broker在Kafka中扮演着重要的角色,主要作用如下:
- 接收生产者的消息:Kafka Broker接收来自生产者的消息。
- 为消息设置偏移量:Kafka Broker为每条消息设置偏移量,用于标识每条消息的位置。
- 提交消息到磁盘保存:Kafka Broker将消息提交到磁盘上保存,保证消息不会因为意外情况丢失。
- 为消费者提供服务:Kafka Broker为消费者提供服务,对消费者的请求作出响应。
- 返回已经提交到磁盘上的消息:Kafka Broker返回已经提交到磁盘上的消息给消费者。
Kafka Producer是负责向Kafka服务端写入数据的程序,主要作用如下:
- 地位和作用:Producer是Kafka中产生消息的一端,可以将消息生产到多个Topic中,也可以多个Producer将消息生产到同一个Topic中(容易造成消息顺序的混乱),同时Producer还可以决定将此消息存储在哪个partition中(通过partitioner)。
- 产生消息:消息准备发送的过程中,Producer将会和选定Topic下所有的Partition leader保持socket连接;消息由Producer直接通过Socket发送到Broker中而不通过中介任何的路由层,而生产者会自动找到Partition对应的Leader,然后对leader执行消息追加的操作。
- 负载均衡:Producer可以平衡发送压力。
- 同步/异步发送:使用异步发送消息将会以少量的额外延迟换取大额的吞吐量。
Kafka Consumer是用于读取Kafka集群中某些Topic消息的应用程序,是Kafka的核心组件之一,主要作用如下:
- 消费消息:Consumer将Producer生产的消息进行消费处理,完成消费任务。
- 消费方式:Consumer采用主动拉取Broker数据的方式进行消费,即Pull模式。
- 消费者组:Consumer用一个消费者组名来标记自己,topic的每条消息都只会被发送到每个订阅它的消费者组的一个Consumer实例上。
- 重平衡机制:如果Consumer实例增减或发生故障,Consumer组会重新分配订阅关系,保证每条消息至少被一个Consumer实例处理。
另外,kafka作为消息中间件时,与以往的消息中间件不同的是:以往的消息中间件消费完,这条数据就消失了。
而kafka是将消息存储到了本地磁盘当中,即使消费完也不会消失,而是通过偏移量offset用来标识消费进度(Kafka具有存储功能,默认保存数据时间为7天或者大小1G,也就是说Kafka broker上的数据超7天或者1G,将会被自动删除。)。
- Broker : 和AMQP里协议的概念一样, 就是消息中间件所在的服务器
- Topic(主题) : 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
- Partition(分区) : Partition是物理上的概念,体现在磁盘上面,每个Topic包含一个或多个Partition.
- Producer : 负责发布消息到Kafka broker
- Consumer : 消息消费者,向Kafka broker读取消息的客户端。
- Consumer Group(消费者群组) : 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
- offset 偏移量: 是kafka用来确定消息是否被消费过的标识,在kafka内部体现就是一个递增的数字。
2.下载与安装
下载
下载页:https://kafka.apache.org/downloads
一般情况下,我们下载最新版就可以了。
点击上方提供的连接进行下载即可。
如:https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
安装
下载完成,使用解压软件进行解压即可使用。
zookeeper配置文件说明
为什么要讲zookeeper?因为kafka的运行依赖于zookeeper,详细原因下面会讲到。
找到KAFKA_HOME/config目录下的zookeeper.properties。
最重要的就是以下两个属性的配置。
上面是默认配置,一般情况下,我们无需进行修改。
dataDir属性:配置的是zookeeper运行数据的存储路径,/tmp/zookeeper代表的意思是:
linux:数据存储在:/tmp/zookeeper目录下,windows:数据存在KAFKA_HOME所在磁盘的/tmp/zookeeper目录下。
当目录不存在时,会自动被创建。
clientPort属性:配置的是zookeeper运行时所占用的端口号,默认值是:2181。
kafka配置文件说明
找到KAFKA_HOME/config目录下的server.properties。
一个kafka服务器就是一个broker。
broker.id属性:属性值具有唯一性,在集群部署kafka的时候,作为区分broker的标识,默认值为:0。
如果你不需要集群部署时(只用一个kafka),不用管这个参数。
但在继续部署时(同时运行多个kafka),我们需要它们的broker.id各不相同(唯一值)。
listeners属性:设置的是kafka服务器的访问端口号,默认值是9092。
如何更改它的端口号呢?
listeners=PLAINTEXT://:9082
在此配置文件当中增加如上代码, 我们就把broker的端口号改成9082啦。
log.dirs属性:该属性配置的是kafka运行日志存储路径,/tmp/kafka-logs表达的含义是:
linux:数据存储在:/tmp/kafka-logs目录下,windows:数据存在KAFKA_HOME所在磁盘的/tmp/kafka-logs目录下。
当目录不存在时,会自动被创建。
说明:想懒省事儿的话,上面的属性均采用默认值即可。
3.部署
说明:启动kafka要先启动zookeeper。
看了很多教程,都教你需要单独下载zookeeper,这纯属扯淡。
kafka默认自带了zookeeper,何必再画蛇添足呢。
Apache ZooKeeper是一个分布式的、开源的分布式应用程序协调服务,它主要用于分布式系统中的状态同步和管理。
在Kafka中,ZooKeeper的作用主要体现在以下几个方面:
- 元数据管理:Kafka将所有元数据,如主题、分区、消费者偏移量等,都存储在ZooKeeper中。这使得Kafka能够跟踪和管理其主题和消费者的状态。
- 集群管理:ZooKeeper用于协调和管理Kafka集群中的Broker。例如,ZooKeeper负责处理Broker的加入和离开,以及集群的领导者选举。
- 配置管理:Kafka的一些配置信息,如Broker的ID、端口号等,也存储在ZooKeeper中。
- 主题和分区的管理:ZooKeeper存储了主题和分区的元数据,包括主题的配置信息和分区映射。
- 消费者管理:ZooKeeper用于跟踪消费者的状态,包括消费者的组信息和消费者的偏移量。
- 故障转移:如果Kafka集群中的某些Broker出现故障,ZooKeeper可以帮助进行故障转移,确保集群的可用性。
总的来说,ZooKeeper为Kafka提供了可靠的状态管理和协调功能,使得Kafka能够作为一个高性能、可扩展、可靠的流处理平台运行。
启动zookeeper服务
windows操作步骤
zookeeper的启动命令是:zookeeper-server-start.bat
文件所在路径:KAFKA_HOME/bin/windows
和以往的启动步骤不同,你会发现:双击运行这个文件是无法运行成功的(直接闪退)。
为什么会闪退?
那是因为它的启动需要指定我们之前看的配置文件:zookeeper.properties。
具体启动指令如下:
切换到KAFKA_HOME/bin/windows目录下,输入:cmd,按Enter键,即可打开黑窗口。
输入指令:
zookeeper-server-start.bat ../../config/zookeeper.properties
按Enter键即可运行。
当出现如上字样时,zookeeper就启动成功啦。
访问路径:
http://localhost:2181/
如果运行报错:输入行太长,解决办法见文末推荐。
linux操作步骤
zookeeper的启动命令是:zookeeper-server-start.sh
文件所在路径:KAFKA_HOME/bin
打开操作窗口,切换到KAFKA_HOME/bin/windows目录下。
输入指令:
启动kafka服务(broker)
windows操作步骤
broker的启动命令是:kafka-server-start.bat
文件所在路径:KAFKA_HOME/bin/windows
同样的,kafka-server-start.bat的运行离不开配置文件:server.properties。
server.properties所在路径:KAFKA_HOME/config/server.properties。
再次打开命令窗口。
启动命令:
kafka-server-start.sh ../../config/server.properties
当出现如上字样时,broker就启动成功啦。
访问路径:
http://localhost:9092/
linux操作步骤
broker的启动命令是:kafka-server-start.sh
文件所在路径:KAFKA_HOME/bin
打开操作窗口,切换到KAFKA_HOME/bin目录下。
输入指令:
4.kafka connect
Kafka Connect是一个可扩展的、可插拔的数据传输框架,它可以连接Kafka和其它系统。
这是一个配置其它组件完成数据的发布和订阅的插件,换言之就是:当我们只有这种需求的时候,才需用到这个项服务。
kafka connect配置文件说明
找到KAFKA_HOME/config目录下的connect-standalone.properties和connect-distributed.properties文件。
connect-standalone.properties:当只有一个kafka服务器时,我们需要使用这个配置文件。
connect-distributed.properties:当存在多个kafka服务器时(集群部署),我们需要使用这个配置文件。
以connect-standalone.properties为例:
说明:如果是connect-distributed.properties,这里可以配置多个broker地址,中间使用逗号隔开。
bootstrap.servers属性:指定broker服务器的地址,默认值为:localhost:9092。
offset.storage.file.filename属性:设置偏移量文件connect.offsets的存放地址,默认路径为:/tmp。
linux:connect.offsets存储在:/tmp目录下,windows:connect.offsets存在KAFKA_HOME所在磁盘的/tmp目录下。
当目录不存在时,会自动被创建。
offset.flush.interval.ms属性:偏移量刷新时间间隔,单位:毫秒,默认值为:10000毫秒(10秒)。
plugin.path属性:设置connect插件的存放路径,多个路径之间使用逗号隔开。
在启动kafka connect时,会自动从上面配置的路径,读取connect插件。
一般情况下,我们不在这里配置,而是将插件直接放在:KAFKA_HOME/plugins目录下。
listeners属性:设置的是kafka connect服务器的访问端口号,默认值是8083。
多个地址之间使用逗号隔开。
如何更改它的端口号呢?
listeners=HTTP://:8093
在此配置文件当中增加如上代码, 我们就把kafka connect的端口号改成8093啦。
启动kafka connect
前提:已经启动好了kafka server。
以只运行一个broker进行举例说明
windows操作步骤
broker的启动命令是:connect-standalone.bat。
文件所在路径:KAFKA_HOME/bin/windows
所需配置文件:connect-standalone.properties。
文件所在路径:KAFKA_HOME/config
同样打开黑窗口,运行以下指令:
connect-standalone.bat ../../config/connect-standalone.properties
当出现如上字样时,就说明:kafka connect服务启动成功了。
访问地址:
http://localhost:8083/
3.6.1是kafka服务器的版本号。
linux操作步骤
broker的启动命令是:connect-standalone.sh。
文件所在路径:KAFKA_HOME/bin
所需配置文件:connect-standalone.properties。
文件所在路径:KAFKA_HOME/config
5.便捷启动
写在最后
哪位大佬如若发现文章存在纰漏之处或需要补充更多内容,欢迎留言!!!