首页 > 其他分享 >kafka 拦截器

kafka 拦截器

时间:2023-02-25 12:22:14浏览次数:32  
标签:拦截器 kafka record 消息 ProducerRecord public

生产者拦截器

拦截器(Interceptor)是早在 Kafka 0.10.0.0 中就已经引入的一个功能,Kafka 一共有两种拦截器:生产者拦截器和消费者拦截器。

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

生产者拦截器的使用也很方便,主要是自定义实现 org.apache.kafka.clients.producer. ProducerInterceptor 接口。ProducerInterceptor 接口中包含3个方法:

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();

KafkaProducer 在将消息序列化和计算分区之前会调用生产者拦截器的 onSend() 方法来对消息进行相应的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、key 和 partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改 key 不仅会影响分区的计算,同样会影响 broker 端日志压缩(Log Compaction)的功能。

KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement() 方法,优先于用户设定的 Callback 之前执行。这个方法运行在 Producer 的I/O线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

close() 方法主要用于在关闭拦截器时执行一些资源的清理工作。在这3个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。

ProducerInterceptor 接口与 Partitioner 接口一样,它也有一个同样的父接口 Configurable,具体的内容可以参见 Partitioner 接口的相关介绍。

下面通过一个示例来演示生产者拦截器的具体用法,ProducerInterceptorPrefix 中通过 onSend() 方法来为每条消息添加一个前缀“prefix1-”,并且通过 onAcknowledgement() 方法来计算发送消息的成功率。

//代码清单4-5生产者拦截器示例
public class ProducerInterceptorPrefix implements 
        ProducerInterceptor<String,String>{
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(
            ProducerRecord<String, String> record) {
        String modifiedValue = "prefix1-" + record.value();
        return new ProducerRecord<>(record.topic(), 
                record.partition(), record.timestamp(),
                record.key(), modifiedValue, record.headers());
    }

    @Override
    public void onAcknowledgement(
            RecordMetadata recordMetadata, 
            Exception e) {
        if (e == null) {
            sendSuccess++;
        } else {
            sendFailure ++;
        }
    }

    @Override
    public void close() {
        double successRatio = (double)sendSuccess / (sendFailure + sendSuccess);
        System.out.println("[INFO] 发送成功率="
                + String.format("%f", successRatio * 100) + "%");
    }

    @Override
    public void configure(Map<String, ?> map) {}
}

实现自定义的 ProducerInterceptorPrefix 之后,需要在 KafkaProducer 的配置参数 interceptor.classes 中指定这个拦截器,此参数的默认值为“”。示例如下:

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptorPrefix.class.getName());

然后使用指定了 ProducerInterceptorPrefix 的生产者连续发送10条内容为“kafka”的消息,在发送完之后客户端打印出如下信息:

[INFO] 发送成功率=100.000000%

如果消费这10条消息,会发现消费了的消息都变成了“prefix1-kafka”,而不是原来的“kafka”。

KafkaProducer 中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)。下面我们再添加一个自定义拦截器 ProducerInterceptorPrefixPlus,它只实现了 Interceptor 接口中的 onSend() 方法,主要用来为每条消息添加另一个前缀“prefix2-”,具体实现如下:

public ProducerRecord<String, String> onSend(
        ProducerRecord<String, String> record) {
    String modifiedValue = "prefix2-"+record.value() ;
    return new ProducerRecord<>(record.topic(),
            record.partition(), record.timestamp(),
            record.key(), modifiedValue, record.headers());
}

接着修改生产者的 interceptor.classes 配置,具体实现如下:

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptorPrefix.class.getName() + ","
                + ProducerInterceptorPrefixPlus.class.getName());

此时生产者再连续发送10条内容为“kafka”的消息,那么最终消费者消费到的是10条内容为“prefix2-prefix1-kafka”的消息。如果将 interceptor.classes 配置中的两个拦截器的位置互换:

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptorPrefixPlus.class.getName() + ","
                + ProducerInterceptorPrefix.class.getName());

那么最终消费者消费到的消息为“prefix1-prefix2-kafka”。

如果拦截链中的某个拦截器的执行需要依赖于前一个拦截器的输出,那么就有可能产生“副作用”。设想一下,如果前一个拦截器由于异常而执行失败,那么这个拦截器也就跟着无法继续执行。在拦截链中,如果某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。

标签:拦截器,kafka,record,消息,ProducerRecord,public
From: https://www.cnblogs.com/fxh0707/p/17154120.html

相关文章

  • Spring for Apache Kafka: @KafkaListener 的使用示例(消费)
    版本Version2.7.8-- 阅读Version2.7.8的SpringforApacheKafka官方文档,检出其中的注解@KafkaListener的使用方式。关键词:ConsumerRecord、Message、......
  • Kafka 分区器
    分区器消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往broker......
  • SpringBoot30 - 整合Kafka
    SpringBoot整合Kafka安装​ windows版安装包下载地址:https://kafka.apache.org/downloads​ 下载完毕后得到tgz压缩文件,使用解压缩软件解压缩即可使用,解压后得到如下......
  • 拦截器
    拦截器机制前端发送请求,通过控制器完成定义好的方法,在将数据渲染到前端。拦截器分为三个方法分别是preHandle、postHandle、afterCompletion,我们可以分别利用这三个方法在......
  • 文件上传与拦截器
    静态资源访问静态资源访问,默认在根路径下加上资源名称即可访问;也可以在配置文件中,自定义访问路径spring.mvc.static-path-pattern=/<自定义路径>/**以上是默认static目录......
  • 一文读懂Kafka Connect核心概念
    概览KafkaConnect是一种用于在ApacheKafka和其他系统之间可扩展且可靠地流式传输数据的工具。它使快速定义将大量数据移入和移出Kafka的连接器变得简单。KafkaC......
  • Kafka 序列化
    序列化生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成......
  • Kafka 上手实战
    参数配置bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以......
  • 想完全弄懂kafka?看这篇就够了
    有人说世界上有三个伟大的发明:火,轮子,以及Kafka。发展到现在,ApacheKafka无疑是很成功的,Confluent公司曾表示世界五百强中有三分之一的企业在使用Kafka。今天便和大家分......
  • kafka常用操作
    如果把一个项目/微服务当成一个消费组,那么一个topic可能在多个消费组【一个topic被多个项目订阅】,一个消费组可能有多个topic。【一个项目订阅了多个topic】。一个消费组内......