1. 背景
kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。
SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。
在 Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。
因工作需要,需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。
2. 环境
- 操作系统:linux
- kafka版本:kafka_2.13-2.7.1
- zookeeper版本:apache-zookeeper-3.7.0
- 应用程序版本:spring-boot-2.6.7、JDK1.8
3. 操作步骤
- 生成SSL证书
- 配置zookeeper
- 配置kafka
- 前三步配置完成后kafka就开启了SASL_SSL双重认证,可以使用kafka自带的客户端进行测试(3.4),
- 在业务代码中使用请查看(3.5)
3.1 生成SSL证书
按照步骤一步一步操作,生成服务器/客户端的SSL证书。也就是公钥与私钥
参考:【SSL协议】生成SSL证书 - lihewei - 博客园 (cnblogs.com)
3.2 配置zookeeper认证
第一步: 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件(名称任意),定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名=“用户密码”,内容如下:
Server { org.apache.zookeeper.server.auth.DigestLoginModule required user_admin="1qaz@WSX" user_kafka="1qaz@WSX"; };
第二步: zookeeper配置文件zoo.cfg中新增SASL认证配置,如下:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
第三步: 在apache-zookeeper-3.7.0/bin/zkServer.sh脚本中新增jvm参数,让其启动时加载jaas配置文件
3.3 配置kafka安全认证
第一步: /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:
kafka-server-jaas.conf
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="1qaz@WSX" user_admin="1qaz@WSX" user_kafka="1qaz@WSX"; }; Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="1qaz@WSX"; };
kafka-client-jaas.conf
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="1qaz@WSX"; };
第二步: 在kafka启动脚本(kafka_2.13-2.7.1/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:
增加环境变量: -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf
... if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf" fi ...
第三步:**修改 kafka 的 server.properties配置文件
#listeners=SSL://10.1.61.121:9092 host.name=node1 #listeners=PLAINTEXT://node1:9092,SSL://node1:9093 listeners=SASL_SSL://node1:9093 #advertised.listeners=SSL://node1:9092 advertised.listeners=SASL_SSL://node1:9093 ssl.keystore.location=/home/crbt/lihw/ca/server/server.keystore.jks ssl.keystore.password=Q06688 ssl.key.password=Q06688 ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks ssl.truststore.password=Q06688 ssl.client.auth=required ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 ssl.keystore.type=JKS ssl.truststore.type=JKS # kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名 # 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可 ssl.endpoint.identification.algorithm= # 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT security.inter.broker.protocol=SASL_SSL sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer allow.everyone.if.no.acl.found=true
注意:这里有个坑,生成SSL密钥私钥时指定了主机的hostname,这里也要配置kafka所在服务器的hostname
3.4 使用kafka客户端进行验证
第一步: 修改kafka/config/下的 consumer.properties、producer.properties,配置SASL_SSL验证的基本信息。
consumer.properties:
bootstrap.servers=node1:9093 security.protocol=SASL_SSL ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks ssl.truststore.password=Q06688 sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";
producer.properties:
bootstrap.servers=node1:9093 security.protocol=SASL_SSL ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks ssl.truststore.password=Q06688 sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";
第二步: 使用命令行操作时,让其找到上述设置的SASL_SSL配置文件( --producer.config …/config/producer.properties)
#生产 crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-producer.sh --bootstrap-server node1:9093 --topic first --producer.config ../config/producer.properties >aaa >bbb >ccc > #消费 crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties aaa bbb ccc
3.5 使用Java端代码进行认证(如果不是用springboot可以不用管 )
第一步: yaml 配置文件
spring: kafka: bootstrap-servers: localhost:9093 properties: sasl: mechanism: PLAIN jaas: #此处填写 SASL登录时分配的用户名密码(注意password结尾;) config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX"; security: protocol: SASL_SSL ssl: trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jks trust-store-password: Q06688 key-store-type: JKS producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer batch-size: 106384 acks: -1 retries: 3 properties: linger-ms: 1 retry.backoff.ms: 1000 buffer-memory: 33554432
第二步: 使用 kafkaTemplate 的方式,配置一个 config
import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Slf4j @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.producer.acks}") private String acks; @Value("${spring.kafka.producer.retries}") private String retries; @Value("${spring.kafka.producer.batch-size}") private String batchSize; @Value("${spring.kafka.producer.properties.linger-ms}") private int lingerMs; @Value("${spring.kafka.producer.properties.buffer-memory}") private int bufferMemory; @Value("${spring.kafka.producer.key-serializer}") private String keySerializer; @Value("${spring.kafka.producer.value-serializer}") private String valueSerializer; @Value("${spring.kafka.properties.security.protocol}") private String protocol; @Value("${spring.kafka.properties.sasl.mechanism}") private String mechanism; @Value("${spring.kafka.ssl.trust-store-location}") private String trustStoreLocation; @Value("${spring.kafka.ssl.trust-store-password}") private String trustStorePassword; @Value("${spring.kafka.ssl.key-store-type}") private String keyStoreType; @Value("${spring.kafka.properties.sasl.jaas.config}") private String jaasConfig; @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } /** * the producer factory config */ @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> props = new HashMap<String, Object>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.ACKS_CONFIG, acks); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); props.put("security.protocol", protocol); props.put(SaslConfigs.SASL_MECHANISM, mechanism); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation); props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword); props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, keyStoreType); props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig); return new DefaultKafkaProducerFactory<String, String>(props); } }
3.6 使用Java端代码进行认证(KafkaProducer KafkaConsumer)
生产者
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProducerSaslSslExample { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-broker:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 配置SASL认证方式为SASL_SSL props.put(ProducerConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); props.put(ProducerConfig.SASL_MECHANISM, "PLAIN"); // 或者其他支持的SASL机制 // 配置Kerberos认证所需的相关参数 props.put(ProducerConfig.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<your-username>\" password=\"<your-password>\";"); Producer<String, String> producer = new KafkaProducer<>(props); // 生产者使用示例 producer.send(new ProducerRecord<>("your-topic", "message-key", "message-value"), (metadata, exception) -> { if (exception == null) { System.out.println("消息发送成功"); } else { exception.printStackTrace(); } }); producer.close(); } }
注意
在这个示例中,你需要替换your-kafka-broker:9092
为你的Kafka代理地址和端口,your-topic
为你要发送消息的主题,your-principal
为Kerberos中的主体名称,/path/to/your/keytab
为Kerberos密钥表文件的路径。
请确保你的Kafka集群已经配置了SSL和SASL认证,并且相关的安全设置是正确的
消费者
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; import java.util.Properties; public class KafkaSaslSslExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<your-username>\" password=\"<your-password>\";"); // SSL配置 props.put("ssl.truststore.location", "/path/to/truststore.jks"); props.put("ssl.truststore.password", "truststore_password"); props.put("ssl.keystore.location", "/path/to/keystore.jks"); props.put("ssl.keystore.password", "keystore_password"); props.put("group.id", "test"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // ... 消费者逻辑 } }
注意:
在这个示例中,我们配置了KafkaConsumer以使用SASL_SSL协议进行通信,并且指定了SASL的PLAIN认证机制。我们还需要指定SSL的信任库和密钥库的位置以及它们的密码。sasl.jaas.config
属性中应该包含有效的JAAS配置,它定义了用于认证的用户名和密码。
请确保替换属性值中的占位符(<your-username>
, <your-password>
, /path/to/truststore.jks
, truststore_password
, /path/to/keystore.jks
, 和 keystore_password
)为实际值。
标签:SASL,kafka,SSL,props,put,org,KAFKA From: https://www.cnblogs.com/paimianbaobao/p/18258889