一、集群搭建(kafka使用自带的zookeeper) 前提:必须要有java环境 1、下载地址: http://kafka.apache.org/downloads
2、安装目录 /app/kafka tar -zxvf kafka_2.12-3.6.1.tgz 建立数据和日志存储目录 mkdir -p /app/kafka/data/zookeeper/ mkdir -p /app/kafka/data/kafka/ mkdir -p /app/kafka/log/zookeeper/ mkdir -p /app/kafka/log/kafka/ cd /app/kafka/kafka-3.6.1-src vim config/zookeeper.properties 配置如下:dataDir=/app/kafka/data/zookeeper
dataLogDir=/app/kafka/log/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=100
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
tickTime=2000
initLimit=10
syncLimit=5
server.1=192.168.43.145:12888:13888
server.2=192.168.43.146:12888:13888
server.3=192.168.43.121:12888:13888
vim config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.43.145:9092 #另外两个节点修改成对应的ip
log.dirs=/app/kafka/log/kafka
zookeeper.connect=192.168.43.145:2181,192.168.43.146:2181,192.168.43.121:2181
所有节点启动服务
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
./kafka-server-start.sh -daemon ../config/server.properties
查看是否有报错,如果有报错,把防火墙关闭后,再启动
systemctl stop firewalld
systemctl start firewalld
systemctl status firewalld
验证:
sh ./zookeeper-shell.sh 192.168.43.121:2181
ls /brokers/ids
看到
【1,2,3】
说明集群构建成功
二、数据迁移和同步(mirror-maker mm2)
vim ../config/connect-mirror-maker.properties
clusters = A, B #集群名称,代号
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092 #源集群
B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092 #目标集群
# enable and configure individual replication flows
A->B.enabled = true #同步方向 A集群->B集群
# regex which defines which topics gets replicated. For eg "foo-.*"
A->B.topics = .* #同步主题过滤
#B->A.enabled = true #建议注释掉 双向实时同步可能会导致cup,内存爆掉
#B->A.topics = .* #建议注释掉 双向实时同步可能会导致cup,内存爆掉
#同步协议,不加默认同步的主题会带上 集群前缀例如 A.topic,B.topic
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy
# Setting replication factor of newly created remote topics
#两个集群保持一致
replication.factor=1
启动命令:
./connect-mirror-maker.sh -daemon ../config/connect-mirror-maker.propertie
验证:
查看主题列表:
./kafka-topics.sh --bootstrap-server 192.168.43.145:9092 --list
创建主题test
./kafka-topics.sh --bootstrap-server 192.168.43.145:9092 --create --topic test
#最大位移
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.43.146:9092 --topic test --time -1
#最早位移
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.43.146:9092 --topic test --time -2
标签:--,app,zookeeper,192.168,kafka,集群,9092,双活
From: https://www.cnblogs.com/yebuzhiqiu/p/17887184.html