kafka局部调优
kafka常用命令
cd /opt/kafka/kafka/bin/
##启动ZK
./zookeeper-server-start.sh -daemon /opt/kafka/kafka/config/zookeeper.properties
##启动kafka
./kafka-server-start.sh -daemon /opt/kafka/kafka/config/server.properties
##删除topics
./kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic events
##创建topics 一个分区 单个副本
./kafka-topics.sh --create --topic events --bootstrap-server localhost:9092 --partitions 1 --replication-fator 3
##desc topics
./kafka-topics.sh --list --bootstrap-server localhost:9092
./kafka-topics.sh --describe --topic events --bootstrap-server localhost:9092
##分区数只能增加,不能减少
./kafka-topics.sh --alter --topic events --bootstrap-server localhost:9092 --partitions 1
##创建命令行生产者
./kafka-console-producer.sh --topic events --bootstrap-server localhost:9092
##打印topic 增量消费
./kafka-console-consumer.sh --topic events --from-beginning --bootstrap-server localhost:9092
硬件
服务器台数=2*(生产者峰值(20M/s)*副本数/100)+1
磁盘:kafka顺序读写(固态和机械差不多) 100g*2个副本*3天/0.7 =1T
内存:kafka内存=堆内存(10-15g)+页缓存(segment=1g 默认)
(分区数*1g*25%)/服务器台数
jmap -heap 2321 查看内存堆栈信息
CPU:num.io.threads=8(总cpu 50% 写磁盘的线程数)
num.replica.fetchers=1(1/3*总cpu50% 副本拉取线程数 )
num.network.threads=3(2/3*总cpu50% 数据传输线程数 )
总CPU32建议kafka设置24个给系统预留8个
生产者参数
read-only(必须重启) per-broker(动态针对单个broker节点)
cluster-wide(动态针对集群的节点)
val ioproperties = new Properties()
//批次大小
ioproperties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384")
ioproperties.put(ProducerConfig.LINGER_MS_CONFIG,"70")//linger.ms
//压缩 gzip snappy lz4 zstd
ioproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy")
//缓冲区大小
ioproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432")
//数据可靠性分区副本>=2
ioproperties.put(ProducerConfig.ACKS_CONFIG,"-1")
//重试次数为Int最大值 设置小一点
ioproperties.put(ProducerConfig.RETRIES_CONFIG,"5")
//幂等性(去除数据重复)
ioproperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")
boker
replica.lag.time.max.ms=30s Leader未收到Follower的消息时间
auto.leader.rebalance.enable=false 自动平衡,建议关闭
leader.imbalance.per.broker.percentage=10% //不平衡的比率 一般不开启
leader.imbalance.check.interval.seconds=300s 负载均衡检查时间
log.segment.bytes=1g log日志大小
log.index.interval.bytes=4K 每4k大小数据创建一个索引
log.retention.hours 默认7天 log.retention.check.interval.ms 检查周期5分钟
//delete策略和compact策略
log.cleanup.policy=delete 最大时间戳作为该文件的最大时间戳
log.cleanup.policy=compact
//对于相同key的不同value,只保留最后一个版本。(适用于用户信息等)
num.io.threads=8 //写磁盘的线程数
num.replica.fetchers=1 //副本拉取线程数
num.network.threads //数据传输线程数
log.flush.interval.message //强制页缓存刷写到磁盘的条数
log.flush.interval.ms //每隔多久刷写一次
消费者
enable.auto.commit=true //自动提交偏移量
auto.commit.interval.ms=5s //提交偏移量的频率
auto.offset.reset=5s earliest:自动将偏移量重置为最早的偏移量 latest:最新的偏移量
none:如果未找到先前偏移量,则消费者抛出异常
offsets.topic.num.partitions=50 //默认50个分区
heartbeat.interval.ms=3s //默认心跳时间
session.timeout.ms=45s //消费者和coordinator 之间的超时时间
max.poll.interval.ms=5m //消费者处理消息的最大时长
fetch.max.bytes=52428800(50m)//消费者获取服务器段数据最大字节数
max.poll.records=500 //一次消费拉取最大条数
kafka总体调优
吞吐量
生产者吞吐:
ioproperties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384")//批次大小
ioproperties.put(ProducerConfig.LINGER_MS_CONFIG,"70")//linger.ms
//压缩 gzip snappy lz4
zstdioproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy")
ioproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432")//缓冲区大小
增加分区
消费者吞吐:
fetch.max.bytes=52428800(50m)//消费者获取服务器段数据最大字节数
max.poll.records=500 //一次消费拉取最大条数
精确一次
生产者:ack=-1、幂等性+事务
broker:分区副本数>=2 ISR里应答>=2
消费者:事务+手动offset 消费者输出必须支持事务
设置分区
(1)创建一个分区压测。 目标吞吐量/min(生产者,消费者)(一般3-10个)
单条日志过大(1M以上)
message.max.bytes=1m //单条日志最大值
max.request.size=1m //生产者往broker请求的最大值。针对topic级别设置的消息体大小
replica.fetch.max.bytes=1m //副本同步数据,每批次最大值
fetch.max.bytes=50m //消费者获取服务器端一批数据最大值
压测
生产者压测脚本;kafka-producer-perf-test.sh
./kafka-producer-perf-test.sh --topic events --record-size 1024
--num-records 1000000 --throughput 10000
--producer-props bootstrap-server=localhost:9092
batch.size=16384 linger.ms=0 compression.type=snappy
buffer.memory=67108864
##throughput:每秒多少条,-1不限流
消费者压测脚本;kafka-consumer-perf-test.sh
./kafka-consumer-perf-test.sh --topic events
bootstrap-server=localhost:9092
--consumer-props max.poll.records=500
fetch.max.bytes=50m --messages 100000
##messages:需要消费的条数
标签:ProducerConfig,--,max,sh,server,调优,kafka
From: https://www.cnblogs.com/wuxiaolong4/p/16800575.html