首页 > 其他分享 >Kafka生产调优实践。Kafka消息安全性、消息丢失、消息积压、保证消息顺序性

Kafka生产调优实践。Kafka消息安全性、消息丢失、消息积压、保证消息顺序性

时间:2024-08-10 14:26:19浏览次数:15  
标签:latency kafka records 调优 消息 ms Kafka

文章目录

搭建Kafka监控平台

官网地址

在这里插入图片描述



下载efak-web-3.0.2-bin.tar.gz安装包之后,efak需要依赖JDK和数据库。数据库支持本地化的SQLLite以及集中式的MySQL。生产环境建议使用MySQL。

数据库不需要建表初始化,EFAK在执行过程中会自己完成初始化。



将efak压缩包解压

[root@worker1 ~]# tar -zxvf efak-web-3.0.2-bin.tar.gz -C /app/kafka/eagle



修改efak解压目录下的conf/system-config.properties。 这个文件中提供了完整的配置,下面只列出需要修改的部分。

######################################
# multi zookeeper & kafka cluster list
# Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead
######################################
# 指向Zookeeper地址
efak.zk.cluster.alias=cluster1
cluster1.zk.list=worker1:2181,worker2:2181,worker3:2181

######################################
# zookeeper enable acl
######################################
# Zookeeper权限控制
cluster1.zk.acl.enable=false
cluster1.zk.acl.schema=digest
#cluster1.zk.acl.username=test
#cluster1.zk.acl.password=test123

######################################
# kafka offset storage
######################################
# offset选择存在kafka中。
cluster1.efak.offset.storage=kafka
#cluster2.efak.offset.storage=zk

######################################
# kafka mysql jdbc driver address
######################################
#指向自己的MySQL服务。库需要提前创建
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://worker1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root



配置EFAK的环境变量

[root@worker1 ~]# vi ~/.bash_profile
export KE_HOME=/app/kafka/eagle/efak-web-3.0.2
PATH=$PATH:#KE_HOME/bin:$HOME/.local/bin:$HOME/bin


[root@worker1 ~]# source ~/.bash_profile



启动EFAK

配置完成后,先启动Zookeeper和Kafka服务,然后调用EFAK的bin目录下的ke.sh脚本启动服务

[root@worker1 bin]$ ./ke.sh start
-- 日志很长,看到以下内容表示服务启动成功
[2023-06-28 16:09:43] INFO: [Job done!]
Welcome to
    ______    ______    ___     __ __
   / ____/   / ____/   /   |   / //_/
  / __/     / /_      / /| |  / ,<   
 / /___    / __/     / ___ | / /| |  
/_____/   /_/       /_/  |_|/_/ |_|  
( Eagle For Apache Kafka® )

Version v3.0.2 -- Copyright 2016-2022
*******************************************************************
* EFAK Service has started success.
* Welcome, Now you can visit 'http://192.168.232.128:8048'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************



EFAK管理页面

接下来就可以访问EFAK的管理页面。http://192.168.232.128:8048。 默认的用户名是admin ,密码是123456

关于EFAK更多的使用方式,比如EFAK服务如何集群部署等,可以参考官方文档。



合理规划Kafka部署环境

机械硬盘kafka使用机械硬盘即可,可以不用给Kafka固态硬盘



内存,修改 bin/kafka-start-server.sh启动脚本,修改JVM启动参数中的内存,默认只申请了1G内存。

[oper@worker1 bin]$ cat kafka-server-start.sh 
......
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
......

对于主流的16核32G服务器,可以适当扩大Kafka的内存。例如:

export KAFKA_HEAP_OPTS="‐Xmx16G ‐Xms16G ‐Xmn10G ‐XX:MetaspaceSize=256M ‐XX:+UseG1GC ‐XX:MaxGCPauseMillis=50 ‐XX:G1HeapRegionSize=16M"



高性能网卡,Kafka本身的服务性能非常高,单机就可以支持百万级的TPS,在高流量冲击下,网络非常有可能优先成为性能瓶颈。对于Kafka服务器,建议配置高性能的网卡。成本允许的话,尽量选择千兆以上的网卡。



合理优化Kafka集群配置

合理配置partition分区数量

Kafka的单个Partition读写效率是非常高的,但是Kafka的Partition设计是非常碎片化。如果Partition文件过多,很容易严重影响Kafka的整体性能。

  • 尽量不要使用过多的Topic,通常不建议超过3个Topic。
  • 一个分区下的副本集数量不要设置过多 将副本数设置为2就可以了。

至于Partition的数量,最好根据业务情况灵活调整。partition数量设置多一些,可以一定程度增加Topic的吞吐量。但是过多的partition数量还是同样会带来partition索引的压力。

Kafka提供了一个生产者的性能压测脚本,可以用来衡量集群的整体性能。

# num-record表示要发送100000条压测消息,record-size表示每条消息大小1KB,throughput表示限流控制,设置为小于0表示不限流。
# properducer-props用来设置生产者的参数。
[oper@worker1 bin]$ ./kafka-producer-perf-test.sh --topic test --num-record 1000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=worker1:9092 acks=1
94846 records sent, 18969.2 records/sec (18.52 MB/sec), 1157.4 ms avg latency, 1581.0 ms max latency.
133740 records sent, 26748.0 records/sec (26.12 MB/sec), 1150.6 ms avg latency, 1312.0 ms max latency.
146760 records sent, 29346.1 records/sec (28.66 MB/sec), 1051.5 ms avg latency, 1164.0 ms max latency.
137400 records sent, 27480.0 records/sec (26.84 MB/sec), 1123.7 ms avg latency, 1182.0 ms max latency.
158700 records sent, 31740.0 records/sec (31.00 MB/sec), 972.1 ms avg latency, 1022.0 ms max latency.
158775 records sent, 31755.0 records/sec (31.01 MB/sec), 963.5 ms avg latency, 1055.0 ms max latency.
1000000 records sent, 28667.259123 records/sec (28.00 MB/sec), 1030.44 ms avg latency, 1581.00 ms max latency, 1002 ms 50th, 1231 ms 95th, 1440 ms 99th, 1563 ms 99.9th.



合理对数据进行压缩

在生产者的ProducerConfig中,有一个配置 COMPRESSION_TYPE_CONFIG,是用来对消息进行压缩的。

public static final String COMPRESSION_TYPE_CONFIG = "compression.type";

生产者配置了压缩策略后,会对生产的每个消息进行压缩,从而降低Producer到Broker的网络传输,也降低了Broker的数据存储压力。

所支持的几种压缩算法中,zstd算法具有最高的数据压缩比,但是吞吐量不高,lz4在吞吐量方面的优势比较明显。压缩消息必然增加CPU的消耗,如果CPU资源紧张,就不要压缩了。

关于数据压缩机制,可以在Broker的conf/server.properties文件中进行配置。正常情况下,Broker从Producer端接收到消息后不会对其进行任何修改,但是如果Broker端和Producer端指定了不同的压缩算法,就会产生很多异常的表现。

compression.type

Type:	string
Default:	producer
Valid Values:	[uncompressed, zstd, lz4, snappy, gzip, producer]



如果开启了消息压缩,那么在消费者端自然是要进行解压缩的。

在Kafka中,消息从Producer到Broker再到Consumer会一直携带消息的压缩方式,这样当Consumer读取到消息集合时,自然就知道了这些消息使用的是哪种压缩算法,也就可以自己进行解压了。但是这时要注意的是应用中使用的Kafka客户端版本和Kafka服务端版本是否匹配。



优化Kafka客户端使用方式

合理保证消息安全

消息生产者设置好发送者应答参数

  • 消息生产者的acks参数,可以设置为0、1、all或-1
  • Broker的min.insync.replicas参数。如果生产者的acks设置为-1或all,服务端并不是强行要求所有Paritition都完成写入再返回,而是可以配置多少个Partition完成消息写入后,再往Producer返回消息。



生产者端的幂等性配置

// 值为 true 或 false
public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";

如果要开启幂等性,那么

  • 生产者消息缓存机制中的 max.in.flight.requests.per.connection <=5
  • 重试次数retries>0
  • 应答机制中的acks必须为all

其他补充点:

  • 如果没有设置冲突配置,则默认启用幂等性。
  • 如果设置了冲突的配置,并且幂等性没有显式启用,则幂等性被禁用。
  • 即显示开启的幂等性,又有冲突配置,就抛异常



使用生产者事务机制发送消息

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 提交事务
void commitTransaction() throws ProducerFencedException;
// 4 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

SpringBoot集成Kafka时使用的KafkaTemplate就提供了事务消息机制

在这里插入图片描述



消费者端合理使用提交方式

如果消费者要使用异步方式进行业务处理,那么如果业务处理失败,此时消费者已经提交了Offset,这个消息就无法重试了,这就会造成消息丢失。

因此在消费者端,尽量不要使用异步处理方式,在绝大部分场景下,就能够通过Kafka的消费者重试机制,保证消息安全处理。此时,在消费者端,需要更多考虑的问题,就变成了消费重试机制造成的消息重复消费的问题。



消费者防止消息重复消费

问题的产生:消费者业务处理时间较长,此时消费者正常处理消息的过程中,Broker端就已经等不下去了,认为这个消费者处理失败了。这时就会往同组的其他消费者实例投递消息,这就造成了消息重复处理。这就给消费者端带来不必要的幂等性问题。

解决方式:

  • 根据业务ID校验。比如对于订单消息,消费者根据订单ID去确认一下这个订单消息有没有处理过。

  • 统一的方式处理幂等性问题

    将Offset放到Redis中自行进行管理。通过Redis中的offset来判断消息之前是否处理过。

    具体实现详情请参考上文《客户端 — 客户端属性分析 — 消息重复消费问题》



生产环境常见问题分析

消息零丢失方案

在这里插入图片描述



1、 生产者发送消息到Broker不丢失

生产者丢失消息的原因就和ack机制有关,如果将acks设置为1、all或-1 就表示接收到Broker成功将消息写入文件后的ack应答



2、Broker端保证消息不丢失

Broker端丢失消息的原因主要有两种:Leader partition重新选举、服务器非正常关机 pageCache中的消息还未刷盘

《服务端的Zookeeper元数据梳理 — Partition故障恢复机制》 《服务端日志 — 合理配置刷盘频率》

消息生产者将acks设置为all可以有效避免因为Leader partition故障导致的消息丢失;

而pageCache刷盘可以通过下面的参数来设定合理的配置

# 多长时间进行一次强制刷盘。默认是Long.MAX。
flush.ms

# 表示当同一个Partiton的消息条数积累到这个数量时,就会申请一次刷盘操作。默认是Long.MAX。
log.flush.interval.messages

#当一个消息在内存中保留的时间,达到这个数量时,就会申请一次刷盘操作。他的默认值是空。如果这个参数配置为空,则生效的是下一个参数。
log.flush.interval.ms

# 日志刷新程序检查是否需要将日志刷新到磁盘的频率(以毫秒为单位),默认也是Long.MAX。
log.flush.scheduler.interval.ms



3、消费者保证消息不丢失

同步处理业务逻辑,同步手动提交offset



消息积压如何处理

在这里插入图片描述

  • 如果业务运行正常,只是因为消费者处理消息过慢,造成消息加压。

    增加消费者个数,最多让一个消费者组下的消费者个数=Partition分区数,让一个Consumer负责一个分区,将消费进度提升到最大。

    如果还是不够,那就新创建一个topic,给这个topic更多的partition,然后启动一批消费者,将消息从旧topic中搬运到新topic中,这些消费者不处理业务消息,仅仅是搬运。这样就可以创建更多的消费者消费从新topic消费了

  • 如果是消费者业务处理异常导致的消息积压。

    使用一种降级方案。先启动一个Consumer将Topic下的消息先转发到其他队列中,然后再慢慢分析新队列里的消息处理问题。类似于死信队列的处理方式。



如何保证消息顺序

Kafka很难保证消息的顺序,因为Kafka设计的最优先重点是海量吞吐。

在这里插入图片描述



这个问题要分两个方面考虑:

  1. Topic下各个partition是并发处理消息的 ,producer要保证一组有序的消息发送到一个partition中
  2. consumer要从partition中顺序处理消息



问题一:producer要保证一组有序的消息发送到一个partition中

第一种简单粗暴的方式就只为Topic创建一个partition,这种方式放弃了多partition带来的高吞吐量。

第二种方式,自己定义一个分区路由机制,实现org.apache.kafka.clients.producer.Partitioner接口,将消息分配到同一个Partition上。

此时发送方保证了发送时的消息顺序性。但是存在网络问题,导致发送的一批消息中某一个消息丢失,这样消息到Broker时就还是没办法保证顺序性。Kafka是通过幂等性中单调递增的sequenceNumber来保证消息是顺序,因为是单调递增的,所以还能判断是否存在消息丢失一旦Kafka发现Producer传过来的SequenceNumber出现了跨越,那么就意味着中间有可能消息出现了丢失,就会往Producer抛出一个OutOfOrderSequenceException异常。



问题二:Partition中的消息有序后,如何保证Consumer的消费顺序是有序的

// 一次拉取消息的数量
public static final String FETCH_MAX_BYTES_CONFIG = "fetch.max.bytes";
Note that the consumer performs multiple fetches in parallel.
    
    
public static final int DEFAULT_FETCH_MAX_BYTES = 50 * 1024 * 1024;

从上方fetch.max.bytes参数的最后一个解释可以知道Consumer其实是每次并行的拉取多个Batch批次的消息进行处理的。所以在Kafka提供的客户端Consumer中,是没有办法直接保证消费的消息顺序。

我们能做的就是在Consumer的处理逻辑中,将消息进行排序。比如将消息按照业务独立性收集到一个集合中,然后在集合中对消息进行排序。

RocketMQ中提供了顺序消息的实现。他的实现原理是先锁定一个队列,消费完这一个队列后,才开始锁定下一个队列,并消费队列中的消息。再结合MessageQueue中的消息有序性,就能保证整体消息的消费顺序是有序的。

标签:latency,kafka,records,调优,消息,ms,Kafka
From: https://blog.csdn.net/qq_44027353/article/details/141090476

相关文章

  • Kafka服务端的各种机制实现原理
    文章目录zookeeper整体元数据ControllerBroker选举机制LeaderPartition选举机制LeaderPartition自动平衡机制Partition故障恢复机制HW一致性保障-Epoch更新机制zookeeper整体元数据Zookeeper客户端工具:prettyZoo下载地址我们知道kafka集群选举相关都是基于zook......
  • 单体应用提高性能及处理高并发-异步处理与消息队列
            在单体应用中,应对高并发和提升性能是开发者常面对的挑战。异步处理与消息队列是两个有效的手段,可以帮助开发者将耗时操作与主线程分离,减少阻塞,提高系统的响应速度和吞吐量。1.异步处理异步处理允许应用程序在执行耗时操作时不阻塞主线程。这对于提高系统性......
  • Linguistics-English-高频词辩析 + 常用句式:keep me posted(有消息通知我 )
    高频词辨析someVSany:some:肯定句式,ihavesomepen.caniborrowsomepen?any:疑问/否定句式idon'thaveanypen.doyouhaveanypen?fewVSafew,littleVSalittlefew+可数名词复数:很少(几乎没有),表"否定".afew+可数名词复......
  • JVM(Java Virtual Machine)性能调优
    JVM(JavaVirtualMachine)性能调优是优化Java应用程序性能的关键步骤,涉及多个方面的考虑和调整。以下是一个详尽的JVM性能调优指南,涵盖了主要的技术点、调优策略和具体步骤。一、JVM性能调优概述JVM性能调优的主要目标是提高Java应用程序的响应速度、吞吐量和稳定性,同时减......
  • Kafka整合SpringBoot
    前文Kafka客户端详解引入依赖<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target>......
  • 【IO】IPC通信机制函数(消息队列,共享内存,信号量集函数整理汇总)
            整理了一下IPC通信的函数,包括消息队列,共享内存,信号量集;信号量集的使用是在共享内存的基础上使用,函数太多啦,慢慢学吧cc,争取全部记住        其中在使用有关信号量集的函数的时候,进行简单的封装函数功能之后,再进行使用,会更加方便,在文章最后对信号量集的......
  • 基于 go-zero 框架的项目中集成 Kafka
    Kafka封装Kafka集成指南本文档描述了如何在基于go-zero框架的项目中集成Kafka。1.项目结构在项目中添加以下文件和目录:└──consts└──kafka.go└──pkg└──kafka├──consumer.go└──producer.go2.常量定义在consts......
  • KubeSphere 部署 Kafka 集群实战指南
    本文档将详细阐述如何利用Helm这一强大的工具,快速而高效地在K8s集群上安装并配置一个Kafka集群。实战服务器配置(架构1:1复刻小规模生产环境,配置略有不同)主机名IPCPU内存系统盘数据盘用途ksp-registry192.168.9.904840200Harbor镜像仓库ksp-co......
  • 11.面试题——消息队列RabbitMQ
    1.RabbitMQ是什么?特点是什么?RabbitMQ是一种开源的消息队列中间件,用于在应用程序之间进行可靠的消息传递。它实现了AMQP(AdvancedMessageQueuingProtocol)协议,提供了强大的消息处理能力。RabbitMQ的主要特点包括:可靠性:RabbitMQ使用可靠的消息传递机制,确保消息能够安全地传......
  • 【自动驾驶】自定义消息格式的话题通信(C++版本)
    目录新建消息文件更改包xml文件中的依赖关系更改cmakelist文件中的配置执行时依赖改变cmakelist编译顺序发布者程序调用者程序程序测试新建消息文件在功能包目录下,新建msg文件夹,下面新建mymsg.msg文件,其内容为stringnamefloat64value发布者包含该消息,生成头文......