最近项目需要用到kafa进行数据流处理,下面将安装部署kafka的方法简单介绍下。
1:配置java环境
修改/etc/bashrc文件,添加JAVA_HOME
cat /etc/bashrc
export JAVA_HOME=/root/jdk-11.0.16.1
export PATH=JAVA_HOME/bin:.
2:下载Kafka
https://dlcdn.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
3:解压缩
tar -xzf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1
4:启动zookeeper
Cd zookeeper的目录,执行如下命令
$ bin/zookeeper-server-start.sh config/zookeeper.properties
5:单机版kafka启动
bin/kafka-server-start.sh config/server.properties
成功启动所有服务后,您将拥有一个基本的 Kafka 环境运行并可供使用。
6:配置kafka集群
cd config/
vi server.properties
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
############################# Socket Server Settings #############################
#The address the socket server listens on. If not configured, the host name will be equal to the value of
#java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#FORMAT:
#listeners = listener_name://host_name:port
#EXAMPLE:
#listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
#Listener name, hostname and port the broker will advertise to clients.
#If not set, it uses the value for “listeners”.
#advertised.listeners=PLAINTEXT://your.host.name:9092
#Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
#The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
#The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
#The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
#The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
#The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
#A comma separated list of directories under which to store log files
log.dirs=/home/kfk/kafka_2.13-3.3.1/logs
#The default number of log partitions per topic. More partitions allow greater
#parallelism for consumption, but this will also result in more files across
#the brokers.
num.partitions=1
#The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
#This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
#The replication factor for the group metadata internal topics “__consumer_offsets” and “__transaction_state”
#For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
#Messages are immediately written to the filesystem but by default we only fsync() to sync
#the OS cache lazily. The following configurations control the flush of data to disk.
#There are a few important trade-offs here:
#1. Durability: Unflushed data may be lost if you are not using replication.
#2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
#The settings below allow one to configure the flush policy to flush data after a period of time or
############################# Log Retention Policy #############################
#The following configurations control the disposal of log segments. The policy can
#be set to delete segments after a period of time, or after a given size has accumulated.
#A segment will be deleted whenever either of these criteria are met. Deletion always happens
#from the end of the log.
#The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
#A size-based retention policy for logs. Segments are pruned from the log unless the remaining
#segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
#The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824
#The interval at which log segments are checked to see if they can be deleted according
#to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
#Zookeeper connection string (see zookeeper docs for details).
#This is a comma separated host:port pairs, each corresponding to a zk
#server. e.g. “127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”.
#You can also append an optional chroot string to the urls to specify the
#root directory for all kafka znodes.
#配置连接Zookeeper集群地址
zookeeper.connect=bigdata-pro01:2181,bigdata-pro02:2181,bigdata-pro03:2181,bigdata-pro04:2181,bigdata-pro05:2181
#Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
#The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
#The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
#The default value for this is 3 seconds.
#We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
#However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
6.1 配置KAFKA_HOME变量
export KAFKA_HOME=/home/kfk/kafka_2.13-3.3.1
6.2 分发配置好的kafa包
tar cvf kafka_2.13-3.3.1.tar kafka_2.13-3.3.1
scp kafka_2.13-3.3.1.ta kfk@bigdata-pro02:/home/kfk/
scp kafka_2.13-3.3.1.tar kfk@bigdata-pro02:/home/kfk/
scp kafka_2.13-3.3.1.tar kfk@bigdata-pro03:/home/kfk/
scp kafka_2.13-3.3.1.tar kfk@bigdata-pro04:/home/kfk/
scp kafka_2.13-3.3.1.tar kfk@bigdata-pro05:/home/kfk/
6.2 在其他节点修改broker.id
将broker.id 顺序递增
6.3 启动集群
在集群的服务器中进入到kafka的安装目录执行如下命令:
bin/kafka-server-start.sh -daemon config/server.properties
这样就完成了kafa的启动
6.4 关闭集群
在集群服务器中进入到kafka的安装目录执行如下命令:
bin/kafka-server-stop.sh stop
就可以关闭kafka服务。
7:kafa基本操作
1)查看当前服务器中的所有topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper bigdata-pro01:2181 --list
2)创建topic
[kfk@bigdata-pro01 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
选项说明:
–topic 定义topic名
–replication-factor 定义副本数
–partitions 定义分区数
3)删除topic
[kfk@bigdata-pro01 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除。
4)发送消息
[kfk@bigdata-pro01 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
hello world
atguigu atguigu
5)消费消息
[kfk@bigdata-pro01 kafka]$ bin/kafka-console-consumer.sh
–zookeeper hadoop102:2181 --topic first
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh
–bootstrap-server hadoop102:9092 --topic first
[kfk@bigdata-pro01 kafka]$ bin/kafka-console-consumer.sh
–bootstrap-server hadoop102:9092 --from-beginning --topic first
–from-beginning:会把主题中以往所有的数据都读取出来。
6)查看某个Topic的详情
[akfk@bigdata-pro01 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
7)修改分区数
[kfk@bigdata-pro01 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6