首页 > 其他分享 >基于 SASL/SCRAM 让 Kafka 实现动态授权认证

基于 SASL/SCRAM 让 Kafka 实现动态授权认证

时间:2024-07-29 09:29:04浏览次数:18  
标签:-- kafka admin SCRAM SASL Kafka config

一、说明

在大数据处理和分析中 Apache Kafka 已经成为了一个核心组件。然而在生产环境中部署 Kafka 时,安全性是一个必须要考虑的重要因素。SASL(简单认证与安全层)和 SCRAM(基于密码的认证机制的盐化挑战响应认证机制)提供了一种方法来增强 Kafka 集群的安全性。

本文将从零开始部署 ZooKeeperKafka 并通过配置 SASL/SCRAMACL(访问控制列表)来增强 Kafka 的安全性。

 

二、Kafka 的安全机制

kafka 社区在 0.9.0.0 版本正式添加了安全特性,可以满足各种安全性的要求,包括:

  1. Kafka 与 ZooKeeper 之间的安全通信;
  2. Kafka 集群 ZooKeeper 之间的安全通信;
  3. 客户端与服务端之间的安全通信;
  4. 消息级别的权限控制,可以控制客户端(生产者或消费者)的读写操作权限。

 

认证方式 引入版本 适用场景
SSL 0.9.0 SSL做信道加密比较多,SSL认证不如SASL所以一般都会使用SSL来做通信加密。
SASL/GSSAPI 0.9.9 主要是给 Kerberos 使用的。如果你的公司已经做了 Kerberos 认证(比如使用 Active Directory),那么使用 GSSAPI 是最方便的了。因为你不需要额外地搭建 Kerberos,只要让你们的 Kerberos 管理员给每个 Broker 和要访问 Kafka 集群的操作系统用户申请 principal 就好了。
SASL/PLAIN 0.10.2 简单的用户名密码认证,通常与SSL结合使用,对于小公司来说,没必要搭建公司级别的Kerberos,使用它就比较合适。
SASL/SCRAM 0.10.2 PLAIN的加强版本,支持动态的用户增减。
Deleation Token 1.1 Delegation Token 是在 1.1 版本引入的,它是一种轻量级的认证机制,主要目的是补充现有的 SASL 或 SSL 认证。如果要使用 Delegation Token,你需要先配置好 SASL 认证,然后再利用 Kafka 提供的 API 去获取对应的 Delegation Token。这样 Broker 和客户端在做认证的时候,可以直接使用这个 token,不用每次都去 KDC 获取对应的 ticket(Kerberos 认证)或传输 Keystore 文件(SSL 认证)。
SASL/OAUTHBEARER 2.0 OAuth 2框架的集成。

 

三、环境和软件准备

Apache Kafka 官网 下载对应版本的 Kafka 并解压到你选择的目录。

确保已经安装 Java 才能运行 Kafka,可以通过运行 java -version 来检查 Java 环境。

 

四、部署 Zookeeper

使用 Kafka 内置的 Zookeeper

4.1. 启用 SASL 认证

进入 config 目录,修改 zookeeper.properties 配置文件增加以下内容:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true

 

4.2. 配置 JAAS

在 config 目录下创建 zk_jaas.conf 文件,内容如下:

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_zkclient="zkclient";
};

其作用是创建了一个 Server 节点,其中

  • org.apache.zookeeper.server.auth.DigestLoginModule required 是认证逻辑的处理类;
  • username、password 是zookeeper之间通讯的用户名和密码;
  • user_admin="admin" 的结构是 user_[username]=[password] 定义 kafka-broker(zookeeper客户端)连接到 zookeeper 时用的用户名和密码。

注意:Server 内部最后一行的 ; 和 } 后的 ; 不能缺少!

 

4.3. 修改启动文件

进入 bin 目录,修改 zookeeper-server-start.sh 文件;

export KAFKA_HEAP_OPTS= 配置项的参数后添加 JAAS 的配置:

export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M -Djava.security.auth.login.config=../config/zk_jaas.conf"

 

4.4. 启动 Zookeeper

执行命令:./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

-daemon 参数配置后台运行

 

4.5. 测试

可以从官网 Apache ZooKeeper 下载对应版本的 ZooKeeper 并解压;

添加 JAAS 配置,在 confi 目录下创建 zk_client_jaas.conf 文件:

Client{
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="zkclient"
    password="zkclient";
};

修改 bin 目录下的 zkCli.sh 文件,在启动命令中增加 JAAS 的配置:

"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" "-Dzookeeper.log.file=${ZOO_LOG_FILE}" \
     -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
     "-Djava.security.auth.login.config=../conf/zk_client_jaas.conf" \
     org.apache.zookeeper.ZooKeeperMain "$@"

执行 zkCli.sh 连接本机已经启动好的 ZooKeeper

进入 Kafka 的 log 目录,查看内置 zk 的日志 zookeeper.out 显示以下内容:

INFO adding SASL authorization for authorizationID: zkclient (org.apache.zookeeper.server.ZooKeeperServer)

代表 ZooKeeper 的 SASL 认证已经配置成功。

 

五、部署 Kafka

5.1. 配置 Kafka Broker

进入 config 目录,修改 server.properties 配置文件增加以下内容:

listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin
  • authorizer.class.name 开启 ACL 授权机制并指定实现类;
  • allow.everyone.if.no.acl.found 如果没有找到ACL(访问控制列表)配置,是否允许任何操作;这里设置为 false 指除了超级管理员,其他用户必须配置 ACL 才能访问资源;
  • super.users 超级管理员,无需配置 ACL 拥有所有权限的用户。

 

5.2. 配置 JAAS

在 config 目录下创建 kafka_server_jaas.conf 文件,内容如下:

KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin";
};

Client{
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="zkclient"
    password="zkclient";
};
  • KafkaServer 中的 username,password 用于 Kafka 集群 Broker 节点之间通信用的账号密码;
  • KafkaServer 中的 user_test="test" 用于 Kafka 客户端(producer,consumer)连接broker时,用该配置下user_[username]=[password]结构配置的账号密码登录;
  • Client 用于 broker 和 zookeeper 之间的认证,对应 zk_jaas.conf 中的 【user_zkclient="zkclient"】 配置;
  • user_admin="admin" 的结构是 user_[username]=[password] 定义 kafka-broker(zookeeper客户端)连接到 zookeeper 时用的用户名和密码。

 

5.3. 修改启动文件

进入 bin 目录,修改 kafka-server-start.sh 文件;

export KAFKA_HEAP_OPTS= 配置项的参数后添加 JAAS 的配置:

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=../config/kafka_server_jaas.conf"

 

5.4. 创建 SCRAM 用户

在启动 Kafka 之前需要先创建好用户,在 bin 目录下执行以下内容:

分别创建 admin(超级管理员) 和 test(客户端用户)

./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin]' --entity-type users --entity-name admin

./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=test]' --entity-type users --entity-name test

SASL/SCRAM 认证的用户信息是动态创建存储在 ZooKeeper 中, 由于上面的配置 kafka_server_jaas.conf 中 Broker 之间的通信是通过 admin 用户的,如果该用户不存在会 启动报错

 

5.5. 启动 Kafka

执行命令:./kafka-server-start.sh -daemon ../config/server.properties

-daemon 参数配置后台运行

 

六、验证 SASL/SCRAM 鉴权

6.1. 客户端认证配置

6.1.1. 管理员配置

进入 config 目录创建 cmd.properties 内容如下:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

配置认证的类型以及登录逻辑的处理类和用户,使用超级管理员 admin

注意 最后的 ; 是必须加上的。

6.1.2. 生产者配置

修改 config 目录下的 producer.properties 增加以下内容:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";

生产者也使用超级管理员 admin 来发送消息。

6.1.3. 消费者配置

修改 config 目录下的 consumer.properties 增加以下内容:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";

消费者使用 test 用户来接收消息。

 

6.2. 创建topic

在 bin 目录下执行以下命令:

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 1 --replication-factor 1 --command-config ../config/cmd.properties
  • bootstrap-server 配置 Kafka 服务端的地址
  • topic 指定topic名称
  • command-config 指定命令的认证配置,这里使用上面创建的 管理员配置

创建成功后可以通过以下命令查看存在的 topic 清单:

./kafka-topics.sh --bootstrap-server localhost:9092 --list --command-config ../config/cmd.properties

 

6.3. 创建消费者

6.3.1. 执行 kafka-console-consumer

在 bin 目录下执行以下命令:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config ../config/consumer.properties

执行命令后会发现以下 报错 信息:

ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256
Processed a total of 0 messages

Authentication failed 认证失败,由于消费者的认证使用的是 test 用户,而该用户还未配置任何 ACL 权限。

6.3.2. 配置用户 ACL 权限

Kafka 的 ACL (Access Control Lists) 允许你定义哪些用户可以访问哪些主题,并且可以执行哪些操作(如读、写、创建、删除等)。

执行以下命令:

./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --operation Read --topic test --group test-consumer-group

为 test 用户在资源 topic[test] 下分配只读权限

执行成功,可以通过以下命令查看资源所分配的所有 ACL 清单:

./kafka-acls.sh --bootstrap-server localhost:9092 --topic test --list --command-config ../config/cmd.properties

重新创建消费者:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config ../config/consumer.properties

执行成功后该 shell 窗口会一直阻塞等待消息。

 

6.4. 创建生产者

新开一个 shell 窗口 在 bin 目录下执行以下命令:

./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config ../config/producer.properties

由于生产者的认证使用的是 admin 为 超级管理员 所以无需配置 ACL 权限。

执行成功后会出现 > 符号,输入内容之后,切换到 消费者 窗口就可以看到了。

标签:--,kafka,admin,SCRAM,SASL,Kafka,config
From: https://www.cnblogs.com/zlt2000/p/18329394

相关文章

  • kafka 设置消费者多线程参数说明
    一、设置消费者多线程 参数privatestaticfinalintCONSUMER_THREAD_NUM=1;//订阅topicMap<String,Integer>topicCountMap=Collections.singletonMap(topic,CONSUMER_THREAD_NUM);Map<String,List<KafkaStream<byte[],byte[]>>>messageStr......
  • Java Kafka生产消费测试类
    JavaKafka生产消费测试类:生产者:packagetest.kafkatest;importjava.text.SimpleDateFormat;importjava.util.Properties;importjava.util.Random;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig......
  • kafka 基本架构与性能调优
    目录基本架构一、核心组件二、工作流程三、特点与优势broker的架构一、KafkaBroker的基本概念二、KafkaBroker的网络架构特点三、KafkaBroker的网络架构组成四、KafkaBroker的网络通信流程broker性能为什么那么好1.高吞吐量2.低延迟3.高并发性4.持久性5.可扩展性使用踩......
  • Kafka介绍及安装部署
    本节内容:消息中间件消息中间件特点消息中间件的传递模型Kafka介绍安装部署Kafka集群安装Yahookafkamanagerkafka-manager添加kafkacluster 一、消息中间件消息中间件是在消息的传输过程中保存消息的容器。消息中间件在将消息从消息生产者到消费者时充当中间人的......
  • flume采集数据到kafka脚本编写
    3.2.1数据采集思路:a)配置kafka,启动zookeeper和kafka集群;b)创建kafka主题;c)启动kafka控制台消费者(此消费者只用于测试使用);d)配置flume,监控日志文件;e)启动flume监控任务;f)运行日志生产脚本;g)观察测试。1)启动zookeeper,kafka集群$/opt/module/kafka/bin/kafka-se......
  • 卡夫卡(Kafka)框架详解:从背景到应用实践
    卡夫卡(Kafka)框架详解:从背景到应用实践引言        在大数据和分布式系统日益普及的今天,数据处理和消息传递成为了支撑复杂业务系统的关键基础设施。ApacheKafka,作为一个高性能的分布式消息队列系统,因其高吞吐量、低延迟和可扩展性,成为了众多企业和开发者首选的消息......
  • Apache Kafka 使用详解
    文章目录引言官网链接Kafka原理核心概念工作原理基础使用安装与启动生产者示例消费者示例高级使用KafkaStreams示例:单词计数KafkaConnect示例:使用KafkaConnect将MySQL数据导入Kafka优缺点优点缺点结论引言ApacheKafka是一个分布式流处理平台,由LinkedIn......
  • kafka 基础知识
    1、Kafka简介ApacheKafka是由Apache开发的一种发布订阅消息系统。Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感......
  • docker-compose部署kafka-ui部署以及使用
    1.docker-compose配置脚本version:"3"services:kafka-ui:image:provectuslabs/kafka-ui:v0.7.2container_name:kafka-uihostname:kafka-uiprivileged:truerestart:alwaysenvironment:-DYNAMIC_CONFIG_ENABLED=true......
  • Docker-Compose配置zookeeper+KaFka+CMAK简单集群
    1. 本地DNS解析管理#编辑hosts文件sudonano/etc/hosts#添加以下三个主机IP192.168.186.77zoo1k1192.168.186.18zoo2k2192.168.186.216zoo3k3注:zoo1是192.168.186.77的别名,zoo2是192.168.186.18的别名,zoo3是192.168.186.126的别名,IP自行修改即可,其他配置可以......