1、拉取镜像
docker pull bitnami/zookeeper:latest
docker pull bitnami/kafka:latest
2、下载docker-compose
curl -L https://get.daocloud.io/docker/compose/releases/download/v2.14.2/docker-compose-`uname -s`-`uname -m` > /usr/local/bin/docker-compose
chmod +x /usr/local/bin/docker-compose
3、编写docker-compose文件
version: "3"
services:
zookeeper: #zk服务名称
image: 'bitnami/zookeeper:latest' #zk镜像名称
restart: 'always'
ports:
- '2181:2181' #映射端口
volumes:
- /mnt/ms/zookeeper/data:/bitnami/zookeeper/data #挂载数据到宿主机
environment:
- ALLOW_ANONYMOUS_LOGIN=yes #启用无密码访问功能
kafka: #服务名称
image: 'bitnami/kafka:latest' #kafka镜像
restart: 'always' #容器异常停止,自动重启
ports:
- '9092:9092' #映射端口
volumes:
- /mnt/ms/kafka/data:/bitnami/kafka #映射kafka消费数据和配置文件
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 #kafka链接的zookeeper地址
- ALLOW_PLAINTEXT_LISTENER=yes #允许使用PLAINTEXT侦听器
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT #监听器名称和安全协议的映射配置。每个监听器的名称只能在map中出现一次。
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 #kafka监听地址
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://192.168.1.1(kafka_ip):9092,EXTERNAL://localhost:9093 #提供外部调用
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT #用于配置broker之间通信使用的监听器名称
depends_on: #kafka运行依赖zookeeper,所以先启动zookeeper
- zookeeper
4、启动服务
启动:
docker-compose up -d
停止:
docker-compose down
检查状态:
docker-compose ps
5、备份zookeeper数据
date=`date +%Y%m%d`
cp -r /mnt/ms/zookeeper/data /mnt/ms/backup/zookeeper_${date}
cd /mnt/ms/zookeeper/data
tar -zcvf zookeeper_${date}.tar.gz zookeeper_${date} && rm -rf zookeeper_${date}
find /mnt/ms/backup/ -name "*.gz" -mtime +7 |xargs rm -rf
6、常用命令
进入kafka容器标签:compose,--,zookeeper,kafka,topic,mytest,docker From: https://blog.51cto.com/u_14458428/5965943
docker exec -it kafka-kafka-1 bash
找到kafka提供的脚本
cd /opt/bitnami/kafka/bin
创建tioic
#解释:创建1个副本3个分区的topic,命名为mytest
./kafka-topics.sh --zookeeper master:2181 --create --topic mytest --replication-factor 1 --partitions 3
查看topic分区
./kafka-topics.sh --zookeeper master:2181 --describe --topic mytest
Topic:mytest PartitionCount:3 ReplicationFactor:1 Configs:
Topic: mytest Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: mytest Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: mytest Partition: 2 Leader: 0 Replicas: 0 Isr: 0
实验需要打开两个窗口,一个启动生产者一个启动消费者,方便观察
模拟创建一个生产者(producer)来生成消息,创建一个消费者(consumer)去消费消息
创建生产者
./kafka-console-producer.sh --broker-list kafka:9092 --topic mytest
#创建消费者
#解释:生产者会把消息丢进topic,我们在topic:mytest中创建一个命名为group_mytes的消费组来消费数据
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic mytest --consumer-property group.id=group_mytes
#此时在生产者端输入消息,会在消费者端查看到对应消费内容。
#查看消息积压
./kafka-consumer-groups.sh --describe --bootstrap-server kafka:9092 --group group_mytes
Group Topic Pid Offset logSize Lag Owner
group_mytest mytest 0 3 3 0 none
group_mytest mytest 1 2 2 0 none
group_mytest mytest 2 3 3 0 none
#解释:logSize表示消息数量,Offset表示已经消费的消息,Lag代表积压的消息,消息出现积压可以同时调整分区数量和消费者数量来增加消费能力,其中分区数和消费者数量保持一至。