首页 > 其他分享 >Springboot集成使用阿里云kafka详细步骤

Springboot集成使用阿里云kafka详细步骤

时间:2023-08-08 23:35:45浏览次数:34  
标签:集成 java Springboot jar kafka conf import org

明确连接认证类型

首先要明确使用哪种连接认证类型

Ons模式参考

https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-java-demo/beta

Ons模式的conf内容

KafkaClient {
        com.aliyun.openservices.ons.sasl.client.OnsLoginModule required
        AccessKey="XXX"
        SecretKey="XXX";
};

Plain模式参考

https://github.com/AliwareMQ/aliware-kafka-demos/tree/master/kafka-java-demo/vpc-ssl

Plain模式的conf内容

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxxxxxxxxxxxxxxxxxxxx"
  password="xxxxxxxxxxxxxxxxxxxxx";
};

分别在这两个帖子中下载对应的jks证书和conf文件。

Springboot集成使用阿里云kafka详细步骤_java

或者参考代码的相应目录下

Springboot集成使用阿里云kafka详细步骤_阿里云_02

注意,这两个配置都不能打包到jar包中,否则容易无法识别和出问题,所以我们需要放在服务的明确路径里。

例如/jar/kafka_client_jaas.conf和/jar/kafka.client.truststore.jks

集成

springboot版本为1.5.2。

引入kafka-client的jar包

在项目的pom文件中添加kafka-clients并且排除spring-kafka中的kafka-clients。

因为spring-kafka目前最新版本为2.1.2,其依赖的kafka-clients是1.0.x,但Kafka 服务端版本是 0.10,Client 版本建议 0.10,所以此处需排除依赖重新引入,否则一直报错:disconnected

如下:

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.10.0.0</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<exclusions>
				<exclusion>
					<groupId>org.apache.kafka</groupId>
					<artifactId>kafka-clients</artifactId>
				</exclusion>
			</exclusions>
		</dependency>

新建KafkaAliyunConfiguration类

KafkaAliyunConfiguration.java

package com.biologic.util;

import java.net.URL;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.util.StringUtils;

@Configuration
@EnableKafka
public class KafkaAliyunConfiguration {

	@Value("${kafka.broker.address}")
	private String brokerAddress;

	@Value("${kafka.sample.topic}")
	private String defaultTopic;

	@Value("${kafka.jks.location}")
	private String jksLocation;
	
	@Value("${kafka.sample.retrycount}")
	private String retrycount;
	
	

	public KafkaAliyunConfiguration() {
		 //如果用-D 或者其它方式设置过,这里不再设置
		   if (null == System.getProperty("java.security.auth.login.config")) {
		       //请注意将 XXX 修改为自己的路径
		       //这个路径必须是一个文件系统可读的路径,不能被打包到 jar 中
		       System.setProperty("java.security.auth.login.config", "/jar/kafka_client_jaas.conf");
		   }
				System.out.println("环境变量中已有config文件,kafka配置为:"+System.getProperty("java.security.auth.login.config"));
	}

	public Map<String, Object> producerConfigs() {
		Map<String, Object> props = new HashMap<String, Object>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
		if (StringUtils.isEmpty(jksLocation)) {
			props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaAliyunConfiguration.class.getClassLoader()
					.getResource("kafka.client.truststore.jks").getPath());
		} else {
			props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksLocation);
		}
		props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
		props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
		props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
		props.put(ProducerConfig.RETRIES_CONFIG, retrycount);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
				"org.apache.kafka.common.serialization.StringSerializer");
		props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
		return props;
	}

	public ProducerFactory<String, String> producerFactory() {
		return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
	}

	@Bean
	public  KafkaTemplate<String, String> kafkaTemplate() {
		KafkaTemplate kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
		kafkaTemplate.setDefaultTopic(defaultTopic);
		return kafkaTemplate;
	}
}

此处定义了四个变量,通过配置文件注入:

brokerAddress kafka服务器地址

defaultTopic kafka默认topic

jksLocation JKS文件地址(开发环境无需定义,直接读取resources下的jks,但生产环境需读取jar包外部的jks文件,所以此处需配置路径)

retrycount 重试次数

配置文件properties中增加相应变量

在application-beta.properties中增加对应配置如下:

kafka.broker.address=39.76.22.123:9093,39.175.15.234:9093,39.126.188.165:9093

kafka.sample.retrycount=100

kafka.sample.topic=save_sample

kafka.jks.location=/jar/kafka.client.truststore.jks

新建KafkaService发送消息

KafkaService.java

package com.biologic.api.service;


import org.springframework.stereotype.Service;

@Service
public interface KafkaService {


	void sendMessage(String topic, String data);


	void releaseKafkaMsg(String barcode, String chip);

}

KafkaServiceImpl.java

package com.biologic.api.service.impl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.biologic.api.service.KafkaService;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

@Service
public class KafkaServiceImpl implements KafkaService {

	@Value("${kafka.sample.topic}")
	private String sampleTopic;

	private Logger LOG = LoggerFactory.getLogger(KafkaServiceImpl.class);

	// private final KafkaTemplate<Integer, String> kafkaTemplate;
	//
	// /**
	// * 注入KafkaTemplate
	// * @param kafkaTemplate kafka模版类
	// */
	// @Autowired
	// public KafkaServiceImpl(KafkaTemplate kafkaTemplate) {
	// this.kafkaTemplate = kafkaTemplate;
	// }

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	public void sendMessage(String topic, String data) {
		LOG.info("kafka sendMessage start");
		ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, data);
		future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
			@Override
			public void onFailure(Throwable ex) {
				LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data);
			}

			@Override
			public void onSuccess(SendResult<String, String> result) {
				LOG.info("kafka sendMessage success topic = {}, data = {}", topic, data);
			}
		});
		LOG.info("kafka sendMessage end");
	}

	public void releaseKafkaMsg(String barcode, String chip) {
		try {
			JSONArray data = new JSONArray();
			JSONObject kafka_sample_state = new JSONObject();
			kafka_sample_state.put("plate_id", chip);
			kafka_sample_state.put("barcode", barcode);
			kafka_sample_state.put("status", "release_report");
			data.add(kafka_sample_state);

			JSONObject sample_list = new JSONObject();
			sample_list.put("sample_list", data.toString());
			sendMessage(sampleTopic, sample_list.toString());
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

外部注入路径变量的方式

我们上面的代码中是把conf文件的路径写死的,如果需要变动地址,可以使用以下方式

环境注入conf文件路径

因为代码中会默认获取环境变量中的java.security.auth.login.config配置,所以只需要启动时 赋值路径即可。

-Djava.security.auth.login.config=你的配置绝对路径

完整启动springboot的项目命令如下:

java -jar /jar/report-api-1.0.0-SNAPSHOT.jar --spring.profiles.active=beta  -Djava.security.auth.login.config=/jar/kafka_client_jaas.conf

变量注入conf文件路径

注意 因为类的初始化在注入变量之前,所以conf的路径不能用变量的方式注入,否则会报空指针错误。

如下用法会报错

@Value("${kafka.conf.location}")
	private String confLocation;

	public KafkaAliyunConfiguration() {
		// 如果用-D 或者其它方式设置过,这里不再设置
		if (null == System.getProperty("java.security.auth.login.config")) {
			// 请注意将 XXX 修改为自己的路径
			// 这个路径必须是一个文件系统可读的路径,不能被打包到 jar 中
			System.setProperty("java.security.auth.login.config", confLocation);
			System.out.println("使用配置中的路径,kafka配置为:" + System.getProperty("java.security.auth.login.config"));
		} else {
			System.out.println("环境变量中已有config文件,kafka配置为:" + System.getProperty("java.security.auth.login.config"));
		}
	}

安全层面加固

因为直接conf文件中包含帐号密码容易被其他人查看到,有一种方式是外部引入模版文件,使用环境变量中的帐号密码修改conf文件。

模版文件如下:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="_KAFKA_ALIYUN_USERNAME_"
  password="_KAFKA_ALIYUN_PASSWORD_";
};

使用shell命令从s3中下载conf文件并修改conf文件如下:

initContainers:
        - name: pull-lib
          image: anigeo/awscli:latest
          command: ["/bin/sh","-c"] 
          args: ['aws s3 cp s3://test-env/kafka.client.truststore.jks /jar/ ;aws s3 cp s3://test-env/kafka_client_jaas.conf /jar/ ;sed -i "s/_KAFKA_ALIYUN_USERNAME_/${KAFKA_SSL_USERNAME}/"  /jar/kafka_client_jaas.conf;sed -i "s/_KAFKA_ALIYUN_PASSWORD_/${KAFKA_SSL_PASSWORLD}/"  /jar/kafka_client_jaas.conf']
          env:
            - name: AWS_DEFAULT_REGION
              value: cn-southwest-2
            - name: KAFKA_SSL_USERNAME
              valueFrom:
                secretKeyRef:
                  name: aliyun-kafka
                  key: username
            - name: KAFKA_SSL_PASSWORLD
              valueFrom:
                secretKeyRef:
                  name: aliyun-kafka
                  key: password
          volumeMounts:
            - name: workdir
              mountPath: /jar

可能遇到的问题–org.apache.kafka.common.errors.UnsupportedSaslMechanismException: Client SASL mechanism ‘ONS’ not enabled in the server, enabled mechanisms are [PLAIN]

原因

代码中使用的配置与conf中设置的安全机制不一致。

解决方式

PLAIN模式
代码中

props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

对应conf内容

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="xxxxxxxxxxxxxxxxxxxxx"
  password="xxxxxxxxxxxxxxxxxxxxx";
};

ONS模式
代码中

props.put(SaslConfigs.SASL_MECHANISM, "ONS");

对应conf内容

KafkaClient {
        com.aliyun.openservices.ons.sasl.client.OnsLoginModule required
        AccessKey="XXX"
        SecretKey="XXX";
};

可能遇到的问题–nested exception is java.lang.NullPointerException

使用代码为

public KafkaAliyunConfiguration() {

		if (StringUtils.isEmpty(confLocation)) {
			URL authLocation = KafkaAliyunConfiguration.class.getClassLoader().getResource("kafka_client_jaas.conf");
			if (System.getProperty("java.security.auth.login.config") == null) {
				System.setProperty("java.security.auth.login.config", authLocation.toExternalForm());
			}
			System.out.println("kafka配置为:"+authLocation.toExternalForm());
		} else {
		    System.out.println("kafka配置为:"+confLocation);
			System.setProperty("java.security.auth.login.config", confLocation);
		}		
	}

在进行KafkaAliyunConfiguration初始化时报错空指针。

Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'qualityServiceImpl': Unsatisfied dependency expressed through field 'kafkaService'; nested exception is org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'kafkaServiceImpl': Unsatisfied dependency expressed through field 'kafkaTemplate'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaAliyunConfiguration' defined in URL [jar:file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/com/biologic/util/KafkaAliyunConfiguration.class]: Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.biologic.util.KafkaAliyunConfiguration$$EnhancerBySpringCGLIB$$88e40778]: Constructor threw exception; nested exception is java.lang.NullPointerException

原因 初始化KafkaAliyunConfiguration时,变量加载的顺序问题导致无法识别到变量。

解决方式

方式一 初始化时不使用注入的变量
如下:

public KafkaAliyunConfiguration() {
		 //如果用-D 或者其它方式设置过,这里不再设置
		   if (null == System.getProperty("java.security.auth.login.config")) {
		       //请注意将 XXX 修改为自己的路径
		       //这个路径必须是一个文件系统可读的路径,不能被打包到 jar 中
		       System.setProperty("java.security.auth.login.config", "/jar/kafka_client_jaas.conf");
		   }
				System.out.println("环境变量中已有config文件,kafka配置为:"+System.getProperty("java.security.auth.login.config"));
	}

方式二 将bean方法设置成static静态方法

参考 spring boot整合shiro引用配置文件配置是出现的问题

可能遇到的问题–Caused by: java.io.FileNotFoundException: file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/kafka.client.truststore.jks (No such file or directory)

原因

打入jar包的证书和conf文件无法读取,或者没有设置外部路径导致默认读取项目内的配置。

解决方式

通过外部明确的linux路径进行配置。

可能遇到问题–Configuration Error:Line 3: expected [option key]

ssl.truststore.location = file:/jar/report-api-1.0.0-SNAPSHOT.jar!/BOOT-INF/classes!/kafka.client.truststore.jks


ServiceExceptionHandler.java[line:30] exception ERROR org.apache.kafka.common.KafkaException: Failed to construct kafka producer

Caused by: java.io.IOException: Configuration Error:
	Line 3: expected [option key]

原因–配置文件无法读取或者参数格式错误。

解决方法

通过外部明确的linux路径进行配置jks和conf文件, 并且注意conf中的参数格式–分号,冒号要与原文件一致。

标签:集成,java,Springboot,jar,kafka,conf,import,org
From: https://blog.51cto.com/u_16218512/7013746

相关文章

  • springboot集成mongo
    springboot集成mongo背景linux版本:KylinV10docker版本:18.03mongo版本:5.0.5报错信息Causedby:com.mongodb.MongoCommandException:Commandfailedwitherror18:'Authenticationfailed.'onserver172.18.48.233:8888.Thefullresponseis{"ok&quo......
  • SpringBoot静态资源
    访问顺序:Controller->静态资源->404静态资源默认访问路径前端访问:http://localhost:8080/page4.htmlclasspath:/staticclasspath:/publicclasspath:/resourcesclasspath:/META-INF/resources自定义访问路径自定义后默认访问路径失效yml配置文件配置spring: #匹配方式-即前缀 mvc......
  • golang之操作kafka
     安装第三方包:gogetgithub.com/IBM/sarama 生产者实例:packagemainimport("fmt""github.com/IBM/sarama")funcmain(){//1.生产者配置config:=sarama.NewConfig()config.Producer.RequiredAcks=sarama.WaitForAll/......
  • 如何使用goconvey对gin+gorm+mysql搭建的后台进行集成测试
    集成测试对于项目的质量和稳定性非常重要。那么如何实现一个基于真实数据库的测试流程呢?首先,我们需要创建一个专门用于测试的数据库。比如,我自己使用的是以"test_"开头的数据库名。//创建测试数据库funcSetupForTest()(errerror){ db,err=gorm.Open(setting.DatabaseSe......
  • SpringBoot入门
    1.介绍:SpringBoot是一个基于Spring框架的开源项目,旨在简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置。SpringBoot提供了丰富的Spring模块化支持,可以帮助开发者更轻松快捷地构建出企业级应用。它通过自动......
  • 监控Kafka的关键指标
    Kafka架构上面绿色部分PRODUCER(生产者)和下面紫色部分CONSUMER(消费者)是业务程序,通常由研发人员埋点解决监控问题,如果是Java客户端也会暴露JMX指标。组件运维监控层面着重关注蓝色部分的BROKER(Kafka节点)和红色部分的ZOOKEEPER。ZooKeeper也是Java语言写的,监控相对简单,另......
  • Kafka数据对接
    1、数据流向:被动接收数据特点:及时性高,数据延迟小,Kafka的数据发送和接收都是毫秒级的。 2、接入参数  kafka:   security.protocol:SSL   ssl.endpoint.identification.algorithm:   ssl:    protocol:SSL    key-store-type:JKS ......
  • SpringBoot配置文件和修改端口
    我们在上一篇文章中已经运行起了一个简单的基础项目并运行起来了。SpringBoot简介项目创建和运行使用但是我们发现简单版的SpringBoot项目没有配置文件,定制版的项目有一个配置文件application.properties,我们还可以发现有些SpringBoot的项目使用的是xml或者yml配置文件。那么......
  • C# 读取带CheckBox复选框控件的表格-并集成到Windows Service里面
    最近的项目要求读取xls文件内的单元格,并且单元格旁边会有复选框标识类型。搜了下只有java的POI有例子,NOPI看项目文档好像是没有实现读取控件的功能。java实现POI POI如何解析出excel中复选框是否被选中https://blog.csdn.net/qq_29832217/article/details/104413475 C#导......
  • 《面试1v1》Kafka的ack机制
    面试官:嗨,小王!听说你对Kafka的ack机制很感兴趣,是吗?候选人:是的,王哥!我一直想了解一下Kafka的ack机制是怎么回事。面试官:好问题!那么,你知道Kafka的ack机制是用来做什么的吗?候选人:嗯,我知道它是用来确保消息的可靠性传递的。但是具体怎么实现的呢?面试官:很好!简单来说,Kafka的ack机制是......