首页 > 其他分享 >keycloak~EventListenerProvider初始化kafka引出的类加载问题

keycloak~EventListenerProvider初始化kafka引出的类加载问题

时间:2023-07-19 17:24:50浏览次数:36  
标签:ConsumerConfig EventListenerProvider kafka kafkaProperties put 加载 CONFIG keycloa

EventListenerProvider初始

keycloak提供的事件处理机制,可以通过实现EventListenerProvider接口来实现自定义的事件处理逻辑。在keycloak启动时,会通过ServiceLoader机制加载所有的EventListenerProvider实现类,并将其注册到keycloak的事件处理机制中。

  • 构造方法,在每个keycloak后台操作时,它都会重新构建实例
  • OnEvent方法,在事件发生时执行,不会出现类加载问题,因为这样类已经被加载了

EventListenerProviderFactory

EventListenerProviderFactory是进行事件处理器的生产工厂,用于创建EventListenerProvider实例。在keycloak启动时,会通过ServiceLoader机制加载所有的EventListenerProviderFactory实现类,并将其注册到keycloak的事件处理机制中。

  • init方法:keycloak启动时会执行,用于初始化EventListenerProviderFactory实例,可以在此方法中进行一些初始化操作。
  • postInit方法:keycloak启动时会执行,在init方法之后,会执行这个方法
  • create方法:在kc后台开启这个EventListenerProviderFactory之后,每次请求都会执行这个create方法,对于它生产的provider对象,可能考虑使用单例的方式, 避免每次请求都创建一个新的对象
  • close方法:在keycloak程序关闭后或者当前事件被注册时,这个方法才会执行

问题

  • 问题描述:在EventListenerProviderFactory的init方法中,通过kafka发送消息,会出现类加载问题,因为在keycloak启动时,kafka的类的加载器还没有被加载,所以会出现类加载问题。
  • 解决:需要将类加载器这块,修改成当前类加载器去加载对应的文件,如下代码解决了类无法加载的问题
  @Override
  public void postInit(KeycloakSessionFactory keycloakSessionFactory) {
    try {
      this.executorService = Executors.newFixedThreadPool(2);
      Properties kafkaProperties = new Properties();
      kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
          ConfigFactory.getInstance().getStrPropertyValue("kafka.host"));
      kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "kc-ListenerProviderFactory");
      kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
      kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
      kafkaProperties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
      kafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
      kafkaProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
      kafkaProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
      // 需要使用当前类加载器,否则会出现无法加载StringDeserializer的情况
      Class<?> stringDeserializerClass =
          getClass().getClassLoader().loadClass("org.apache.kafka.common.serialization.StringDeserializer");
      kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializerClass);
      kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializerClass);
      this.kafkaConsumerAdd = new KcKafkaConsumer(keycloakSessionFactory,kafkaProperties, "black_list_add");
      executorService.submit(kafkaConsumerAdd);
      this.kafkaConsumerRemove=new KcKafkaConsumer(keycloakSessionFactory,kafkaProperties, "black_list_remove");
      executorService.submit(kafkaConsumerRemove);
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }
  • 对于kafka-clients实现消费者的话,代码还是比较简单的
public class KcKafkaConsumer implements Runnable {
  private static final Logger logger = Logger.getLogger(ConfigFactory.class);
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final KeycloakSessionFactory keycloakSessionFactory;
  private KafkaConsumer<String, String> kafkaConsumer;

  public KcKafkaConsumer(KeycloakSessionFactory keycloakSessionFactory, Properties properties, String topic)
      throws ClassNotFoundException {
    this.keycloakSessionFactory = keycloakSessionFactory;
    this.kafkaConsumer = new KafkaConsumer<>(properties);
    this.kafkaConsumer.subscribe(Collections.singleton(topic));
  }

@Override
  public void run() {
    try {

      while (!closed.get()) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        // 处理Kafka消息
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("Topic:" + record.topic() + ",Received message: " + record.value());
          //TODO: 处理Kafka消息的具体逻辑
        }
      }
    } finally {
      kafkaConsumer.close();
    }
  }
  public void shutdown() {
    closed.set(true);
    kafkaConsumer.close();
  }
}

标签:ConsumerConfig,EventListenerProvider,kafka,kafkaProperties,put,加载,CONFIG,keycloa
From: https://www.cnblogs.com/lori/p/17566206.html

相关文章

  • springcloud - kafka实践
    springcloud可以通过KafkaTemplate来发布消息,让后消费者使用来订阅@KafkaListener主题消息。一、添加依赖1<dependencyManagement>2<dependencies>3<dependency>4<groupId>org.springframework.cloud</groupId>5<artifactId&g......
  • docker kafka-manger
    实现"DockerKafkaManager"的过程及代码解释:整个过程可以分为以下几个步骤:步骤描述步骤一安装Docker步骤二下载KafkaManager镜像步骤三创建并启动KafkaManager容器步骤四配置Kafka集群连接下面是每个步骤具体需要做的事情以及相应的代码:步骤一......
  • springboot - kafka实践
    Kafka是一个开源的分布式流处理平台,由Apache软件基金会开发和维护。它是一种高性能、可持久化、可扩展的消息队列系统,常用于解决大规模数据传输和处理的问题。以下是Kafka的一些核心概念和主要特点:消息和主题:Kafka基于发布订阅模式,消息被发布到一个或多个主题(Topic)中。每条消......
  • Kafka 集群参数配置介绍
    目录Broker端存储信息相关参数ZooKeeper相关参数Topic相关参数数据存留相关参数Topic级别参数保存消息JVM参数操作系统参数文件描述符限制文件系统类型Swappiness提交时间Broker端Broker端参数也被称为静态参数(StaticConfigs),必须在Kafka的配置文件server.properties......
  • kafka-es.go
    packageesimport("context""encoding/json""fmt""kafka/mongo""log""os""github.com/olivere/elastic")varclient*elastic.Clientvarhost="http://192.168.184.10:9200"......
  • 2023-07-16:讲一讲Kafka与RocketMQ中零拷贝技术的运用?
    2023-07-16:讲一讲Kafka与RocketMQ中零拷贝技术的运用?答案2023-07-16:什么是零拷贝?零拷贝(英语:Zero-copy)技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。➢零拷贝技术可以减少数据......
  • 2023-07-16:讲一讲Kafka与RocketMQ中零拷贝技术的运用?
    2023-07-16:讲一讲Kafka与RocketMQ中零拷贝技术的运用?答案2023-07-16:什么是零拷贝?零拷贝(英语:Zero-copy)技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域。这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。➢零拷贝技术可以减少数据拷贝和......
  • 2023-07-14:讲一讲Kafka与RocketMQ中存储设计的异同?
    2023-07-14:讲一讲Kafka与RocketMQ中存储设计的异同?答案2023-07-14:在Kafka中,文件的布局采用了Topic/Partition的方式,每个分区对应一个物理文件夹,且在分区文件级别上实现了顺序写入。然而,当一个Kafka集群拥有大量的主题和每个主题拥有数百个分区时,在高并发写入消息的情况下,IO操作......
  • 2023-07-14:讲一讲Kafka与RocketMQ中存储设计的异同?
    2023-07-14:讲一讲Kafka与RocketMQ中存储设计的异同?答案2023-07-14:在Kafka中,文件的布局采用了Topic/Partition的方式,每个分区对应一个物理文件夹,且在分区文件级别上实现了顺序写入。然而,当一个Kafka集群拥有大量的主题和每个主题拥有数百个分区时,在高并发写入消息的情况下,IO操作会变......
  • 同步同一个Kafka Topic的不同消费者
    分布式系统中的同步很困难。您可能的目标是尽可能多地防止它。但有时业务需求需要协调对数据新鲜度有严重依赖的不同服务。为了概括起见,假设架构由_Service-A_、_Service-B_和_Service-C_组成。它们都使用来自同一个Kafka主题的消息,但显然根据各自的业务逻辑、API和SLA对......