前言:
拦截器这个概念相信大部分朋友都不会陌生,Spring MVC 拦截器相信大家都用过,拦截器的核心思想就是运行应用程序在不修改业务逻辑的前提下,动态的实现一组可插拔的事件处理器链,它可以在业务链路中的前后各个点进行对应的拦截,做一些统一的处理,Sping MVC 的拦截器大家都了解,本篇我们来分享一下 Kafka 的拦截器。
Kafka 系列文章传送门
Kafka 客户端工具使用分享【offsetexplorer】
什么是 Kafka 的拦截器?
Kafka 的拦截器分为 Producer 拦截器和 Consumer 拦截器,使用 Producer 拦截器可以在消息发送前及消息发送成功后植入自定义的业务逻辑,而 Consumer拦截器支持在消息消费前以及提交位移后编写特定逻辑,不管是 Producer 拦截器还是 Consumer 拦截器都支持拦截器链,可以将一系列的拦截器组装成一个拦截器链,Kafka 会按照添加顺序一次执行拦截器逻辑,Kafka 为我们提供了两个拦截器接口,分别是 ProducerInterceptor 和 ConsumerInterceptor,我们实现该接口重新方法实现自定义业务其逻辑即可。
ProducerInterceptor 源码分析
ProducerInterceptor 是 Kafka 的生产者拦截器,实现了 Configurable 接口,提供了 onSend、onAcknowledgement、close、configure 四个方法,方法的作用如下:
- onSend:该方法会在消息发送之前被调用,如果你想给发送出去的消息进行统一处理,可以从这里下手。
- onAcknowledgement:该方法会在消息成功提交或者发送失败后调用,我们的异步消息发送中有个 callback,onAcknowledgement 方法会在 callback 方法之前调用,需要注意的是该方法和 onSend 方法不是在同一个线程中调用,如果在这两个方法中使用贡献变量的时候就要特别注意,一般不建议在这个方法中加入过多的业务逻辑,否则会影响 Kafka 的性能。
- close:拦截器关闭前的处理。
- configure:初始化配置。
public interface ProducerInterceptor<K, V> extends Configurable {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
void onAcknowledgement(RecordMetadata var1, Exception var2);
void close();
}
public interface Configurable {
void configure(Map<String, ?> var1);
}
ConsumerInterceptor 源码分析
ConsumerInterceptor 是 Kafka 的消费者拦截器,同样实现了 Configurable 接口,提供了 onConsume、onCommit、close、configure 四个方法,方法的作用如下:
- onConsume:该方法会在消费者正式消费之前被调用,如果你想对消息消费之前做一些统一处理,可以在该方法中实现。
- onCommit:该方法会在 Kafka 提交 offset 之后调用,通常可以在该方法中进行一些日志记录等。
- close:拦截器关闭前的处理。
- configure:初始化配置。
package org.apache.kafka.clients.consumer;
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);
void close();
}
自定义实现 Kafka 生产者拦截器
Kafka 给我们提供了生产者拦截器接口,现在我们自己来实现一个 Kafka 生产者拦截器,自定义 Kafka 生产者拦截器代码如下:
package com.order.service.interceptor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* @ClassName: MyKafkaProducerInterceptor
* @Author: Author
* @Date: 2024/10/31 11:11
* @Description:
*/
@Slf4j
public class MyKafkaProducerInterceptor implements ProducerInterceptor<Object, Object> {
//消息发送前的确认
@Override
public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> producerRecord) {
log.info("消息发送前操作");
return producerRecord;
}
//消息确认
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
log.info("消费发送后的回调");
}
//拦截器关闭后的操作
@Override
public void close() {
}
//初始化相关操作
@Override
public void configure(Map<String, ?> map) {
}
}
在实现 Kafka 生产者拦截器的代码中,我这里只是记录了日志,真正在项目中使用的时候,根据自己的业务需求增加业务逻辑即可。
配置 Kafka 生产者拦截器
我们自定义了 Kafka 生产者拦截器,要想自定义的生产者拦截器生效,我们还需要配置该拦截器,核心代码如下:
//自定义生产者消息拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyKafkaProducerInterceptor.class.getName());
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
可以看到我们传入了一个 List 对象,也印证了前面说的 Kafka 支持一个拦截器链。
本案例完整的生产者配置代码如下:
package com.order.service.config;
import com.order.service.interceptor.MyKafkaConsumerInterceptor;
import com.order.service.interceptor.MyKafkaProducerInterceptor;
import com.order.service.kafka.CustomPartitioner;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author :author
* @description:
* @modified By:
* @version: V1.0
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Value("${spring.kafka.producer.batch-size}")
private String batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private String bufferMemory;
@Value("${spring.kafka.producer.properties.linger.ms}")
private String lingerMs;
public Map<String, Object> getMyKafkaProps() {
Map<String, Object> props = new HashMap<>(10);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//批量发送消息的大小 默认 16KB
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
//生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数 默认 32M
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
//批量发送的的最大时间间隔,单位是毫秒
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
//自定义分区器配置
//props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
//自定义生产者消息拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyKafkaProducerInterceptor.class.getName());
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
return props;
}
@Bean
public ProducerFactory<String, String> newProducerFactory() {
return new DefaultKafkaProducerFactory<>(getMyKafkaProps());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(newProducerFactory());
}
}
自定义实现 Kafka 消费者拦截器
Kafka 给我们提供了消费者拦截器接口,现在我们自己来实现一个 Kafka 消费者拦截器,自定义 Kafka 消费者拦截器代码如下:
package com.order.service.interceptor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.Map;
/**
* @ClassName: MyKafkaConsumerInterceptor
* @Author: Author
* @Date: 2024/10/31 11:11
* @Description:
*/
@Slf4j
//@Component
public class MyKafkaConsumerInterceptor implements ConsumerInterceptor<Object, Object> {
//消息消费前的处理
@Override
public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
log.info("消费前处理");
return consumerRecords;
}
//拦截器关闭前的处理
@Override
public void close() {
log.info("拦截器关闭前的处理");
}
//Kafka 提交 offset 前的处理
@Override
public void onCommit(Map map) {
log.info("消息消费提交offset");
}
//初始化配置
@Override
public void configure(Map<String, ?> map) {
log.info("拦截器初始化配置");
}
}
同样在实现 Kafka 消费者拦截器的代码中,我这里同样只是记录了日志,真正在项目中使用的时候,根据自己的业务需求增加业务逻辑即可。
配置 Kafka 消费者拦截器
我们自定义了 Kafka 消费者拦截器,要想自定义的消费者拦截器生效,我们同样也还需要配置该拦截器,核心代码如下:
//添加自定义拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyKafkaConsumerInterceptor.class.getName());
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
可以看到我们传入了一个 List 对象,同样也印证了前面说的 Kafka 支持一个拦截器链。
本案例完整的消费者配置代码如下:
package com.order.service.config;
import com.order.service.interceptor.MyKafkaConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;
import java.util.*;
/**
* @author :author
* @description:
* @modified By:
* @version: V1.0
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String offsetReset;
@Value("${spring.kafka.consumer.max-poll-records}")
private String maxPollRecords;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String autoCommitIntervalMs;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
public Map<String, Object> getMyKafkaProps() {
Map<String, Object> props = new HashMap<>(12);
//是否自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//kafak 服务器
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
//消费组id
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//一次调用poll()操作时返回的最大记录数,默认值为500
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
//自动提交时间间隔 默认 5秒
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
//添加自定义拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add(MyKafkaConsumerInterceptor.class.getName());
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
return props;
}
/**
* @return org.springframework.kafka.config.KafkaListenerContainerFactory<org.springframework.kafka.listener.ConcurrentMessageListenerContainer < java.lang.String, java.lang.String>>
* @date 2024/10/22 19:41
* @description kafka 消费者工厂
*/
@Bean("myContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));
// 并发创建的消费者数量
factory.setConcurrency(3);
// 开启批处理
factory.setBatchListener(true);
//拉取超时时间
factory.getContainerProperties().setPollTimeout(1500);
//是否自动提交 ACK kafka 默认是自动提交
if (!enableAutoCommit) {
//共有7中方式
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
}
return factory;
}
/*@Bean
@Primary
public ErrorHandler kafkaErrorHandler(){
ConsumerRecordRecoverer recordRecoverer=new DeadLetterPublishingRecoverer(new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(getMyKafkaProps())));
BackOff backOff=new FixedBackOff(10,3L);
return new SeekToCurrentErrorHandler(recordRecoverer, backOff);
}*/
@Bean
@Primary
public BatchErrorHandler kafkaBatchErrorHandler() {
// 创建 SeekToCurrentBatchErrorHandler 对象
SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler();
// 创建 FixedBackOff 对象
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
batchErrorHandler.setBackOff(backOff);
// 返回
return batchErrorHandler;
}
}
Kafka 自定义拦截器结果验证
Kafka 自定义拦截器结果验证之生产者代码
前面已经多次分享了 Kafka 的生产者代码了,这里用来做 Kafka 拦截器验证的案例代码如下:
package com.order.service.kafka.producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @ClassName: ManualKafkaProducer
* @Author: Author
* @Date: 2024/10/22 19:22
* @Description: 手动ACK消息生产者
*/
@Slf4j
@Component
public class ManualKafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//同步发送消息
public void sendManualMessage(String message) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(new Date());
//同步发送消息
try {
kafkaTemplate.send("manual-ack-topic", message).get();
} catch (Exception e) {
e.printStackTrace();
}
log.info("Manual ACK 消息生产者完成消息发送,当前时间:{}", dateStr);
}
}
Kafka 自定义拦截器结果验证之消费者代码
前面已经多次分享了 Kafka 的消费者代码了,这里用来做 Kafka 拦截器验证的案例代码如下:
package com.order.service.kafka.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* @ClassName: ManualAckKafkaConsumer
* @Author: Author
* @Date: 2024/10/22 19:22
* @Description: 手动 ACK 消息消费
*/
@Slf4j
@Component
public class ManualAckKafkaConsumer {
@KafkaListener(id = "my-kafka-manual-consumer",
groupId = "my-kafka-consumer-manual-groupId-01",
topics = "manual-ack-topic",
containerFactory = "myContainerFactory")
public void listen(String message, Acknowledgment acknowledgment) {
log.info("Manual ACK 消息消费成功,消息内容:{}", message);
//手动提交 ACK
acknowledgment.acknowledge();
}
}
Kafka 自定义拦截器结果验证
我们出发消息发送消费控制台得到如下日志:
2024-11-02 19:02:53.343 INFO 38324 --- [nio-8086-exec-5] c.o.s.i.MyKafkaProducerInterceptor : 消息发送前操作
2024-11-02 19:02:53.449 INFO 38324 --- [ad | producer-1] c.o.s.i.MyKafkaProducerInterceptor : 消费发送后的回调
2024-11-02 19:02:53.449 INFO 38324 --- [nio-8086-exec-5] c.o.s.k.producer.ManualKafkaProducer : Manual ACK 消息生产者完成消息发送,当前时间:2024-11-02 19:02:53
2024-11-02 17:02:53.450 INFO 38324 --- [-consumer-0-C-1] c.o.s.i.MyKafkaConsumerInterceptor : 消费前处理
2024-11-02 19:02:53.451 INFO 38324 --- [-consumer-0-C-1] c.o.s.k.consumer.ManualAckKafkaConsumer : Manual ACK 消息消费成功,消息内容:我是一条同步消息
2024-11-02 19:02:53.456 INFO 38324 --- [-consumer-0-C-1] c.o.s.i.MyKafkaConsumerInterceptor : 消息消费提交offset
根据控制台的日志结果来看,结果符合预期。
总结:本篇我们分享了 Kafka 的拦截器的相关操作,有了拦截器我们可以对 Kafka 的消息生产消费进行统一处理,可以让我们的业务更灵活,代码逻辑更严谨,希望可以帮助到有需要的小伙伴。
如有不正确的地方欢迎各位指出纠正。
标签:拦截器,自定义,springframework,kafka,org,import,Kafka From: https://blog.csdn.net/weixin_42118323/article/details/143415916