首页 > 其他分享 >kafka(三)调优

kafka(三)调优

时间:2022-10-17 20:46:49浏览次数:55  
标签:ProducerConfig -- max sh server 调优 kafka

kafka局部调优

kafka常用命令
        cd /opt/kafka/kafka/bin/
        ##启动ZK
        ./zookeeper-server-start.sh -daemon /opt/kafka/kafka/config/zookeeper.properties  
        ##启动kafka
        ./kafka-server-start.sh -daemon /opt/kafka/kafka/config/server.properties

        ##删除topics
        ./kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic events
        ##创建topics  一个分区  单个副本
        ./kafka-topics.sh --create --topic events --bootstrap-server localhost:9092  --partitions 1 --replication-fator 3
        ##desc topics
        ./kafka-topics.sh --list --bootstrap-server localhost:9092
        ./kafka-topics.sh --describe --topic events --bootstrap-server localhost:9092
        ##分区数只能增加,不能减少
        ./kafka-topics.sh --alter --topic events --bootstrap-server localhost:9092 --partitions 1 

        ##创建命令行生产者
        ./kafka-console-producer.sh --topic events --bootstrap-server localhost:9092
        ##打印topic 增量消费
        ./kafka-console-consumer.sh --topic events --from-beginning --bootstrap-server localhost:9092
硬件
        服务器台数=2*(生产者峰值(20M/s)*副本数/100)+1
        磁盘:kafka顺序读写(固态和机械差不多) 100g*2个副本*3天/0.7 =1T
        内存:kafka内存=堆内存(10-15g)+页缓存(segment=1g 默认) 
            (分区数*1g*25%)/服务器台数
        jmap -heap 2321 查看内存堆栈信息
        CPU:num.io.threads=8(总cpu 50% 写磁盘的线程数)  
            num.replica.fetchers=1(1/3*总cpu50% 副本拉取线程数 )
            num.network.threads=3(2/3*总cpu50% 数据传输线程数 )  
            总CPU32建议kafka设置24个给系统预留8个
生产者参数
    read-only(必须重启) per-broker(动态针对单个broker节点) 
    cluster-wide(动态针对集群的节点)

        val ioproperties = new Properties()
        //批次大小
        ioproperties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384")
        ioproperties.put(ProducerConfig.LINGER_MS_CONFIG,"70")//linger.ms
        //压缩 gzip snappy lz4 zstd
        ioproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy")
        //缓冲区大小
        ioproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432")
        //数据可靠性分区副本>=2
        ioproperties.put(ProducerConfig.ACKS_CONFIG,"-1")
        //重试次数为Int最大值  设置小一点
        ioproperties.put(ProducerConfig.RETRIES_CONFIG,"5")
        //幂等性(去除数据重复)
        ioproperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,"true")
boker
        replica.lag.time.max.ms=30s Leader未收到Follower的消息时间
        auto.leader.rebalance.enable=false 自动平衡,建议关闭
        leader.imbalance.per.broker.percentage=10% //不平衡的比率   一般不开启
        leader.imbalance.check.interval.seconds=300s  负载均衡检查时间
        log.segment.bytes=1g log日志大小
        log.index.interval.bytes=4K 每4k大小数据创建一个索引
        log.retention.hours 默认7天    log.retention.check.interval.ms 检查周期5分钟
        //delete策略和compact策略
        log.cleanup.policy=delete  最大时间戳作为该文件的最大时间戳
        log.cleanup.policy=compact 
            //对于相同key的不同value,只保留最后一个版本。(适用于用户信息等)
        num.io.threads=8 //写磁盘的线程数
        num.replica.fetchers=1 //副本拉取线程数
        num.network.threads //数据传输线程数
        log.flush.interval.message //强制页缓存刷写到磁盘的条数
        log.flush.interval.ms      //每隔多久刷写一次
消费者
        enable.auto.commit=true //自动提交偏移量
        auto.commit.interval.ms=5s  //提交偏移量的频率
        auto.offset.reset=5s  earliest:自动将偏移量重置为最早的偏移量 latest:最新的偏移量
            none:如果未找到先前偏移量,则消费者抛出异常
        offsets.topic.num.partitions=50 //默认50个分区
        heartbeat.interval.ms=3s //默认心跳时间
        session.timeout.ms=45s //消费者和coordinator 之间的超时时间
        max.poll.interval.ms=5m //消费者处理消息的最大时长
        fetch.max.bytes=52428800(50m)//消费者获取服务器段数据最大字节数
        max.poll.records=500 //一次消费拉取最大条数

kafka总体调优

吞吐量
        生产者吞吐:
            ioproperties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384")//批次大小  
            ioproperties.put(ProducerConfig.LINGER_MS_CONFIG,"70")//linger.ms

            //压缩 gzip snappy lz4 
            zstdioproperties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy")
            ioproperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432")//缓冲区大小
        增加分区
        消费者吞吐:
            fetch.max.bytes=52428800(50m)//消费者获取服务器段数据最大字节数
            max.poll.records=500 //一次消费拉取最大条数
精确一次
        生产者:ack=-1、幂等性+事务
        broker:分区副本数>=2  ISR里应答>=2
        消费者:事务+手动offset  消费者输出必须支持事务
设置分区
        (1)创建一个分区压测。  目标吞吐量/min(生产者,消费者)(一般3-10个)
单条日志过大(1M以上)
        message.max.bytes=1m //单条日志最大值
        max.request.size=1m //生产者往broker请求的最大值。针对topic级别设置的消息体大小
        replica.fetch.max.bytes=1m //副本同步数据,每批次最大值
        fetch.max.bytes=50m //消费者获取服务器端一批数据最大值
压测
        生产者压测脚本;kafka-producer-perf-test.sh 
        ./kafka-producer-perf-test.sh --topic events  --record-size 1024 
            --num-records 1000000 --throughput 10000 
            --producer-props bootstrap-server=localhost:9092 
               batch.size=16384 linger.ms=0 compression.type=snappy 
               buffer.memory=67108864
        ##throughput:每秒多少条,-1不限流    

        消费者压测脚本;kafka-consumer-perf-test.sh 
        ./kafka-consumer-perf-test.sh --topic events 
            bootstrap-server=localhost:9092   
            --consumer-props max.poll.records=500 
                fetch.max.bytes=50m --messages 100000
        ##messages:需要消费的条数

标签:ProducerConfig,--,max,sh,server,调优,kafka
From: https://www.cnblogs.com/wuxiaolong4/p/16800575.html

相关文章

  • kafka理论
    kafka理论消息队列作用、模式    作用:1.消峰  2.解耦  3.异步通信      模式:1.点对点模式(删除对应的消息,只有一个消费者)      2.发布订阅模式(不......
  • Spark常规性能调优(一)
    1、常规性能调优一:最优资源配置Spark性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的 ,实现了最优的资源配置后,在此基础上再考......
  • 数仓性能调优:如何进行函数下推
    摘要:本文主要描述下函数在满足特征的前提下可以把函数属性定义为下推属性。本文分享自华为云社区《GaussDB(DWS)性能调优:函数下推》,作者:譡里个檔。DWS作为MPP架构的数仓......
  • KafkaConsumer实现多线程消费的一种实现思路——不考虑消息的顺序性问题基础
    背景介绍一种Kafka多线程消费的实现思路以及方案,此方案不考虑消息消费的顺序性问题,假定消息之间没有依赖关系。这个项目是公司里面开发有个SDK的所谓”飞行窗口“特性产......
  • Linux 下搭建 Kafka 环境
    Linux下搭建Kafka环境作者:Grey原文地址:博客园:Linux下搭建Kafka环境CSDN:Linux下搭建Kafka环境环境要求操作系统:CentOS7下载地址安装说明Kafka版本:2.11-......
  • kafka 知识点 笔记
    kafka知识点笔记使用kafka消息队列的好处:1)、解耦合不用保证两台客户端同时在线,发送端先发送消息暂时存储,接收端上线后可以自己再获取消息......
  • Kafka重启出错:Corrupt index found
    1、日志记录FATALFatalerrorduringKafkaServerStablestartup.Preparetoshutdown(kafka.server.KafkaServerStartable)java.lang.IllegalArgumentException:requir......
  • Keeping Multiple Databases in Sync Using Kafka Connect and CDC
    BRIJESHJAGGI SEP20,2022Microservicesarchitectureshavenowbeenwidelyadoptedamongdevelopers,andwithagreatdegreeofsuccess.However,......
  • Kafka暴力关闭导致日志被锁
    ERRORShutdownbrokerbecausealllogdirsind:\work\data\kafka-logshavefailed现象:在windows的命令行里启动kafka之后,当关闭命令行窗口时,就会强制关闭kafka。这种......
  • 备份、恢复与性能调优
    备份与恢复概述根据备份的方法将备份分为:HotBackup(热备)(在线备份)ColdBackup(冷备)(离线备份)WarmBackup(温备)热备是指数据库运行中直接备份,对正在运行的数据库操作没......