首页 > 其他分享 >Kafka - 启用安全通信和认证机制_SSL + SASL

Kafka - 启用安全通信和认证机制_SSL + SASL

时间:2024-11-10 10:46:34浏览次数:7  
标签:ssl ca server kafka SSL SASL Kafka

文章目录


在这里插入图片描述


官方资料

https://kafka.apache.org/documentation/#security
在这里插入图片描述


概述

Kafka的SASL-SSL配置主要用于保护集群的网络传输安全,确保客户端与服务器端的通信通过加密和认证机制来保证数据的安全性和访问控制.。

Kafka支持多种安全协议,最常见的包括SSL(加密传输)、SASL(认证)和ACL(访问控制列表)。配置SASL-SSL组合可以实现:

  • SSL(Secure Sockets Layer):为数据传输提供加密,防止传输过程中的数据泄露和篡改。
  • SASL(Simple Authentication and Security Layer):提供身份认证机制。通过配置不同的SASL机制(如PLAIN、SCRAM-SHA-256等),限制只有经过认证的用户才可以访问Kafka。

制作kakfa证书

1.1 openssl 生成CA

openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM

openssl-ca.cnf内容如下

HOME            = .
RANDFILE        = $ENV::HOME/.rnd

####################################################################
[ ca ]
default_ca    = CA_default      # 默认的 ca 部分

[ CA_default ]
base_dir      = .               # 基本目录
certificate   = $base_dir/ca/cacert.pem   # CA 证书
private_key   = $base_dir/ca/cakey.pem    # CA 私钥
new_certs_dir = $base_dir              # 签名后的新证书位置
database      = $base_dir/index/index.txt    # 数据库索引文件
serial        = $base_dir/serial/serial.txt   # 当前序列号

default_days     = 1000         # 证书有效期
default_crl_days = 30           # 下一个 CRL 之前的有效期
default_md       = sha256       # 使用的公钥摘要算法,默认为 sha256
preserve         = no           # 不保留传递的 DN 排序

x509_extensions = ca_extensions # 添加到证书的扩展

email_in_dn     = no            # 在 DN 中不包含电子邮件
copy_extensions = copy          # 从 CSR 复制 SAN 到证书

####################################################################
[ req ]
default_bits       = 4096       # 默认密钥位数
default_keyfile    = cakey.pem  # 默认密钥文件
distinguished_name = ca_distinguished_name   # 默认的区分名
x509_extensions    = ca_extensions           # 默认的 X.509 扩展
string_mask        = utf8only   # 字符串掩码

####################################################################
[ ca_distinguished_name ]
countryName         = Country Name (2 letter code)   # 国家/地区名(两个字母代码)
countryName_default = CN                            # 默认国家/地区

stateOrProvinceName         = State or Province Name (full name)   # 州/省名(全名)
stateOrProvinceName_default = SD                      # 默认州/省名

localityName                = Locality Name (eg, city)            # 地区/城市名
localityName_default        = JN                          # 默认地区/城市名

organizationName            = Organization Name (eg, company)     # 组织名
organizationName_default    = UNKNOWN                       # 默认组织名

organizationalUnitName         = Organizational Unit (eg, division)   # 组织单位名
organizationalUnitName_default = UNKNOWN                            # 默认组织单位名

commonName         = Common Name (e.g. server FQDN or YOUR name)       # 通用名(例如服务器 FQDN 或您的姓名)
commonName_default = HUC                                        # 默认通用名

emailAddress         = Email Address           # 电子邮件地址
emailAddress_default = [email protected]           # 默认电子邮件地址

####################################################################
[ ca_extensions ]
subjectKeyIdentifier   = hash                  # 主题密钥标识符
authorityKeyIdentifier = keyid:always, issuer  # 授权密钥标识符
basicConstraints       = critical, CA:true     # 基本约束
keyUsage               = keyCertSign, cRLSign   # 密钥用途

####################################################################
[ signing_policy ]
countryName            = optional
stateOrProvinceName    = optional
localityName           = optional
organizationName       = optional
organizationalUnitName = optional
commonName             = supplied
emailAddress           = optional

####################################################################
[ signing_req ]
subjectKeyIdentifier   = hash                  # 主题密钥标识符
authorityKeyIdentifier = keyid,issuer           # 授权密钥标识符
basicConstraints       = CA:FALSE               # 基本约束
keyUsage               = digitalSignature, keyEncipherment      # 密钥用途


1.2 生成server端秘钥对以及证书仓库

keytool -genkeypair -keystore serverkeystore.jks -alias KafkaKeyStore -validity 3650 -keyalg RSA -storetype pkcs12

1.3 CA 签名证书

利用服务端秘钥库生成证书请求:

keytool -keystore serverkeystore.jks -alias Kafkakeystore -certreq -file cert-file.csr

CA签名生成签名证书:

openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out kafka-cert-signed.cert -days 3650 -infiles ../cert-file.csr

1.4 服务端秘钥库导入签名证书以及CA根证书

keytool -keystore serverkeystore.jks -alias Kafkakeystore -import -file kafka-cert-signed.cert
keytool -keystore serverkeystore.jks -alias CARoot -import -file ./ca/ca/cacert.pem

1.5 生成服务端信任库并导入CA根数据

keytool -keystore server_truststor.jks -alias CARoot -import -file ./ca/cacert.pem

1.6 生成客户端信任库并导入CA根证书

keytool -keystore client_truststor.jks -alias CARoot -import -file ./ca/cacert.pem

2 配置zookeeper SASL认证

2.1 编写zk_server_jass.conf配置

文件信息如下

Server {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="zk"
   password="1qax#EDC5tgb"
   user_kafka="234BCDefg#";
};

其中username与password为zk集群之前认证的用户名密码,user_kafka=“234BCDefg#” 代表用户为kafka,密码为234BCDefg#,kafka认证的用户名以及密码,kafka中需要配置

2.2 修改zk启动脚本

添加启动参数如下:

-Djava.security.auth.login.config=zk_server_jass.conf

2.3 修改zoo.cfg 配置文件

添加参数如下:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

2.4 zk添加jar包

拷贝kafka lib目录下kafka-clients-2.3.0-SNAPSHOT.jar、lz4-.jar、slf4j-api-.jar、slf4j-log4j12-.jar、snappy-java-.jar 五个jar包到zk的lib目录下

2.5 启动zk验证服务


3 配置kafka server端

3.1 编写kafka_server_jaas.conf

添加jass配置文件,内容如下:

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="234BCDefg#"
   user_admin="234BCDefg#"
   user_alice="234BCDefg#";
};
Client {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="kafka"
   password="234BCDefg#";
};

其中username与password 为kafka集群内部认证的用户名与密码,与user_admin或者其他用户保持一致
user_admin="234BCDefg#"与user_alice="234BCDefg#"配置用户,格式为user_用户名=密码
Client 配置zk认证的用户名与密码,必须与zk中配置的保持一致

3.2 修改kafka启动脚本

添加启动参数如下:

-Djava.security.auth.login.config=kafka_server_jaas.conf

3.3 修改server.properties

修改监听协议

listeners=SASL_SSL://:9093
advertised.listeners=SASL_SSL://10.11.106.63:9093

配置SSL

ssl.keystore.location=./ssl/kakfa-server-keystore.jks
ssl.keystore.password=1qaz#EDC5tgb
ssl.key.password=1qaz#EDC5tgb
ssl.truststore.location=./ssl/server_truststor.jks
ssl.truststore.password=1qaz#EDC5tgb
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS 
ssl.truststore.type=JKS 
ssl.endpoint.identification.algorithm=

配置SASL

security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

3.4 启动kafka

验证服务是否正常启动


4 配置kafka client 端

客户端添加参数如下:
指定SASL_SSL协议:

security.protocol=SASL_SSL

SASL认证:

sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"234BCDefg#\";

SSL配置:

ssl.truststore.location=./ssl/client_truststor.jks
ssl.truststore.password=1qaz#EDC5tgb
ssl.endpoint.identification.algorithm=

5 验证

命令行执行:

 .\kafka-console-producer.bat --broker-list 127.0.0.1:9093  --topic artisanTest

未指定ssl参数,则server端日志中会出现以下异常:

[2024-05-24 10:20:55,334] INFO [SocketServer brokerId=0] Failed authentication with 127.0.0.1/127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector)

添加SSL 客户端配置文件 client-ssl.properties,内容如下:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\" password=\"234BCDefg#\";
ssl.truststore.location=./ssl/client_truststor.jks
ssl.truststore.password=1qaz#EDC5tgb
ssl.endpoint.identification.algorithm=

再次执行命令,添加配置参数:

 .\kafka-console-producer.bat --broker-list 127.0.0.1:9093  --topic artisanTest --producer.config client-ssl.properties

可正常发送消息


6. 常见问题

  • 测试连接:通过Kafka的console-producerconsole-consumer客户端工具测试SASL-SSL的连接是否配置正确。
  • SSL握手失败:若未正确配置SSL密钥库或信任库,可能会遇到SSL handshake failed错误。
  • 认证失败:若SASL用户名或密码不匹配,可能会出现Failed authentication的异常日志。

在这里插入图片描述

标签:ssl,ca,server,kafka,SSL,SASL,Kafka
From: https://blog.csdn.net/yangshangwei/article/details/143652948

相关文章

  • Kafka 核心要点解析
    目录一、Kafka消息发送流程二、Kafka的设计架构三、Kafka分区的目的四、Kafka保证消息有序性的方式五、ISR、OSR、AR概念六、Kafka在什么情况下会出现消息丢失七、保证Kafka可靠性的方法八、Kafka数据去重九、生产者提高吞吐量的方法十、Zookeeper在Kafka......
  • Kafka 分区的目的?
    Kafka分区的主要目的包括以下几点:提高吞吐量:分区允许多个消费者并行读取数据,从而显著提高系统的整体吞吐量。每个分区可以由不同的消费者实例处理,实现负载均衡。数据分布:通过分区,数据可以分布在多个Broker上,避免单个Broker成为性能瓶颈。这使得Kafka能够支持大规模的数据存储......
  • (已解决)等保/密评用什么SSL证书?
    等保(信息安全等级保护)和密评(商用密码应用安全性评估)是中国网络安全领域的重要标准,它们要求对网络系统进行分层的安全防护和密码学应用的合规性检查。在这些框架下,使用SSL/TLS证书来加密数据传输是常见的安全措施之一。为了满足等保和密评的要求,选择SSL证书时应考虑以下几个关......
  • 一年期的免费SSL证书服务
    随着互联网技术的快速发展,教育机构越来越多地依赖在线平台进行教学、管理和沟通。然而,随着网络攻击和数据泄露事件的频发,确保在线教育平台的安全性变得尤为重要。SSL证书可以为教育机构提供必要的数据保护和信任建立。JoySSL是一家专业的HTTPS安全方案服务商,专注于提供SSL/TLS......
  • 如何获取IP地址专用SSL证书
    随着网络安全意识的不断提高,越来越多的网站和服务开始采用SSL证书来加密数据传输,保护用户信息的安全。对于那些仅通过IP地址访问的服务来说,获取IP地址专用SSL证书就显得尤为重要。1.准备阶段确认公网IP地址确保您拥有一个可以在互联网上访问的公网IP地址。2.注册账号和......
  • 解决Ubuntu的Anaconda中的ssl_verify报错问题
    一、问题运行Anaconda时,遇到了下面的报错:CustomValidationError:Parameterssl_verify='/home/omnisky/anaconda3/ssl/cacert.pem'declaredin<<merged>>isinvalid.ssl_verifyvalue'/home/omnisky/anaconda3/ssl/cacert.pem'mustbeaboole......
  • kafka+zookeeper的搭建
            kafka从2.8版本开始,就可以不用配置zookeeper了,但是也可以继续配置。我目前使用的kafka版本是kafka_2.12-3.0.0.tgz,其中前面的2.12表示是使用该版本的scala语言进行编写的,而后面的3.00才是kafka当前的版本。通过百度网盘分享的文件:apache-zookeeper-3.6.4-bin.......
  • Kafka概述--消息中间件
    目录1.1定义1.2、kafka的中的组成成员1.3消息队列(中间件)1.3.1传统消息队列的应用场景1.3.2消息队列的两种模式1.4Kafka基础架构1.5、kafka的名词概念1.1定义kafka面试非常的重要,做实时或者准实时项目必用工具(绕不开)。Kafka就是用来存储消息的,消息中间件。......
  • Kafka 快速入门-安装部署
    目录1.1安装部署1.1.1集群规划1.1.2集群规划1.1.3集群启停脚本1.1安装部署1.1.1集群规划bigdata01bigdata02bigdata03zkzkzkkafkakafkakafka1.1.2集群规划每次进入linux都会自动进入base环境,如何关闭basecondadeactivate 手动......
  • 使用 Let’s Encrypt 获取免费SSL证书
    文章目录前言申请需求详细步骤:安装Certbot使用DNS验证生成证书找到生成的证书文件将证书文件复制到目标服务器在目标服务器上配置Nginx使用证书注意更新证书本地操作实例总结前言之前不太了解SSL免费证书,只研究过一点自签名SSL证书的知识,前几年的12306就是用的......