首页 > 其他分享 >KAFKA配置 SASL_SSL双重认证

KAFKA配置 SASL_SSL双重认证

时间:2024-06-20 16:26:01浏览次数:27  
标签:SASL kafka SSL props put org KAFKA

1. 背景
kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。

SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。
在 Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。

因工作需要,需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。

2. 环境

  • 操作系统:linux
  • kafka版本:kafka_2.13-2.7.1
  • zookeeper版本:apache-zookeeper-3.7.0
  • 应用程序版本:spring-boot-2.6.7、JDK1.8

 

3. 操作步骤

  1. 生成SSL证书
  2. 配置zookeeper
  3. 配置kafka
  4. 前三步配置完成后kafka就开启了SASL_SSL双重认证,可以使用kafka自带的客户端进行测试(3.4),
  5. 在业务代码中使用请查看(3.5)

3.1 生成SSL证书

按照步骤一步一步操作,生成服务器/客户端的SSL证书。也就是公钥与私钥

参考:【SSL协议】生成SSL证书 - lihewei - 博客园 (cnblogs.com)

 

3.2 配置zookeeper认证

第一步: 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件(名称任意),定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名=“用户密码”,内容如下:

Server {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  user_admin="1qaz@WSX"
  user_kafka="1qaz@WSX";
};

第二步: zookeeper配置文件zoo.cfg中新增SASL认证配置,如下:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

 

第三步: 在apache-zookeeper-3.7.0/bin/zkServer.sh脚本中新增jvm参数,让其启动时加载jaas配置文件

 

 

 

3.3 配置kafka安全认证

第一步: /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:

kafka-server-jaas.conf

KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="1qaz@WSX"
  user_admin="1qaz@WSX"
  user_kafka="1qaz@WSX";
};
 
Client {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka"
  password="1qaz@WSX";
};

  kafka-client-jaas.conf

KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="1qaz@WSX";
};

  

第二步: 在kafka启动脚本(kafka_2.13-2.7.1/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:

增加环境变量: -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf

...

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf"
fi

...

第三步:**修改 kafka 的 server.properties配置文件

#listeners=SSL://10.1.61.121:9092
host.name=node1
#listeners=PLAINTEXT://node1:9092,SSL://node1:9093
listeners=SASL_SSL://node1:9093
#advertised.listeners=SSL://node1:9092
advertised.listeners=SASL_SSL://node1:9093
ssl.keystore.location=/home/crbt/lihw/ca/server/server.keystore.jks
ssl.keystore.password=Q06688
ssl.key.password=Q06688
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS 
ssl.truststore.type=JKS 
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=
# 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

 注意:这里有个坑,生成SSL密钥私钥时指定了主机的hostname,这里也要配置kafka所在服务器的hostname 

 

3.4 使用kafka客户端进行验证

第一步: 修改kafka/config/下的 consumer.properties、producer.properties,配置SASL_SSL验证的基本信息。

consumer.properties:

 

bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688

sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

  producer.properties:

bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688

sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

  第二步: 使用命令行操作时,让其找到上述设置的SASL_SSL配置文件( --producer.config …/config/producer.properties)

#生产
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-producer.sh --bootstrap-server node1:9093  --topic first --producer.config ../config/producer.properties
>aaa
>bbb
>ccc
>

#消费
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties
aaa
bbb
ccc

  

3.5 使用Java端代码进行认证(如果不是用springboot可以不用管 )

第一步: yaml 配置文件

spring:
  kafka:
    bootstrap-servers: localhost:9093
    properties:
      sasl:
        mechanism: PLAIN
        jaas:
          #此处填写 SASL登录时分配的用户名密码(注意password结尾;)
          config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";
      security:
        protocol: SASL_SSL
    ssl:
      trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jks
      trust-store-password: Q06688
      key-store-type: JKS
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 106384
      acks: -1
      retries: 3
      properties:
        linger-ms: 1
        retry.backoff.ms: 1000
        buffer-memory: 33554432

  第二步: 使用 kafkaTemplate 的方式,配置一个 config

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.retries}")
    private String retries;
    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;
    @Value("${spring.kafka.producer.properties.linger-ms}")
    private int lingerMs;
    @Value("${spring.kafka.producer.properties.buffer-memory}")
    private int bufferMemory;
    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;
    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;
    @Value("${spring.kafka.properties.security.protocol}")
    private String protocol;
    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String mechanism;
    @Value("${spring.kafka.ssl.trust-store-location}")
    private String trustStoreLocation;
    @Value("${spring.kafka.ssl.trust-store-password}")
    private String trustStorePassword;
    @Value("${spring.kafka.ssl.key-store-type}")
    private String keyStoreType;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String jaasConfig;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * the producer factory config
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        props.put("security.protocol", protocol);
        props.put(SaslConfigs.SASL_MECHANISM, mechanism);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);
        props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, keyStoreType);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);
        return new DefaultKafkaProducerFactory<String, String>(props);
    }
}

3.6 使用Java端代码进行认证(KafkaProducer  KafkaConsumer)

生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
 
import java.util.Properties;
 
public class KafkaProducerSaslSslExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-kafka-broker:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        // 配置SASL认证方式为SASL_SSL
        props.put(ProducerConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(ProducerConfig.SASL_MECHANISM, "PLAIN"); // 或者其他支持的SASL机制
        
        // 配置Kerberos认证所需的相关参数
        props.put(ProducerConfig.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<your-username>\" password=\"<your-password>\";");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 生产者使用示例
        producer.send(new ProducerRecord<>("your-topic", "message-key", "message-value"), (metadata, exception) -> {
            if (exception == null) {
                System.out.println("消息发送成功");
            } else {
                exception.printStackTrace();
            }
        });
        
        producer.close();
    }
}

 注意

在这个示例中,你需要替换your-kafka-broker:9092为你的Kafka代理地址和端口,your-topic为你要发送消息的主题,your-principal为Kerberos中的主体名称,/path/to/your/keytab为Kerberos密钥表文件的路径。

请确保你的Kafka集群已经配置了SSL和SASL认证,并且相关的安全设置是正确的

 

消费者

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
 
import java.util.Properties;
 
public class KafkaSaslSslExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<your-username>\" password=\"<your-password>\";");
 
        // SSL配置
        props.put("ssl.truststore.location", "/path/to/truststore.jks");
        props.put("ssl.truststore.password", "truststore_password");
        props.put("ssl.keystore.location", "/path/to/keystore.jks");
        props.put("ssl.keystore.password", "keystore_password");
 
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // ... 消费者逻辑
    }
}

  注意:

在这个示例中,我们配置了KafkaConsumer以使用SASL_SSL协议进行通信,并且指定了SASL的PLAIN认证机制。我们还需要指定SSL的信任库和密钥库的位置以及它们的密码。sasl.jaas.config 属性中应该包含有效的JAAS配置,它定义了用于认证的用户名和密码。

请确保替换属性值中的占位符(<your-username><your-password>/path/to/truststore.jkstruststore_password/path/to/keystore.jks, 和 keystore_password)为实际值。

 

 

标签:SASL,kafka,SSL,props,put,org,KAFKA
From: https://www.cnblogs.com/paimianbaobao/p/18258889

相关文章

  • 剖析 Kafka 消息丢失的原因
    文章目录前言一、生产者导致的消息丢失的场景场景1:消息太大解决方案:1、减少生产者发送消息体体积2、调整参数max.request.size场景2:异步发送机制解决方案:1、使用带回调函数的发送方法场景3:网络问题和配置不当解决方案:1、设置`acks`参数设置为"all"2、设置重试参数......
  • aggregate ‘QSslConfiguration conf‘ has incomplete type and cannot be defined
    用Qt进行网络开发,所以程序中包含了network模块,但编译Qt程序时报错aggregate'QSslConfigurationconf'hasincompletetypeandcannotbedefined,报错截图如下QSslConfiguration类是Qt框架中用于SSL配置的一部分,报错表示编译器没有找到QSslConfiguration的完整定义,需......
  • Tomcat服务器安装SSL证书教程
    Tomcat服务器安装SSL证书教程,主要包括获取证书、安装证书、重启Tomcat以及测试SSL证书是否安装成功等4大步骤,以下是详细图文教程。一、获取证书免费申请SSL证书,证书申请成功后,会获取到颁发证书文件(.zip)压缩格式,当中有包含四种证书格式如:Tomcat、Nginx、IIS、Apache;Tomcat服务......
  • Nginx服务器安装SSL证书教程
    Nginx服务器安装SSL证书教程,主要包括获取证书、安装证书、重启Nginx以及测试SSL证书是否安装成功等4大步骤,以下是详细图文教程。一、获取证书免费申请SSL证书,证书申请成功后,会获取到颁发证书文件(.zip)压缩格式,当中有包含四种证书格式如:Tomcat、Nginx、IIS、Apache;Nginx服务器......
  • Kafka集群保姆级部署教程
    目录资源列表基础环境修改主机名关闭防火墙关闭selinux安装JAVA安装Kafka下载Kafka解压修改配置文件kafka01kafka02kafka03启动服务启动ZK启动Kafka验证测试创建topic查看topic        今天给大家分享的是Kafka分布式集群部署,上次分享的单机版的k......
  • 【2024最新精简版】Kafka面试篇
    文章目录Kafka和RabbitMQ什么区别讲一讲Kafka架构你们项目中哪里用到了Kafka?为什么会选择使用Kafka?有什么好处?使用Kafka如何保证消息不丢失?消息的重复消费问题如何解决的?Kafka如何保证消费的顺序性?Kafka的高可用机制有了解过嘛?Kafka实现高性能的设计有了解......
  • 【Android面试八股文】SSL握手的过程都经历过什么?
    文章目录一、为什么要设计SSL/TLS?二、SSL/TLS的历史概述SSL(SecureSocketsLayer)TLS(TransportLayerSecurity)总结三、SSL/TLS加密过程中如何保证公钥不被篡改?公钥加密计算量太大,如何减少耗用的时间?3.1如何保证公钥不被篡改?3.2公钥加密计算量太大,如何......
  • 批量生产千万级数据 推送到kafka代码
    1、批量规则生成代码1、随机IP生成代码2、指定时间范围内随机日期生成代码3、随机中文名生成代码。packagecom.wfg.flink.connector.utils;importjava.time.LocalDate;importjava.time.LocalDateTime;importjava.time.LocalTime;importjava.util.ArrayList;i......
  • Flink1.17.0-报错: java.lang.NoSuchMethodError: org.apache.kafka.clients.admin.De
    背景:启动Flink的sql-client.sh,创建Kafka的source端表,然后查询Kafka的数据时报错。报错信息:2024-06-1816:10:12org.apache.flink.util.FlinkException:GlobalfailuretriggeredbyOperatorCoordinatorfor'Source:kafka_rmc_cust_analog_u[1]'(operatorbc764cd8ddf7a0c......
  • HTTPS+TLS/SSL
    名词解释TLS:TransportLayerSecuritySSL:SecureSocketsLayer理论基础信息摘要算法根据一段信息计算出一串数字,但是由这串数字没办法还原出原来的信息等于是这串数字由这段信息产生,可以表示这段信息,称为这段信息的摘要主要有以下应用:传输文件时验证文件有无损坏......