文章目录
官方资料
https://kafka.apache.org/documentation/#security
概述
Kafka的SASL-SSL配置主要用于保护集群的网络传输安全,确保客户端与服务器端的通信通过加密和认证机制来保证数据的安全性和访问控制.。
Kafka支持多种安全协议,最常见的包括SSL(加密传输)、SASL(认证)和ACL(访问控制列表)。配置SASL-SSL组合可以实现:
- SSL(Secure Sockets Layer):为数据传输提供加密,防止传输过程中的数据泄露和篡改。
- SASL(Simple Authentication and Security Layer):提供身份认证机制。通过配置不同的SASL机制(如PLAIN、SCRAM-SHA-256等),限制只有经过认证的用户才可以访问Kafka。
制作kakfa证书
1.1 openssl 生成CA
openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM
openssl-ca.cnf内容如下
HOME = .
RANDFILE = $ENV::HOME/.rnd
####################################################################
[ ca ]
default_ca = CA_default # 默认的 ca 部分
[ CA_default ]
base_dir = . # 基本目录
certificate = $base_dir/ca/cacert.pem # CA 证书
private_key = $base_dir/ca/cakey.pem # CA 私钥
new_certs_dir = $base_dir # 签名后的新证书位置
database = $base_dir/index/index.txt # 数据库索引文件
serial = $base_dir/serial/serial.txt # 当前序列号
default_days = 1000 # 证书有效期
default_crl_days = 30 # 下一个 CRL 之前的有效期
default_md = sha256 # 使用的公钥摘要算法,默认为 sha256
preserve = no # 不保留传递的 DN 排序
x509_extensions = ca_extensions # 添加到证书的扩展
email_in_dn = no # 在 DN 中不包含电子邮件
copy_extensions = copy # 从 CSR 复制 SAN 到证书
####################################################################
[ req ]
default_bits = 4096 # 默认密钥位数
default_keyfile = cakey.pem # 默认密钥文件
distinguished_name = ca_distinguished_name # 默认的区分名
x509_extensions = ca_extensions # 默认的 X.509 扩展
string_mask = utf8only # 字符串掩码
####################################################################
[ ca_distinguished_name ]
countryName = Country Name (2 letter code) # 国家/地区名(两个字母代码)
countryName_default = CN # 默认国家/地区
stateOrProvinceName = State or Province Name (full name) # 州/省名(全名)
stateOrProvinceName_default = SD # 默认州/省名
localityName = Locality Name (eg, city) # 地区/城市名
localityName_default = JN # 默认地区/城市名
organizationName = Organization Name (eg, company) # 组织名
organizationName_default = UNKNOWN # 默认组织名
organizationalUnitName = Organizational Unit (eg, division) # 组织单位名
organizationalUnitName_default = UNKNOWN # 默认组织单位名
commonName = Common Name (e.g. server FQDN or YOUR name) # 通用名(例如服务器 FQDN 或您的姓名)
commonName_default = HUC # 默认通用名
emailAddress = Email Address # 电子邮件地址
emailAddress_default = [email protected] # 默认电子邮件地址
####################################################################
[ ca_extensions ]
subjectKeyIdentifier = hash # 主题密钥标识符
authorityKeyIdentifier = keyid:always, issuer # 授权密钥标识符
basicConstraints = critical, CA:true # 基本约束
keyUsage = keyCertSign, cRLSign # 密钥用途
####################################################################
[ signing_policy ]
countryName = optional
stateOrProvinceName = optional
localityName = optional
organizationName = optional
organizationalUnitName = optional
commonName = supplied
emailAddress = optional
####################################################################
[ signing_req ]
subjectKeyIdentifier = hash # 主题密钥标识符
authorityKeyIdentifier = keyid,issuer # 授权密钥标识符
basicConstraints = CA:FALSE # 基本约束
keyUsage = digitalSignature, keyEncipherment # 密钥用途
1.2 生成server端秘钥对以及证书仓库
keytool -genkeypair -keystore serverkeystore.jks -alias KafkaKeyStore -validity 3650 -keyalg RSA -storetype pkcs12
1.3 CA 签名证书
利用服务端秘钥库生成证书请求:
keytool -keystore serverkeystore.jks -alias Kafkakeystore -certreq -file cert-file.csr
CA签名生成签名证书:
openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out kafka-cert-signed.cert -days 3650 -infiles ../cert-file.csr
1.4 服务端秘钥库导入签名证书以及CA根证书
keytool -keystore serverkeystore.jks -alias Kafkakeystore -import -file kafka-cert-signed.cert
keytool -keystore serverkeystore.jks -alias CARoot -import -file ./ca/ca/cacert.pem
1.5 生成服务端信任库并导入CA根数据
keytool -keystore server_truststor.jks -alias CARoot -import -file ./ca/cacert.pem
1.6 生成客户端信任库并导入CA根证书
keytool -keystore client_truststor.jks -alias CARoot -import -file ./ca/cacert.pem
2 配置zookeeper SASL认证
2.1 编写zk_server_jass.conf配置
文件信息如下
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="zk"
password="1qax#EDC5tgb"
user_kafka="234BCDefg#";
};
其中username与password为zk集群之前认证的用户名密码,user_kafka=“234BCDefg#” 代表用户为kafka,密码为234BCDefg#,kafka认证的用户名以及密码,kafka中需要配置
2.2 修改zk启动脚本
添加启动参数如下:
-Djava.security.auth.login.config=zk_server_jass.conf
2.3 修改zoo.cfg 配置文件
添加参数如下:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
2.4 zk添加jar包
拷贝kafka lib目录下kafka-clients-2.3.0-SNAPSHOT.jar、lz4-.jar、slf4j-api-.jar、slf4j-log4j12-.jar、snappy-java-.jar 五个jar包到zk的lib目录下
2.5 启动zk验证服务
3 配置kafka server端
3.1 编写kafka_server_jaas.conf
添加jass配置文件,内容如下:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="234BCDefg#"
user_admin="234BCDefg#"
user_alice="234BCDefg#";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="234BCDefg#";
};
其中username与password 为kafka集群内部认证的用户名与密码,与user_admin或者其他用户保持一致
user_admin="234BCDefg#"与user_alice="234BCDefg#"配置用户,格式为user_用户名=密码
Client 配置zk认证的用户名与密码,必须与zk中配置的保持一致
3.2 修改kafka启动脚本
添加启动参数如下:
-Djava.security.auth.login.config=kafka_server_jaas.conf
3.3 修改server.properties
修改监听协议
listeners=SASL_SSL://:9093
advertised.listeners=SASL_SSL://10.11.106.63:9093
配置SSL
ssl.keystore.location=./ssl/kakfa-server-keystore.jks
ssl.keystore.password=1qaz#EDC5tgb
ssl.key.password=1qaz#EDC5tgb
ssl.truststore.location=./ssl/server_truststor.jks
ssl.truststore.password=1qaz#EDC5tgb
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=
配置SASL
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
3.4 启动kafka
验证服务是否正常启动
4 配置kafka client 端
客户端添加参数如下:
指定SASL_SSL协议:
security.protocol=SASL_SSL
SASL认证:
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"234BCDefg#\";
SSL配置:
ssl.truststore.location=./ssl/client_truststor.jks
ssl.truststore.password=1qaz#EDC5tgb
ssl.endpoint.identification.algorithm=
5 验证
命令行执行:
.\kafka-console-producer.bat --broker-list 127.0.0.1:9093 --topic artisanTest
未指定ssl参数,则server端日志中会出现以下异常:
[2024-05-24 10:20:55,334] INFO [SocketServer brokerId=0] Failed authentication with 127.0.0.1/127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
添加SSL 客户端配置文件 client-ssl.properties,内容如下:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"234BCDefg#\";
ssl.truststore.location=./ssl/client_truststor.jks
ssl.truststore.password=1qaz#EDC5tgb
ssl.endpoint.identification.algorithm=
再次执行命令,添加配置参数:
.\kafka-console-producer.bat --broker-list 127.0.0.1:9093 --topic artisanTest --producer.config client-ssl.properties
可正常发送消息
6. 常见问题
- 测试连接:通过Kafka的
console-producer
和console-consumer
客户端工具测试SASL-SSL
的连接是否配置正确。 - SSL握手失败:若未正确配置SSL密钥库或信任库,可能会遇到
SSL handshake failed
错误。 - 认证失败:若SASL用户名或密码不匹配,可能会出现
Failed authentication
的异常日志。