首页 > 其他分享 >kafka拦截器

kafka拦截器

时间:2023-01-31 23:36:22浏览次数:38  
标签:拦截器 void System kafka Override import public

1. 简单介绍

  kafka拦截器用于生产者和消费者对统一对消息做处理。且可以设置多个拦截器,用于链式调用。

  生产者拦截器可以用于生产消息前做处理,消费者可以用于消费消息前做处理。

2. 简单使用

1. 生产者

package cn.qz.cloud.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CounterInterceptor implements ProducerInterceptor<String, String>  {

    private int errorCounter = 0;

    private int successCounter = 0;


    /**
     *  发送消息回调
     * @return
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    /**
     *  发送完成回调
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // 统计成功和失败的次数
        if (e == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }

    /**
     * 关闭方法
     */
    @Override
    public void close() {
        // 保存结果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }

    /**
     * 配置完成方法
     */
    @Override
    public void configure(Map<String, ?> map) {
        System.out.println("111222");
    }
}

===
package cn.qz.cloud.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
                System.currentTimeMillis() + "," + record.value().toString());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

对生产者设置拦截器

        // 2. 构建拦截链
        List<String> interceptors = new ArrayList<>();
        interceptors.add("cn.qz.cloud.kafka.interceptor.TimeInterceptor");
        interceptors.add("cn.qz.cloud.kafka.interceptor.CounterInterceptor");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

2. 消费者

消费者实现另一个接口,设置方法同上:

package cn.qz.cloud.kafka.interceptor;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.Map;

public class MyConsumerInterceptor implements ConsumerInterceptor {
    @Override
    public ConsumerRecords onConsume(ConsumerRecords records) {
        System.out.println("MyConsumerInterceptor===1===" + records.count());
        return records;
    }

    @Override
    public void close() {
//        System.out.println("MyConsumerInterceptor===2");
    }

    @Override
    public void onCommit(Map offsets) {
//        System.out.println("MyConsumerInterceptor===3");
    }

    @Override
    public void configure(Map<String, ?> configs) {
//        System.out.println("MyConsumerInterceptor===4");
    }
}

设置拦截器

        // 2. 构建拦截链
        List<String> interceptors = new ArrayList<>();
        interceptors.add("cn.qz.cloud.kafka.interceptor.MyConsumerInterceptor");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors)

 

标签:拦截器,void,System,kafka,Override,import,public
From: https://www.cnblogs.com/qlqwjy/p/17081161.html

相关文章

  • 【转载】【SSM】SpringBoot 统一功能处理,(*Spring 拦截器实现与原理)
    ✨1.用户登录权限效验1.1最初用户登录验证1.2SpringAOP用户统一登录验证的问题1.3Spring拦截器1. 自定义拦截器2.将自定义拦截器加入到系统配置1.4拦截器实......
  • .net core 下使用 Kafka 生产者批量发送给消息处理,使用事务(四)
    生产者批量发送消息,使用事务,要么全部失败要么全部成功重要说明事物id必须要设置producerConfig.TransactionalId=Guid.NewGuid().ToString();//必须设置事物id 1......
  • Rabbitmq 与kafka
    Rabbitmq比kafka可靠,kafka更适合IO高吞吐的处理,比如ELK日志收集Kafka和RabbitMq一样是通用意图消息代理,他们都是以分布式部署为目的。但是他们对消息语义模型的定义的假设......
  • 1.5万字长文:从 C# 入门 Kafka
    1.5万字长文:从C#入门Kafka  目录1,搭建Kafka环境安装docker-compose单节点Kafka的部署Kafka集群的部署2,Kafka概念基本概念关于Ka......
  • k8s~fluentd从kafka到elk
    有时为了日志解耦,通常不把日志打到文件,而是直接打到kafka,而为了分析日志,我们可以通过sidecar的方式,把日志从kafka写入到es里,而通过kibana对日志进行分析。我的k8s-fluentd......
  • 1.5万字长文:从 C# 入门 Kafka
    目录1,搭建Kafka环境安装docker-compose单节点Kafka的部署Kafka集群的部署2,Kafka概念基本概念关于Kafka脚本工具主题管理使用C#创建分区分区与复制生产者消......
  • Apache Kafka 的基本概念
    ​​基本概念​​主题Topictopic是Kafka最基础的组织单位,类似于关系数据库中的数据表。做为使用kafka的开发者,你最应该考虑的是和topoc相关的抽象。创建不同的topi......
  • ubuntu下安装kafka集群connector
    1.首先安装kafka集群,安装步骤参考链接如下:2.创建安装connector安装目录mkdir-p/kafka/kafka-1/kafka_2.12-2.2.1/connector-pluginmkdir-p/kafka/kafka-2/kafka_2.12......
  • ubuntu下安装zookeeper和kafka伪集群
    1.创建目录mkdir-p/zookeeper/zkp-1/zookeeper/zkp-2/zookeeper/zkp-32.下载zookeeper链接地址:​​​http://mirror.bit.edu.cn/apache/zookeeper/​​​命令下载:c......
  • spring boot——请求与参数校验——重要概念——拦截器——华章
    拦截器注册:packageorg.example.interceptor_hz.config;importorg.example.interceptor_hz.interceptor.FirstHandlerInterceptor;importorg.example.interceptor_h......