首页 > 其他分享 >Kafka 之批量消息发送消费

Kafka 之批量消息发送消费

时间:2024-10-31 15:48:22浏览次数:7  
标签:10 27 批量 kafka 发送 消息 Kafka

前言:

前面我们分享了 Kafka 的一些基础知识,以及 Spring Boot 集成 Kafka 完成消息发送消费,本篇我们来分享一下 Kafka 的批量消息发送消费。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 消息批量发送

Kafka 没有提供批量发送消息的 API,Kafka 的方式是提供一个 RecordAccumulator 消息收集器,将发送给同一个 Topic 同一个 Partition 的消息先缓存起来,当其达到某些条件后,才会一次性的将消息提交给 Kafka Broker。

Kafka 消息的批量发送主要跟以下三个参数有关:

  • batch.size:批量发送消息的大小,默认 16KB,产生的消息达到这个数量后,即刻触发消息批量提交到 Kafka Broker。
  • buffer.memory:生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数,模式是 32 MB,如果超过这个数量,即刻触发消息批量提交到 Kafka Broker。
  • linger.ms:批量发送的的最大时间间隔,单位是毫秒,当达到配置的时间之后,会立刻触发消息批量提交大 Kafka Broker。

以上三个条件满足一个就会触发消息的批量提交。

官方文档传送门

Kafka 批量消息 参数配置

上面我们分析了 Kafka 没有提供批量发送的 API,而是使用了三个参数来控制批量发送的,换句话说,其实我们每次使用 Kafka 发送消息的时候都是批量发送,Kafka 批量发送消息的代码没有什么特殊之处,只需要对上面解释的三个参数进行按需配置即可,本案例的配置如下:

#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000

Kafka 批量消息 Producer 代码演示

Kafka 批量发送消息代码如下:

package com.order.service.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @ClassName: MyKafkaBatchProducer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description:
 */
@Slf4j
@Component
public class MyKafkaBatchProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void batchSendMessage() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(new Date());
        log.info("开始消息发送,当前时间:{}", dateStr);
        for (int a = 0; a < 1000; a++) {
            this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");
        }
        log.info("完成消息发送,当前时间:{}", dateStr);
    }

}

在 Kafka 发送完成消息后,我们记录了当前时间,这个时间是用来证明消息是被批量发送的。

Kafka 批量消息 Consumer 代码演示

Kafka 批量消息的代码也没有什么特殊之处,还是使用 @KafkaListener注解来监听消息,只不过参数变成了 List<ConsumerRecord<String, String>> 类型,然后我们在配置中配置了批量消费的模式,批量消费的配置如下:

#Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息
spring.kafka.listener.type = batch

Consumer 代码如下:

package com.order.service.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * @ClassName: MyKafkaBatchConsumer 
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description:
 */
@Slf4j
@Component
public class MyKafkaBatchConsumer {

    @KafkaListener(id = "my-kafka-consumer-01",
            groupId = "my-kafka-consumer-groupId-01",
            topics = "my-topic",
            containerFactory = "myContainerFactory",properties = {"max.poll.records:10"})
    public void listen(List<ConsumerRecord<String, String>> consumerRecords) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(new Date());
        log.info("my-kafka-consumer-groupId-01 消息消费成功,当前时间:{},消息size:{}", dateStr, consumerRecords.size());
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            String value = consumerRecord.value();
            log.info("消息内容:{}",value);
        }
    }

}

这里我们使用了 properties 这个属性配置,后面详细讲解。

** Kafka 批量消息验证**

触发消息发送消费结果如下:

2024-10-27 15:27:17.563  INFO 18320 --- [nio-8086-exec-2] c.o.s.k.producer.MyKafkaBatchProducer    : 完成消息发送,当前时间:2024-10-27 15:27:17
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:27:22,消息size:10
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第0条 kafka 消息
2024-10-27 15:27:22.569  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第1条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第2条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第3条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第4条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第5条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第6条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第7条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第8条 kafka 消息
2024-10-27 15:27:22.570  INFO 18320 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : 消息内容:第9条 kafka 消息

2024-10-27 15:27:17 完成消息发送,:2024-10-27 15:27:22 完成消息消费,时间间隔是 5秒,消息是 10 条,符合预期。

我们修改配置再次演示,将批量发送消息的时间间隔改为 10 秒,同时一次性发送 1000 条消息,是消息的总大小大于 1KB。

#批量发送消息的大小 默认 16KB 我们这里为了演示效果 配置为1Kb
spring.kafka.producer.batch-size = 1024
#生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
spring.kafka.producer.buffer-memory = 33554432
#批量发送的的最大时间间隔,单位是毫秒
spring.kafka.producer.properties.linger.ms=50000

调整消息发送端代码如下:

package com.order.service.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
 * @ClassName: KafkaProducer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description:
 */
@Slf4j
@Component
public class MyKafkaBatchProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void batchSendMessage() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(new Date());
        log.info("开始消息发送,当前时间:{}", dateStr);
        for (int a = 0; a < 1000; a++) {
            this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");
        }
        log.info("完成消息发送,当前时间:{}", dateStr);
    }

}

触发消息发送消费结果如下:

2024-10-27 15:41:39.530  INFO 17440 --- [nsumer-01-2-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10
2024-10-27 15:41:39.530  INFO 17440 --- [nsumer-01-0-C-1] c.o.s.k.consumer.MyKafkaBatchConsumer    : my-kafka-consumer-groupId-01 消息消费成功,当前时间:2024-10-27 15:41:39,消息size:10

可以看到消息发送和消息消费几乎是同时进行的,因为这里我们打印的是时间只有秒,是看不出差异的,但是也可以根据这个结果看出,消费者并没有等到 10秒后才开始消费,是因为批量发送消息的大小大于了1KB 就触发了批量消息的提交,符合上面我们说的三个条件满足其中一个就触发批量消息提交到 Kafka Broker,结果符合预期。

关于 buffer-memory 这个参数这里不做验证了,有兴趣的朋友可以自己去验证哈。

spring.kafka.consumer.max-poll-records 参数讨论

spring.kafka.consumer.max-poll-records 表示一次调用 poll() 操作时返回的最大记录数,默认为 500 条,上面的案例中我们使用了 properties = {“max.poll.records:10”} 这个配置,其实这个配置也是配置批量拉去消息的最大数量,我们配置的是 10,日志记录每次最多拉去的数量就是 10,使用 properties 的配置方式可以覆盖掉项目配置文件中的配置,也就是局部配置覆盖全局配置,这样做的好处是显而易见的,我们可以针对每个消费端按需做出灵活配置。

总结:本篇简单分享了 Kafka 批量发送消息消费的一些案例,希望可以帮助到有需要的朋友,分享有错误的地方也欢迎大家提出纠正。

如有不正确的地方欢迎各位指出纠正。

标签:10,27,批量,kafka,发送,消息,Kafka
From: https://blog.csdn.net/weixin_42118323/article/details/143217962

相关文章

  • kafka安装-mac
    kafka安装-Macmac和linux安装kafka方式一样1.下载安装官网下载:https://kafka.apache.org/downloads==》kafka_2.13-3.3.1这里安装3.31版本,kafka的安装包版本不区分windows和Linux,都用的一个包,包含windows和Linux的执行脚本。kafka1.x,2.x版本强依赖zk(记录broker,leader-foll......
  • 015_Kafka
    1kafka简介削峰填谷kafka的主要架构1)Producer:消息生产者,就是向kafkabroker发消息的客户端;2)Consumer:消息消费者,向kafkabroker取消息的客户端;3)ConsumerGroup(CG))::消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一......
  • 金蝶云星空批量插入单据到数据库
    ##****************************服务插件*******************#引入clr运行库importclr#添加对cloud插件开发的常用组件的引用clr.AddReference('System')clr.AddReference('System.Data')clr.AddReference('Kingdee.BOS')clr.AddReference('Kingdee.BOS.Core'......
  • kafka结构
    生产者(Producer)消息生产:生产者是消息的源头,负责创建并发送消息到Kafka的Topic。它将应用程序产生的数据转换为消息格式,并根据一定的策略(如轮询、基于键的哈希等)将消息发送到Topic的不同分区。例如,在一个电商系统中,订单创建服务作为生产者,会将新订单的信息封装成消息发送到“订......
  • iconfont 批量把图标加入购物车的方法 并且在C#窗体中使用
    iconfont 是阿里旗下很好用的图标管理网站(https://www.iconfont.cn/),里面有百万个小图标,可以随意下载切换颜色,是很多前端人员的选择。但是网站没有将图标批量加入购物车的功能,很不方便,现记录下批量加入购物车的js代码:在浏览器中按f12打开【开发人员工具】,找到【console(控......
  • 【系统设计】高效的分布式系统:使用 Spring Boot 和 Kafka 实现 Saga 模式
    在现代分布式系统中,管理跨多个服务的长事务至关重要。传统的分布式事务解决方案往往面临性能瓶颈和复杂性问题,而Saga模式作为一种灵活高效的解决方案,逐渐受到开发者的青睐。本文将探讨如何利用SpringBoot和Kafka实现Saga模式,并详细介绍事务补偿机制,帮助你构建稳定......
  • 批量提取
    在Python中批量提取多个Excel文件的数据并将其写入同名的CSV文件,可以使用pandas库来实现。以下是一个简单的脚本示例,它将遍历指定文件夹中的所有Excel文件,读取数据,并将这些数据保存为同名的CSV文件。首先,确保你已经安装了pandas和openpyxl(用于读取Excel文件)库。如果没有安......
  • 不使用docker-compose不使用zookeeper启动ApacheKafka3.8.0单机运行KRAFT模式
    dockerrun-d-v/kafka_data:/opt/kafka-logs-eKAFKA_ENABLE_KRAFT=yes-eKAFKA_PROCESS_ROLES=broker,controller-eKAFKANODEID=1-eKAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093-eKAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.1......
  • 淘宝商家电话采集工具 批量导出淘宝天猫商家联系方式软件 Python使用教程
    分享作者:下去沉淀沉淀吧1030249563(v)以Python为例,下面是一个简单的教程,来介绍如何使用python语音进行淘宝天猫商家爬虫。首先,我们需要安装以下库:requests,beautifulsoup4。可以使用以下命令进行安装:pipinstallrequestspipinstallbeautifulsoup4接下来,我们需要导入这些......
  • Kafka的消费者
    Kafka的消费者Kafka采用消费者组的方式来消费消息,一个消费者组中可以包含多个消费者。消费者对象订阅主题并接收Kafka的消息,然后验证消息并保存结果。尽管一个消费者组中可以包含多个消费者,但是它们订阅的都是同一个主题的消息。1.消费模式当生产者将消息发送到Kafka集群后,会......