利用kafka自带的zookeeper搭建kafka集群
搭建kafka集群是需要zookeeper的,可是kafka自身就已经带了一个zookeeper,所以不需要额外搭建zookeeper的集群,只需要将kafka自带的zookeeper配置成一个集群就可以。
目录
1、kafka的下载和安装
2、配置zookeeper
3、配置kafka
4、启动zookeeper集群
5、启动kafka集群
1、kafka的下载和安装
本次安装采用的kafka版本是2.3.0。
首先,下载安装包。
从kafka的官方网站上找到相应的下载链接之后,用wget下载(wget https://archive.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz),之后得到一个安装包kafka_2.12-2.3.0.tgz。
其次,安装。
预计将kafka安装在/usr/local/目录下,所以将下载的安装包解压到/usr/local/目录下(tar -xzf kafka_2.12-2.3.0.tgz -C /usr/local),解压后得到一个名为kafka_2.12-2.3.0的文件夹,这就是kafka的安装目录了,为了进入方便给安装目录做一个软连接ln -s kafka_2.12-2.3.0 kafka,这样以后cd kafka的时候,实际上就是进入了kafka安装目录kafka_2.12-2.3.0中。
第三,安装集群规划。
预计安装一个3节点的集群,3个节点的ip如下:
172.17.2.136
172.17.2.138
172.17.2.139
2、配置zookeeper
kafka自带的zookeeper的配置文件是kafka安装目录下config/zookeeper.properties。
第一步,建数据目录和日志目录。
为了配置zookeeper,要提前为zookeeper建好数据目录和日志目录。
cd /usr/local/kafka
mkdir -p zookeeper/data zookeeper/log
第二步,在配置文件中写入配置项。
要配置哪些配置项是提前考虑好的,如下所示:
# the directory where the snapshot is stored.
dataDir=/usr/local/kafka/zookeeper/data
#修改为自定义的zookeeper日志目录
dataLogDir=/usr/local/kafka/zookeeper/log
# the port at which the clients will connect
clientPort=2181
#注释掉
#maxClientCnxns=0
#设置连接参数,添加如下配置
#为zk的基本时间单元,毫秒
tickTime=2000
#Leader-Follower初始通信时限 tickTime*10
initLimit=10
#Leader-Follower同步通信时限 tickTime*5
syncLimit=5
#设置broker Id的服务地址
server.0=172.17.2.136:2888:3888
server.1=172.17.2.138:2888:3888
server.2=172.17.2.139:2888:3888
第三步,建myid文件
在zookeeper的数据目录/usr/local/zookeeper/data下,建一个文本文件myid,内容为每个zookeeper节点的编号。因为是3个节点kafka集群,所以zookeeper集群也是3个节点,他们的编号分别是0、1、2.
经过以上3个步骤,一个节点的zookeeper就配置完成了。
3、配置kafka
kafka的配置文件是config/server.properties。
第一步,创建kafka数据日志目录。注意这里虽然叫日志目录,可不是真的kafka运行日志存放的目录,耳熟kafka中的消息数据存放的目录,因为kafka中消息是以write append log的形式存放的,所以这里的日志目录实际是数据目录,但是数据又的确是以日志的形式存放,所以就叫数据日志目录。
# 创建kafka的数据日志目录,命名为kfkwalog
mkdir -p /usr/local/kafka/kfkwalog
第二步,写入配置项。
kafka的配置项很多,需要更改的配置项如下:
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://172.17.2.136:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://172.17.2.136:9092
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/kafka/kfkwalog
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=172.17.2.136:2181,172.17.2.138:2181,172.17.2.139:2181
经过以上3步骤,136一个kafka节点配置好了,然后将kafka安装目录打包,之后scp到138、139两个节点,同时在138和139两个节点做对应的修改,主要包括:
(1)是创建zookeeper的数据目录zookeeper/data和日志目录zookeeper/log
(2)创建zookeeper数据目录下的myid文件,138下myid内容是1,138下myid的内容是2。这样以后,136的myid是0,138的myid是2,139的myid是2。
(3)创建kafka的数据日志目录kfkwalog
(4)修改kafka配置文件中的broker.id,136的broker.id是0,138的broker.id是1,139的broker.id是2
(5)修改kafka配置文件中的listeners和advertised.listeners,将其中的ip修改为对应的节点ip。
就完成了kafka集群和zookeeper集群的配置工作了。
4、启动zookeeper集群
三个节点依次启动,首先启动136上的zookeeper,然后启动138、139的。
首先,启动采用如下的命令启动zookeeper。
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper/zookeeper.log 2>1 &
然后,查看zookeeper是否启动成功。
[root@kfkby ~]# jps -l
130374 sun.tools.jps.Jps
111887 kafka.Kafka
47439 org.apache.zookeeper.server.quorum.QuorumPeerMain #如果jps有这行输出,说明zookeeper已经正常启动起来了
5、启动kafka集群
启动kafka和查看kafka启动是否成功的命令如下:
nohup bin/kafka-server-start.sh config/server.properties > logs/kafka/kafka.log 2>1 &
[root@kfkby ~]# jps -l
130374 sun.tools.jps.Jps
111887 kafka.Kafka # 如果jps有这一行输出,说明kafka已经正常启动起来了
47439 org.apache.zookeeper.server.quorum.QuorumPeerMain
用下面的命令测试kafka是否正常启动了:
[root@THQ-DEV-5 bin]# ./kafka-topics.sh --bootstrap-server 192.168.0.10:9092 --list
EDGE_QY_VEH_INFO
V2x_V1_Screen_Config_Down
__consumer_offsets
edge-qyobu-up
test
v2x_v1_obu_cmd_down
v2x_v1_obu_original_up
[root@THQ-DEV-5 bin]#
四、测试Kafka集群
1、创建topic:test
/home/erp/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181 --replication-factor 1 --partitions 1 --topic test
2、列出已创建的topic列表
/home/erp/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
3、模拟客户端去发送消息
/home/erp/kafka/bin/kafka-console-producer.sh --broker-list 192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092 --topic test
4、模拟客户端去接受消息
/home/erp/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test
标签:--,zookeeper,kafka,集群,172.17,自带,目录 From: https://www.cnblogs.com/iancloud/p/16810510.html