首页 > 其他分享 >kafka使用SASL认证

kafka使用SASL认证

时间:2022-10-09 12:34:34浏览次数:56  
标签:configMap outer private 认证 Value put SASL kafka


前提

最近在使用运维团队给到的kafka集群时,需要使用sasl证人连接,这里记录一下

将运维人员给的sasl证书文件client_truststore.jks放在项目resource文件夹下

配置consumer

@Configuration
public class SSLKafkaConsumerConfig {

@Value("${outer.kafka.bootstrap.servers}")
private String bootStrapServers;

@Value("${outer.kafka.enable.auto.commit}")
private String enableAutoCommit;

@Value("${outer.kafka.auto.commit.interval}")
private String autoCommitInterval;

@Value("${outer.kafka.session.timeout}")
private String sessionTimeout;

@Value("${outer.kafka.auto.offset.reset}")
private String autoOffsetReset;

@Value("${outer.kafka.consumer.concurrency}")
private Integer consumerConcurrency;

@Value("${outer.kafka.consumer.poll.timeout}")
private Integer consumerPollTimeout;


@Value("${outer.kafka.security.protocol}")
private String securityProtocol;

//@Value("${outer.kafka.ssl.algorithm}")
private String sslAlgorithm;

//@Value("${outer.kafka.ssl.password}")
private String sslPassword;

@Value("${outer.kafka.sasl.mechanism}")
private String saslMechanism;

@Value("${outer.kafka.sasl.jaas.config}")
private String saslJaasConfig;

//@Value("${outer.kafka.sasl.truststore}")
private String trustStore;


@Value("${outer.kafka.consumer.service.product.consumer.group}")
private String groupId;

@Value("${outer.kafka.consumer.service.product.topic}")
private String serviceProductTopic;


public Map<String, Object> kafkaConfigs() {
Map<String, Object> configMap = new HashMap<>();

configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
configMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);

configMap.put("security.protocol", securityProtocol);
configMap.put("sasl.mechanism", saslMechanism);
configMap.put("sasl.jaas.config", saslJaasConfig);

configMap.put("ssl.endpoint.identification.algorithm", "");
configMap.put("ssl.truststore.location", ClassUtils.getDefaultClassLoader().getResource("").getPath() + "client_truststore.jks");
configMap.put("ssl.truststore.password", sslPassword);

return configMap;
}

@Bean
public KafkaConsumer<String, String> sslKafkaConsumer() {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfigs());
consumer.subscribe(Arrays.asList(serviceProductTopic));
return consumer;
}
}

业务类中注入使用即可

    @Autowired
    @Qualifier("sslKafkaConsumer")
    private KafkaConsumer<String, String> sslKafkaConsumer;

比如消费

while (true) {
ConsumerRecords<String, String> records = null;
try {
records = sslKafkaConsumer.poll(timeoutMs);
sslKafkaConsumer.commitSync();
} catch (ConcurrentModificationException e) {
//e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("eService消费ssl kafka 消息失败。");
LOGGER.error(e.getMessage());
}
if (Objects.isNull(records)) {
continue;
}
System.out.print(records)
}

标签:configMap,outer,private,认证,Value,put,SASL,kafka
From: https://blog.51cto.com/u_11334685/5740096

相关文章

  • kafka从指定位置消费
    消费者消费方式1、KafkaConsumer.subscribe():为consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给同group下的不同consumer。2、KafkaConsume......
  • Ceph使用---CephX认证机制及用户管理
    一、CephX认证机制介绍Ceph使用cephx协议对客户端进行身份认证cephx用于对ceph保存的数据进行认证访问和授权,用于对访问ceph的请求进行认证和授权检测,与mon通信的请求都要......
  • 39.BasicAuthentication认证
    BasicAuthentication认证介绍BasicAuthentication使用HTTP基本的认证机制通过用户名/密码的方式验证,通常用于测试工作,尽量不要线上使用用户名和密码必须在HTTP报文......
  • 38.认证系统
    认证身份验证是将传入的请求与一组鉴别凭据关联的机制,然后使用权限和限流策略来确定是否允许请求进入再权限和限流检查发生之前,以及在执行其他代码之前,实在在视图的最......
  • MobaXterm注册认证版,亲测可用,操作简单(本机已安装python3环境)
    去github地址下下载代码  解压后在该目录下打开CMD执行MobaXterm-Keygen.py<UserName><Version>命令  生成的文件放在安装目录下,我的是免安装版,放在exe同目......
  • kafka消费者学习
    转自:https://www.cnblogs.com/cxuanBlog/p/11949238.html1.介绍  Kafka消费者从属于消费者群组。一个群组中的消费者订阅的都是相同的主题,每个消费者接收主题一部分分......
  • kafka的缺陷
    转自:https://mp.weixin.qq.com/s/_RIvZwK1sJJP8xnUDyAk1Q1.broker和partition的问题全量复制的问题。  2.缓存页写回可能丢失 3.进阶 ......
  • kafka的生产者学习
    转自:https://www.cnblogs.com/cxuanBlog/p/11949238.html1.流程介绍图1大概流程:创建一个ProducerRecord 对象,它是Kafka中的一个核心类,由记录要发送到的主题名称(T......
  • drf自动生成路由、on_delete、登录认证
    目录回顾一、drf入门1.前后端开发模式2.API接口:前后端交互的媒介3.接口测试工具:postman4.restful规范十条5.drf快速入门6.必须继承APIView写视图类二、序列化组件1.功能三......
  • 自动生成路由、登录接口与登录认证
    路由自己写的路由path('books/',views.BookView.as_view({'get':'list','post':'create'})),path('admin/<int:pk>',views.BookView.as_view({'get':'retrieve',......