kafka的安装以及使用
kafka依赖jdk
和zookeeper
,jdk安装这里就不叙述了,从zookeeper开始介绍。
一、zookeeper安装及使用
1.单节点安装
1.1安装
tar -zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /soft/
ln -sv /soft/apache-zookeeper-3.8.0-bin /soft/zookeeper
cat >>/etc/profile.d/kafka.sh <<'EOF'
#!/bin/bash
export ZK_HOME=/soft/zookeeper
export PATH=$PATH:$ZK_HOME/bin
EOF
source /etc/profile.d/kafka.sh
1.2 修改配置文件
主要就修改一下数据存放路径dataDir
。
cp /soft/apache-zookeeper-3.8.0-bin/conf/{zoo_sample.cfg,zoo.cfg}
mkdir -p /soft/apache-zookeeper-3.8.0-bin/data
vim /soft/apache-zookeeper-3.8.0-bin/conf/zoo.cfg
...
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/soft/apache-zookeeper-3.8.0-bin/data
clientPort=2181
...
1.3 启动zookeeper
#启动
zkServer.sh start
#查看状态
zkServer.sh status
#关闭
zkServer.sh stop
2.zookeeper的znode的基本操作
2.1、查看:
查看“/”路径下的所有znode列表:ls /
查看“/kafka-3_2_0/cluster/id”znode的值:get /kafka-3_2_0/cluster/id
2.2、创建:
创建一个“/guojie-linux01”znode:create /guojie-linux01
多级zonde创建时,其⽗znode必须存在:create /guojie-linux01/语言/python #错误示范
创建znode时可以指定数据:create /guojie-linux01/语言/python 好难
2.3、修改:
修改⼀个znode的值:set /guojie-linux01/语言/python 好像没那么难
2.4、删除:
删除空的znode:delete /guojie-linux01/语言/python
删除非空的znode,即递归删除:deleteall /guojie-linux01
二.kafka部署
2.1 单节点部署
tar -zxvf kafka_2.12-3.2.0.tgz -C /soft/
ln -sv /soft/kafka_2.12-3.2.0 /soft/kafka
cat >>/etc/profile.d/kafka.sh <<'EOF'
export KAFKA_HOME=/soft/kafka
export PATH=$PATH:$KAFKA_HOME/bin
EOF
source /etc/profile.d/kafka.sh
2.2修改配置文件
vim /soft/kafka/config/server.properties
...
#broker.id集群里要唯一
broker.id=13
#zookeeper连接地址,在后面加版本号或者其它用于区分
zookeeper.connect=elk3:2181/kafka-3_2_0
...
2.3kafka启动
#启动
kafka-server-start.sh -daemon /soft/kafka/config/server.properties
#连接zookeeper查看
zkCli.sh
ls /
#停止
kafka-server-stop.sh
三.kafka集群部署
3.1 参考单节点部署,做以下调整
install -d /soft/kafka_2.12-3.2.0/data
3.2 所有节点配置
vim /soft/kafka/config/server.properties
...
#broker.id集群里要唯一,每个节点都要不一样
broker.id=13
#修改数据存放路径
log.dirs=/soft/kafka_2.12-3.2.0/data
#zookeeper连接地址,在后面加版本号或者其它用于区分
zookeeper.connect=elk3:2181/kafka-3_2_0
...
3.3 所有节点启动kafka
#zookeeper节点要启动
zkServer.sh start
#所有节点启动kafka
kafka-server-start.sh -daemon /soft/kafka/config/server.properties
3.4zookeeper节点验证
zkCli.sh ls /kafka-3_2_0/brokers/ids | grep "^\["
四.kafka的基本应用
4.1 kafka的topic管理
4.1.1 创建topic
(1)创建一个名为“my-topic1”的topic,分区数量为3,副本数为3
kafka-topics.sh --bootstrap-server elk1:9092 --create --topic my-topic1 --partitions 3 --replication-factor 3
(2)创建一个名为“my-topic2”的topic,分区数量为20,副本数量为3
kafka-topics.sh --bootstrap-server elk1:9092 --create --topic my-topic2 --partitions 20 --replication-factor 3
温馨提示:
1)topic已存在则无法创建;
2)副本的数量是1~32767
3)副本数量 >= broker数量
4.1.2 列出kafka集群可用的topic
kafka-topics.sh --bootstrap-server elk3:9092 --list
4.1.3 查看所有topic的详细信息
#查看所有topic的详细信息
kafka-topics.sh --bootstrap-server elk1:9092 --describe
#查看单个topic的详细信息
kafka-topics.sh --bootstrap-server elk1:9092 --describe --topic my-topic1
4.1.4 修改分区数量
#修改主分区数量
kafka-topics.sh --bootstrap-server elk1:9092 --topic my-topic1 --alter --partitions 5
温馨提示:
1)分区数量只能调大,无法调小;
2)"--alter"和"--replication-factor"无法同时使用,简而言之,就是不能直接修改副本;
3)分区数量重平衡及修改副本数量,比较复杂,不在记录,有兴趣查阅资料。
4.1.5 删除topic
#删除单个topic
kafka-topics.sh --bootstrap-server elk1:9092 --delete --topic my-topic1
#删除多个topic,使用逗号隔开即可
kafka-topics.sh --bootstrap-server elk1:9092 --delete --topic my-topic1,my-topic2
4.2 消费者组
4.2.1 启动一个消费者和生产者
#启动生产者
kafka-console-producer.sh --bootstrap-server elk1:9092 --topic my-topic1
#启动消费者1
kafka-console-consumer.sh --bootstrap-server elk1:9092 --topic my-topic1
#启动消费者2,加--from-beginning选项表示从头开始消费
kafka-console-consumer.sh --bootstrap-server elk1:9092 --topic my-topic1 --from-beginning
4.2.2 查看消费者组信息
#查看所有消费者组列表
kafka-consumer-groups.sh --bootstrap-server elk1:9092 --list
#查看所有消费者组的详细信息
kafka-consumer-groups.sh --bootstrap-server elk1:9092 --describe --all-groups
#查看某个消费者组的详细信息
kafka-consumer-groups.sh --bootstrap-server elk1:9092 --describe --group console-consumer-60340
4.2.3 消费者组案例
#启动一个生产者
kafka-console-producer.sh --bootstrap-server elk1:9092 --topic my-topic1
#启动消费者1并指定消费者组
kafka-console-consumer.sh --bootstrap-server elk1:9092 --topic my-topic1 --from-beginning --consumer-property group.id="group1"
#启动消费者2并指定消费者组
kafka-console-consumer.sh --bootstrap-server elk1:9092 --topic my-topic1 --from-beginning --consumer-property group.id="group1"
温馨提示:
1)同一个消费者组(consumer group)的消费者(consumer)不能同时去同一个分区(parition)读取数据,避免数据重复消费;
2)当一个topic的分区数量增大时,消费者组的各个消费者将重新分配(rebalance),即重新分配待消费分区的所属权;
3)当同一个消费者组的消费者数量发生变化时,也会触发rebalance,即重新分配待消费者的所属权;
五、kafka性能测试(拓展)
5.1 kafka开源监控组件-kafka-eagle
5.1.1 安装mariadb
#使用yum安装
yum -y install mariadb-server
#启动并设置开机自启动
systemctl enable mariadb --now
#创建数据库
create database if not exists `kafka` default character set utf8mb4 collate
utf8mb4_unicode_ci;
#创建用户并授权
create user 'admin'@'localhost' identified by '123456';
grant all privileges on kafka.* to 'admin'@'localhost';
flush privileges;
#测试
mysql -uadmin -p123456
5.1.2 安装kafka-eagle
tar -zxvf kafka-eagle-bin-2.0.8.tar.gz -C /soft/
tar -zxvf /soft/kafka-eagle-bin-2.0.8/efak-web-2.0.8-bin.tar.gz
cat >>/etc/profile.d/kafka.sh<<'EOF'
export EFAK_HOME=/soft/kafka-eagle-bin-2.0.8/efak-web-2.0.8
export PATH=$PATH:$EFAK_HOME/bin
EOF
source /etc/profile.d/kafka.sh
修改配置文件
vim /soft/kafka-eagle-bin-2.0.8/efak-web-2.0.8/conf/system-config.properties
5.1.3 修改kafka脚本开启JMX
关闭kafka之后修改
vim /soft/kafka/bin/kafka-run-class.sh
找到# JMX port to use将其注释并添加如下内容
# JMX port to use
#if [ $JMX_PORT ]; then
# KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT "
#fi
JMX_PORT=9999
JMX_RMI_PORT=9998
ISKAFKASERVER="false"
if [[ "$*" =~ "kafka.Kafka" ]]; then
ISKAFKASERVER="true"
fi
if [ $JMX_PORT ] && [ "true" == "$ISKAFKASERVER" ]; then
KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT -Dcom.sun.management.jmxremote.rmi.port=$JMX_RMI_PORT "
echo set KAFKA_JMX_PORT:$KAFKA_JMX_OPTS
fi
5.1.4 启动并访问
ke.sh start
标签:教程,--,server,topic,sh,kafka,soft
From: https://www.cnblogs.com/luguojie/p/18627203