首页 > 其他分享 >Kafka 之自定义消息拦截器【Kafka 拦截器】

Kafka 之自定义消息拦截器【Kafka 拦截器】

时间:2024-11-10 14:44:00浏览次数:3  
标签:拦截器 自定义 springframework kafka org import Kafka

前言:

拦截器这个概念相信大部分朋友都不会陌生,Spring MVC 拦截器相信大家都用过,拦截器的核心思想就是运行应用程序在不修改业务逻辑的前提下,动态的实现一组可插拔的事件处理器链,它可以在业务链路中的前后各个点进行对应的拦截,做一些统一的处理,Sping MVC 的拦截器大家都了解,本篇我们来分享一下 Kafka 的拦截器。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 之批量消息发送消费

Kafka 之消息广播消费

Kafka 之消息并发消费

Kafka 之顺序消息

Kafka 之事务消息

Kafka 如何保证消息不丢失?【消息手动 ACK】

什么是 Kafka 的拦截器?

Kafka 的拦截器分为 Producer 拦截器和 Consumer 拦截器,使用 Producer 拦截器可以在消息发送前及消息发送成功后植入自定义的业务逻辑,而 Consumer拦截器支持在消息消费前以及提交位移后编写特定逻辑,不管是 Producer 拦截器还是 Consumer 拦截器都支持拦截器链,可以将一系列的拦截器组装成一个拦截器链,Kafka 会按照添加顺序一次执行拦截器逻辑,Kafka 为我们提供了两个拦截器接口,分别是 ProducerInterceptor 和 ConsumerInterceptor,我们实现该接口重新方法实现自定义业务其逻辑即可。

ProducerInterceptor 源码分析

ProducerInterceptor 是 Kafka 的生产者拦截器,实现了 Configurable 接口,提供了 onSend、onAcknowledgement、close、configure 四个方法,方法的作用如下:

  • onSend:该方法会在消息发送之前被调用,如果你想给发送出去的消息进行统一处理,可以从这里下手。
  • onAcknowledgement:该方法会在消息成功提交或者发送失败后调用,我们的异步消息发送中有个 callback,onAcknowledgement 方法会在 callback 方法之前调用,需要注意的是该方法和 onSend 方法不是在同一个线程中调用,如果在这两个方法中使用贡献变量的时候就要特别注意,一般不建议在这个方法中加入过多的业务逻辑,否则会影响 Kafka 的性能。
  • close:拦截器关闭前的处理。
  • configure:初始化配置。
public interface ProducerInterceptor<K, V> extends Configurable {

    ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);

    void onAcknowledgement(RecordMetadata var1, Exception var2);

    void close();
}

public interface Configurable {
    void configure(Map<String, ?> var1);
}

ConsumerInterceptor 源码分析

ConsumerInterceptor 是 Kafka 的消费者拦截器,同样实现了 Configurable 接口,提供了 onConsume、onCommit、close、configure 四个方法,方法的作用如下:

  • onConsume:该方法会在消费者正式消费之前被调用,如果你想对消息消费之前做一些统一处理,可以在该方法中实现。
  • onCommit:该方法会在 Kafka 提交 offset 之后调用,通常可以在该方法中进行一些日志记录等。
  • close:拦截器关闭前的处理。
  • configure:初始化配置。
package org.apache.kafka.clients.consumer;

import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);

    void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);

    void close();
}

自定义实现 Kafka 生产者拦截器

Kafka 给我们提供了生产者拦截器接口,现在我们自己来实现一个 Kafka 生产者拦截器,自定义 Kafka 生产者拦截器代码如下:

 package com.order.service.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * @ClassName: MyKafkaProducerInterceptor
 * @Author: Author
 * @Date: 2024/10/31 11:11
 * @Description:
 */
@Slf4j
public class MyKafkaProducerInterceptor implements ProducerInterceptor<Object, Object> {

    //消息发送前的确认
    @Override
    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> producerRecord) {
        log.info("消息发送前操作");
        return producerRecord;
    }

    //消息确认
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        log.info("消费发送后的回调");
    }

    //拦截器关闭后的操作
    @Override
    public void close() {

    }

    //初始化相关操作
    @Override
    public void configure(Map<String, ?> map) {

    }
}

在实现 Kafka 生产者拦截器的代码中,我这里只是记录了日志,真正在项目中使用的时候,根据自己的业务需求增加业务逻辑即可。

配置 Kafka 生产者拦截器

我们自定义了 Kafka 生产者拦截器,要想自定义的生产者拦截器生效,我们还需要配置该拦截器,核心代码如下:

//自定义生产者消息拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyKafkaProducerInterceptor.class.getName());
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

可以看到我们传入了一个 List 对象,也印证了前面说的 Kafka 支持一个拦截器链。

本案例完整的生产者配置代码如下:

package com.order.service.config;

import com.order.service.interceptor.MyKafkaConsumerInterceptor;
import com.order.service.interceptor.MyKafkaProducerInterceptor;
import com.order.service.kafka.CustomPartitioner;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author :author
 * @description:
 * @modified By:
 * @version: V1.0
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;

    @Value("${spring.kafka.producer.properties.linger.ms}")
    private String lingerMs;

    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //批量发送消息的大小 默认 16KB
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        //生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        //批量发送的的最大时间间隔,单位是毫秒
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        //自定义分区器配置
        //props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
        //自定义生产者消息拦截器
        List<String> interceptors = new ArrayList<>();
        interceptors.add(MyKafkaProducerInterceptor.class.getName());
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> newProducerFactory() {
        return new DefaultKafkaProducerFactory<>(getMyKafkaProps());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(newProducerFactory());
    }


}

自定义实现 Kafka 消费者拦截器

Kafka 给我们提供了消费者拦截器接口,现在我们自己来实现一个 Kafka 消费者拦截器,自定义 Kafka 消费者拦截器代码如下:

package com.order.service.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.Map;

/**
 * @ClassName: MyKafkaConsumerInterceptor
 * @Author: Author
 * @Date: 2024/10/31 11:11
 * @Description:
 */
@Slf4j
//@Component
public class MyKafkaConsumerInterceptor implements ConsumerInterceptor<Object, Object> {

    //消息消费前的处理
    @Override
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
        log.info("消费前处理");
        return consumerRecords;
    }

    //拦截器关闭前的处理
    @Override
    public void close() {
        log.info("拦截器关闭前的处理");
    }

    //Kafka 提交 offset 前的处理
    @Override
    public void onCommit(Map map) {
        log.info("消息消费提交offset");
    }

    //初始化配置
    @Override
    public void configure(Map<String, ?> map) {
        log.info("拦截器初始化配置");
    }
}

同样在实现 Kafka 消费者拦截器的代码中,我这里同样只是记录了日志,真正在项目中使用的时候,根据自己的业务需求增加业务逻辑即可。

配置 Kafka 消费者拦截器

我们自定义了 Kafka 消费者拦截器,要想自定义的消费者拦截器生效,我们同样也还需要配置该拦截器,核心代码如下:

//添加自定义拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyKafkaConsumerInterceptor.class.getName());
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

可以看到我们传入了一个 List 对象,同样也印证了前面说的 Kafka 支持一个拦截器链。

本案例完整的消费者配置代码如下:

package com.order.service.config;

import com.order.service.interceptor.MyKafkaConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

import java.util.*;

/**
 * @author :author
 * @description:
 * @modified By:
 * @version: V1.0
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String offsetReset;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitIntervalMs;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(12);
        //是否自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //kafak 服务器
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
        //消费组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        //一次调用poll()操作时返回的最大记录数,默认值为500
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        //自动提交时间间隔 默认 5秒
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        //添加自定义拦截器
        List<String> interceptors = new ArrayList<>();
        interceptors.add(MyKafkaConsumerInterceptor.class.getName());
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
        return props;
    }


    /**
     * @return org.springframework.kafka.config.KafkaListenerContainerFactory<org.springframework.kafka.listener.ConcurrentMessageListenerContainer < java.lang.String, java.lang.String>>
     * @date 2024/10/22 19:41
     * @description kafka 消费者工厂
     */
    @Bean("myContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));
        // 并发创建的消费者数量
        factory.setConcurrency(3);
        // 开启批处理
        factory.setBatchListener(true);
        //拉取超时时间
        factory.getContainerProperties().setPollTimeout(1500);
        //是否自动提交 ACK kafka 默认是自动提交
        if (!enableAutoCommit) {
            //共有7中方式
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        }

        return factory;
    }

    /*@Bean
    @Primary
    public ErrorHandler kafkaErrorHandler(){
        ConsumerRecordRecoverer recordRecoverer=new DeadLetterPublishingRecoverer(new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(getMyKafkaProps())));
        BackOff backOff=new FixedBackOff(10,3L);
        return new SeekToCurrentErrorHandler(recordRecoverer, backOff);
    }*/

    @Bean
    @Primary
    public BatchErrorHandler kafkaBatchErrorHandler() {
        // 创建 SeekToCurrentBatchErrorHandler 对象
        SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler();
        // 创建 FixedBackOff 对象
        BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
        batchErrorHandler.setBackOff(backOff);
        // 返回
        return batchErrorHandler;
    }


}

Kafka 自定义拦截器结果验证

Kafka 自定义拦截器结果验证之生产者代码

前面已经多次分享了 Kafka 的生产者代码了,这里用来做 Kafka 拦截器验证的案例代码如下:

package com.order.service.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @ClassName: ManualKafkaProducer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description: 手动ACK消息生产者
 */
@Slf4j
@Component
public class ManualKafkaProducer {


    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    //同步发送消息
    public void sendManualMessage(String message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(new Date());
        //同步发送消息
        try {
            kafkaTemplate.send("manual-ack-topic", message).get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("Manual ACK 消息生产者完成消息发送,当前时间:{}", dateStr);
    }

}

Kafka 自定义拦截器结果验证之消费者代码

前面已经多次分享了 Kafka 的消费者代码了,这里用来做 Kafka 拦截器验证的案例代码如下:

package com.order.service.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * @ClassName: ManualAckKafkaConsumer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description: 手动 ACK 消息消费
 */
@Slf4j
@Component
public class ManualAckKafkaConsumer {

    @KafkaListener(id = "my-kafka-manual-consumer",
            groupId = "my-kafka-consumer-manual-groupId-01",
            topics = "manual-ack-topic",
            containerFactory = "myContainerFactory")
    public void listen(String message, Acknowledgment acknowledgment) {
        log.info("Manual ACK 消息消费成功,消息内容:{}", message);
        //手动提交 ACK
        acknowledgment.acknowledge();
    }

}

Kafka 自定义拦截器结果验证

我们出发消息发送消费控制台得到如下日志:

2024-11-02 19:02:53.343  INFO 38324 --- [nio-8086-exec-5] c.o.s.i.MyKafkaProducerInterceptor       : 消息发送前操作
2024-11-02 19:02:53.449  INFO 38324 --- [ad | producer-1] c.o.s.i.MyKafkaProducerInterceptor       : 消费发送后的回调
2024-11-02 19:02:53.449  INFO 38324 --- [nio-8086-exec-5] c.o.s.k.producer.ManualKafkaProducer     : Manual ACK 消息生产者完成消息发送,当前时间:2024-11-02 19:02:53
2024-11-02 17:02:53.450  INFO 38324 --- [-consumer-0-C-1] c.o.s.i.MyKafkaConsumerInterceptor       : 消费前处理
2024-11-02 19:02:53.451  INFO 38324 --- [-consumer-0-C-1] c.o.s.k.consumer.ManualAckKafkaConsumer  : Manual ACK 消息消费成功,消息内容:我是一条同步消息
2024-11-02 19:02:53.456  INFO 38324 --- [-consumer-0-C-1] c.o.s.i.MyKafkaConsumerInterceptor       : 消息消费提交offset

根据控制台的日志结果来看,结果符合预期。

总结:本篇我们分享了 Kafka 的拦截器的相关操作,有了拦截器我们可以对 Kafka 的消息生产消费进行统一处理,可以让我们的业务更灵活,代码逻辑更严谨,希望可以帮助到有需要的小伙伴。

如有不正确的地方欢迎各位指出纠正。

标签:拦截器,自定义,springframework,kafka,org,import,Kafka
From: https://blog.csdn.net/weixin_42118323/article/details/143415916

相关文章

  • Kafka - 启用安全通信和认证机制_SSL + SASL
    文章目录官方资料概述制作kakfa证书1.1openssl生成CA1.2生成server端秘钥对以及证书仓库1.3CA签名证书1.4服务端秘钥库导入签名证书以及CA根证书1.5生成服务端信任库并导入CA根数据1.6生成客户端信任库并导入CA根证书2配置zookeeperSASL认证2.1编写zk_server......
  • Kafka 核心要点解析
    目录一、Kafka消息发送流程二、Kafka的设计架构三、Kafka分区的目的四、Kafka保证消息有序性的方式五、ISR、OSR、AR概念六、Kafka在什么情况下会出现消息丢失七、保证Kafka可靠性的方法八、Kafka数据去重九、生产者提高吞吐量的方法十、Zookeeper在Kafka......
  • 手把手教你搭建Windows+YOLO11+CUDA环境,以EMA注意演示如何改进YOLO11, 训练自定义数据
    YOLOv11目标检测创新改进与实战案例专栏文章目录:YOLOv11创新改进系列及项目实战目录包含卷积,主干注意力,检测头等创新机制以及各种目标检测分割项目实战案例专栏链接:YOLOv11目标检测创新改进与实战案例文章目录YOLOv11目标检测创新改进与实战案例专栏前言本......
  • 织梦网站怎么修改自定义,织梦自定义字段管理
    在织梦CMS中,自定义字段可以帮助你扩展文章或页面的属性,以下是一些管理自定义字段的步骤:登录后台:打开织梦CMS的后台管理页面,输入用户名和密码登录。进入模型管理:在后台左侧菜单中,点击“核心”>“频道模型”>“管理内容模型”。选择模型:选择需要添加自定义字段......
  • 鸿蒙项目实战(三):自定义弹窗开发实践
    自定义弹窗选型合理选择不同的系统能力实现弹窗,有利于提升应用开发效率,实现更好的功能需求,因此了解自定义弹窗的选型和差异非常重要。在应用开发中,为了选择出合适的弹窗选型,从使用场景上,需要重点关注以下两点:弹窗与界面代码解耦在开发业务逻辑时,例如遇到一些网络请求失败的场......
  • ethereum.FilterQuery 日志查询处理自定义事件
    前言:在开发中也是遇到这个问题了,并非常见的Transfer,Approve等在ERC20中定义的事件,只要你的事件在sol文件中存在,那还好处理,但是如果不存在,刚开始接触的时候,你可能就有点懵,我也是找了两天,查阅了很多资料,并没什么niao用,偶然看见了区块浏览器中的log才恍然大悟。问题:Uniswa......
  • Kafka 分区的目的?
    Kafka分区的主要目的包括以下几点:提高吞吐量:分区允许多个消费者并行读取数据,从而显著提高系统的整体吞吐量。每个分区可以由不同的消费者实例处理,实现负载均衡。数据分布:通过分区,数据可以分布在多个Broker上,避免单个Broker成为性能瓶颈。这使得Kafka能够支持大规模的数据存储......
  • 【Linux】为终端命令自定义快件键并弹窗提醒 设置快捷键切换网络代理(Network Proxy)Dis
    【Linux】为终端命令自定义快件键并弹窗提醒设置快捷键切换网络代理(NetworkProxy)Disabled/Manual并弹窗提醒可以自定义快捷键执行终端命令,执行完毕会有弹窗提醒。下面给一个例子,设置快捷键切换网络代理(NetworkProxy)Disabled/Manual并弹窗提醒。适用于Ubuntu系统,为......
  • 理解Web登录机制:会话管理与跟踪技术解析(四)-拦截器Interceptor、异常处理
    本文将详细探讨如何通过拦截器实现登录校验,并介绍如何通过异常处理来确保系统的鲁棒性。我们将通过具体的示例,深入分析如何在Spring框架中配置拦截器与异常处理,以便为开发者提供一套高效、安全的登录校验和异常管理方案。目录前言拦截器Interceptor快速入门Interceptor......
  • Qt 窗口强制禁用系统阴影(自定义菜单)
    解决方法当只使用Qt::FramelessWindowHint时,不会显示系统阴影,因为自定义菜单一般都会添加Qt::Popup,添加Qt::Popup后系统会默认添加阴影效果。在添加Qt::Popup后,继续添加Qt::NoDropShadowWindowHint枚举解决。 最小复现代码:QDialog*dia=newQDialog();dia->setW......