消息队列kafka
应用场景
削峰填谷 异步解耦 顺序收发 分布式事务一致性 大数据分析 分布式缓存同步 蓄流压测
kafka优点
高吞吐量
可扩展
永久存储
高可用性
副本和分区
replication:副本
partition:分区,分区不要超过节点数
分区的优势:
1、实现存储空间的横向扩容,将多个kafka服务器的空间结合利用
2、提升性能,多服务器读写
3、实现高可用
kafka分区、副本有什么作用
分区:负载均衡,个数建议和节点数量相同
副本:高可用 至少两个副本
kafka写入消息的流程
1、生产者先从集群获取分区的leader
2、生产者将消息发送给leader
3、leader将消息写入本地文件
4、follower从leader pull消息
5、follower将消息写入本地后向leader反馈写入成功
6、leader只要接收到一个follower的反馈之后就向生产者反馈写入成功
kafka集群部署
环境准备zookeeper
#在三个Ubuntu20.04节点提前部署zookeeper和kafka三个节点复用
10.0.0.100
10.0.0.101
10.0.0.102
#注意:生产中zookeeper和kafka一般是分开独立部署的,kafka安装前需要安装java环境
确保三个节点的zookeeper启动
[root@node0 ~]#zkServer.sh status
各节点部署 Kafka
kafka下载链接:
http://kafka.apache.org/downloads
Kafka 节点配置
配置文件说明
配置文件
vim /usr/local/kafka/config/server.properties
broker.id=1 #每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的 meta.properties文件
listeners=PLAINTEXT://10.0.0.100:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
log.dirs=/usr/local/kafka/data #kakfa用于保存数据的目录,所有的消息都会存储在该目录当中
num.partitions=1 #设置创建新的topic时默认分区数量,建议和kafka的节点数量一致
default.replication.factor=2 #指定默认的副本数为3,可改为2,可以实现故障的自动转移
log.retention.hours=168 #设置kafka中消息保留时间,默认为168小时即7天
zookeeper.connect=10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181
#指定连接的zk的地址,zk中存储了broker的元数据信息
zookeeper.connection.timeout.ms=6000 #设置连接zookeeper的超时时间,单位为ms,默认6秒钟
部署范例:
第一步:所有节点
#在所有节点上执行安装java(如果和zookeeper复用,则不用再下载)
[root@node0 ~]#apt install openjdk-8-jdk -y
#国内镜像下载
[root@node0 ~]#wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.3.1/kafka_2.13-3.3.1.tgz
#解压缩设值软链接
[root@node0 ~]#tar xf kafka_2.13-2.7.0.tgz -C /usr/local/
[root@node0 ~]#ln -s /usr/local/kafka_2.13-2.7.0/ /usr/local/kafka
#配置PATH变量
[root@node0 ~]#echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node0 ~]#. /etc/profile.d/kafka.sh
#修改配置文件
[root@node0 ~]#vim /usr/local/kafka/config/server.properties
broker.id=1 #每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的 meta.properties文件
listeners=PLAINTEXT://10.0.0.100:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
log.dirs=/usr/local/kafka/data #kakfa用于保存数据的目录,所有的消息都会存储在该目录当中
num.partitions=1 #设置创建新的topic时默认分区数量,建议和kafka的节点数量一致
default.replication.factor=2 #指定默认的副本数为3,可改为2,可以实现故障的自动转移
log.retention.hours=168 #设置kafka中消息保留时间,默认为168小时即7天
zookeeper.connect=10.0.0.100:2181,10.0.0.101:2181,10.0.0.102:2181
#指定连接的zk的地址,zk中存储了broker的元数据信息
zookeeper.connection.timeout.ms=6000 #设置连接zookeeper的超时时间,单位为ms,默认6秒钟
#准备数据目录
[root@node0 ~]#mkdir /usr/local/kafka/data
第二步:修改第二个节点配置
[root@node1 ~]#vim /usr/local/kafka/config/server.properties
broker.id=2 #每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的 meta.properties文件
listeners=PLAINTEXT://10.0.0.101:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
第三步:修改第三个节点配置
[root@node2 ~]#vim /usr/local/kafka/config/server.properties
broker.id=3 #每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的 meta.properties文件
listeners=PLAINTEXT://10.0.0.103:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
第四步:启动服务
在所有kafka节点执行下面操作
vim /usr/local/kafka/bin/kafka-server-start.sh
if[ " x$KAFKA_HEAP_OPTS"="x"] ; then
export KAFKA_HEAP_OPTS=" -Xmx1G-Xms1G" #可以调整内存
fi
调用脚本启动
kafka-server-start.sh -daemon
/usr/local/kafka/config/server.properties
第五步:准备Kafka的service文件
cat /lib/systemd/system/kafka.service
[unit]
Description=Apache kafka
After=network.target
[service] Type=simple
#Environment=JAVA_HOME=/data/server/java
PIDFile=/usr/local/kafka/kafka.pid
Execstart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server. properties
Execstop=/bin/kill -TERM ${MAINPID}
Restart=always
RestartSec=20
[Install]
wantedBy=multi-user.target
systemctl daemon-load
systemctl restart kafka.service
脚本一键部署kafka三台机器(包括zookeeper、java、kafak)
[root@ubuntu2004 ~]#cat install_kafka.sh
#!/bin/bash
#
KAFKA_VERSION=3.3.1
#KAFKA_VERSION=3.2.0
SCALA_VERSION=2.13
#KAFKA_VERSION=-3.0.0
KAFKA_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz"
#KAFKA_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.1/kafka_2.13-2.8.1.tgz"
#KAFKA_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz"
ZK_VERSOIN=3.6.3
ZK_URL="https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/apache-zookeeper-${ZK_VERSOIN}-bin.tar.gz"
ZK_INSTALL_DIR=/usr/local/zookeeper
KAFKA_INSTALL_DIR=/usr/local/kafka
NODE1=10.0.0.100
NODE2=10.0.0.101
NODE3=10.0.0.102
HOST=`hostname -I|awk '{print $1}'`
. /etc/os-release
color () {
RES_COL=60
MOVE_TO_COL="echo -en \\033[${RES_COL}G"
SETCOLOR_SUCCESS="echo -en \\033[1;32m"
SETCOLOR_FAILURE="echo -en \\033[1;31m"
SETCOLOR_WARNING="echo -en \\033[1;33m"
SETCOLOR_NORMAL="echo -en \E[0m"
echo -n "$1" && $MOVE_TO_COL
echo -n "["
if [ $2 = "success" -o $2 = "0" ] ;then
${SETCOLOR_SUCCESS}
echo -n $" OK "
elif [ $2 = "failure" -o $2 = "1" ] ;then
${SETCOLOR_FAILURE}
echo -n $"FAILED"
else
${SETCOLOR_WARNING}
echo -n $"WARNING"
fi
${SETCOLOR_NORMAL}
echo -n "]"
echo
}
install_jdk() {
if [ $ID = 'centos' -o $ID = 'rocky' ];then
yum -y install java-1.8.0-openjdk-devel || { color "安装JDK失败!" 1; exit 1; }
else
apt update
apt install openjdk-8-jdk -y || { color "安装JDK失败!" 1; exit 1; }
fi
java -version
}
zk_myid () {
read -p "请输入node编号(默认为 1): " MYID
if [ -z "$MYID" ] ;then
MYID=1
elif [[ ! "$MYID" =~ ^[0-9]+$ ]];then
color "请输入正确的node编号!" 1
exit
else
true
fi
}
install_zookeeper() {
wget -P /usr/local/src/ $ZK_URL || { color "下载失败!" 1 ;exit ; }
tar xf /usr/local/src/${ZK_URL##*/} -C `dirname ${ZK_INSTALL_DIR}`
ln -s /usr/local/apache-zookeeper-*-bin/ ${ZK_INSTALL_DIR}
echo 'PATH=${ZK_INSTALL_DIR}/bin:$PATH' > /etc/profile.d/zookeeper.sh
. /etc/profile.d/zookeeper.sh
mkdir -p ${ZK_INSTALL_DIR}/data
echo $MYID > ${ZK_INSTALL_DIR}/data/myid
cat > ${ZK_INSTALL_DIR}/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=${ZK_INSTALL_DIR}/data
clientPort=2181
maxClientCnxns=128
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
server.1=${NODE1}:2888:3888
server.2=${NODE2}:2888:3888
server.3=${NODE3}:2888:3888
EOF
cat > /lib/systemd/system/zookeeper.service <<EOF
[Unit]
Description=zookeeper.service
After=network.target
[Service]
Type=forking
#Environment=${ZK_INSTALL_DIR}
ExecStart=${ZK_INSTALL_DIR}/bin/zkServer.sh start
ExecStop=${ZK_INSTALL_DIR}/bin/zkServer.sh stop
ExecReload=${ZK_INSTALL_DIR}/bin/zkServer.sh restart
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable --now zookeeper.service
systemctl is-active zookeeper.service
if [ $? -eq 0 ] ;then
color "zookeeper 安装成功!" 0
else
color "zookeeper 安装失败!" 1
exit 1
fi
}
install_kafka(){
wget -P /usr/local/src $KAFKA_URL || { color "下载失败!" 1 ;exit ; }
tar xf /usr/local/src/${KAFKA_URL##*/} -C /usr/local/
ln -s ${KAFKA_INSTALL_DIR}_*/ ${KAFKA_INSTALL_DIR}
echo PATH=${KAFKA_INSTALL_DIR}/bin:'$PATH' > /etc/profile.d/kafka.sh
. /etc/profile.d/kafka.sh
cat > ${KAFKA_INSTALL_DIR}/config/server.properties <<EOF
broker.id=$MYID
listeners=PLAINTEXT://${HOST}:9092
log.dirs=${KAFKA_INSTALL_DIR}/data
num.partitions=1
log.retention.hours=168
zookeeper.connect=${NODE1}:2181,${NODE2}:2181,${NODE3}:2181
zookeeper.connection.timeout.ms=6000
EOF
mkdir ${KAFKA_INSTALL_DIR}/data
cat > /lib/systemd/system/kafka.service <<EOF
[Unit]
Description=Apache kafka
After=network.target
[Service]
Type=simple
#Environment=JAVA_HOME=/data/server/java
#PIDFile=${KAFKA_INSTALL_DIR}/kafka.pid
ExecStart=${KAFKA_INSTALL_DIR}/bin/kafka-server-start.sh ${KAFKA_INSTALL_DIR}/config/server.properties
ExecStop=/bin/kill -TERM \${MAINPID}
Restart=always
RestartSec=20
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable --now kafka.service
#kafka-server-start.sh -daemon ${KAFKA_INSTALL_DIR}/config/server.properties
systemctl is-active kafka.service
if [ $? -eq 0 ] ;then
color "kafka 安装成功!" 0
else
color "kafka 安装失败!" 1
exit 1
fi
}
zk_myid
install_jdk
install_zookeeper
install_kafka