前提
最近在使用运维团队提供的kafka集群时,需要通过SASL认证方式连接,这里记录一下
将运维人员给的证书文件(信任库)client_truststore.jks放在项目的resources文件夹下
配置consumer
/**
 * Spring configuration that builds a {@link KafkaConsumer} for a cluster secured with SASL.
 *
 * <p>Connection settings are injected from {@code outer.kafka.*} properties. The truststore is
 * expected on the classpath under the name {@code client_truststore.jks}.
 */
@Configuration
public class SSLKafkaConsumerConfig {

    /** Classpath name of the truststore file. */
    private static final String TRUSTSTORE_RESOURCE = "client_truststore.jks";

    @Value("${outer.kafka.bootstrap.servers}")
    private String bootStrapServers;
    @Value("${outer.kafka.enable.auto.commit}")
    private String enableAutoCommit;
    @Value("${outer.kafka.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${outer.kafka.session.timeout}")
    private String sessionTimeout;
    @Value("${outer.kafka.auto.offset.reset}")
    private String autoOffsetReset;
    // Injected but not read anywhere in this class.
    @Value("${outer.kafka.consumer.concurrency}")
    private Integer consumerConcurrency;
    // Injected but not read anywhere in this class.
    @Value("${outer.kafka.consumer.poll.timeout}")
    private Integer consumerPollTimeout;
    @Value("${outer.kafka.security.protocol}")
    private String securityProtocol;
    //@Value("${outer.kafka.ssl.algorithm}")
    private String sslAlgorithm;
    // NOTE(review): the binding below is disabled, so this field stays null unless it is set
    // elsewhere; the truststore password is therefore passed to Kafka as null.
    //@Value("${outer.kafka.ssl.password}")
    private String sslPassword;
    @Value("${outer.kafka.sasl.mechanism}")
    private String saslMechanism;
    @Value("${outer.kafka.sasl.jaas.config}")
    private String saslJaasConfig;
    //@Value("${outer.kafka.sasl.truststore}")
    private String trustStore;
    @Value("${outer.kafka.consumer.service.product.consumer.group}")
    private String groupId;
    @Value("${outer.kafka.consumer.service.product.topic}")
    private String serviceProductTopic;

    /**
     * Assembles the consumer properties from the injected settings.
     *
     * @return a new mutable map of Kafka consumer configuration entries
     * @throws IllegalStateException if the truststore cannot be found on the classpath or
     *     cannot be made available as a regular file
     */
    public Map<String, Object> kafkaConfigs() {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        configMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        configMap.put("security.protocol", securityProtocol);
        configMap.put("sasl.mechanism", saslMechanism);
        configMap.put("sasl.jaas.config", saslJaasConfig);
        // NOTE(review): an empty value turns off server hostname verification; confirm this is
        // intended for the target cluster.
        configMap.put("ssl.endpoint.identification.algorithm", "");
        configMap.put("ssl.truststore.location", resolveTrustStoreLocation());
        configMap.put("ssl.truststore.password", sslPassword);
        return configMap;
    }

    /**
     * Creates the consumer and subscribes it to the configured topic.
     *
     * @return a consumer already subscribed to {@code serviceProductTopic}
     */
    @Bean
    public KafkaConsumer<String, String> sslKafkaConsumer() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfigs());
        consumer.subscribe(Arrays.asList(serviceProductTopic));
        return consumer;
    }

    /**
     * Returns a filesystem path to the truststore that Kafka can open.
     *
     * <p>Kafka needs a real file path. When the resource is a plain file, its decoded path is
     * returned directly. Otherwise (for example when the application runs from a jar) the
     * resource is copied to a temporary file first. JDK types are written fully qualified so
     * this class needs no additional imports.
     */
    private String resolveTrustStoreLocation() {
        ClassLoader classLoader = ClassUtils.getDefaultClassLoader();
        java.net.URL resource =
                classLoader == null ? null : classLoader.getResource(TRUSTSTORE_RESOURCE);
        if (resource == null) {
            throw new IllegalStateException(
                    "Truststore not found on classpath: " + TRUSTSTORE_RESOURCE);
        }
        try {
            if ("file".equals(resource.getProtocol())) {
                // Going through the URI decodes escapes such as %20 that URL.getPath() keeps.
                return java.nio.file.Paths.get(resource.toURI()).toString();
            }
            java.nio.file.Path extracted =
                    java.nio.file.Files.createTempFile("client_truststore", ".jks");
            extracted.toFile().deleteOnExit();
            try (java.io.InputStream in = resource.openStream()) {
                java.nio.file.Files.copy(
                        in, extracted, java.nio.file.StandardCopyOption.REPLACE_EXISTING);
            }
            return extracted.toString();
        } catch (java.io.IOException | java.net.URISyntaxException e) {
            throw new IllegalStateException(
                    "Unable to prepare truststore " + TRUSTSTORE_RESOURCE, e);
        }
    }
}
业务类中注入使用即可
// Injects the consumer bean by name; the qualifier matches the "sslKafkaConsumer" bean method.
@Autowired
@Qualifier("sslKafkaConsumer")
private KafkaConsumer<String, String> sslKafkaConsumer;
比如消费
// Original article: https://blog.51cto.com/u_11334685/5740096
// (tags: configMap, outer, private, authentication, Value, put, SASL, kafka)
//
// Endless poll loop: fetch a batch, handle it, then commit its offsets.
while (true) {
    try {
        ConsumerRecords<String, String> records = sslKafkaConsumer.poll(timeoutMs);
        if (Objects.isNull(records)) {
            continue;
        }
        System.out.print(records);
        // Commit only after the batch has been handled, so a failure above does not mark
        // unprocessed records as consumed.
        sslKafkaConsumer.commitSync();
    } catch (ConcurrentModificationException ignored) {
        // Deliberately ignored, as in the original snippet.
        // NOTE(review): KafkaConsumer raises this when used from more than one thread;
        // confirm the consumer is confined to a single thread.
    } catch (Exception e) {
        // Pass the exception itself so the stack trace reaches the log.
        LOGGER.error("eService消费ssl kafka 消息失败。", e);
    }
}