首页 > 其他分享 >微服务同时接入多个Kafka

微服务同时接入多个Kafka

时间:2022-12-11 10:01:05浏览次数:64  
标签:多个 接入 private kafka props put Kafka CONFIG class

准备工作

自己搭建一个Kafka
从官方下载Kafka,选择对应Spring Boot 的版本,好在Kafka支持的版本范围比较广,当前最新版本是3.2.1,支持2.12-3.2.1 范围的版本,覆盖了Spring Boot 2.0x-Spring Boot 3.0.x。
​https://kafka.apache.org/downloads​

微服务同时接入多个Kafka_bootstrap

解压安装

进入bin目录,执行如下命令,按照如下顺序启动
Linux

# 配置文件选择自己对应的目录
zookeeper-server-start.sh ../config/zookeeper.properties

Windows

windows/zookeeper-server-start.bat ../config/zookeeper.properties

打开另外一个终端,启动KafkaServer
Linux

kafka-server-start.sh ../config/server.properties

Windows

windows/kafka-server-start.bat ../config/server.properties

最小化配置Kafka
如下是最小化配置Kafka
pom.xml 引入依赖

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

application.properties

server.port=8090
spring.application.name=single-kafka-server

#kafka 服务器地址
spring.kafka.bootstrap-servers=localhost:9092
#消费者分组,配置后,自动创建
spring.kafka.consumer.group-id=default_group

KafkaProducer 生产者

@Slf4j
@Component
@EnableScheduling
public class KafkaProducer {

@Resource
private KafkaTemplate kafkaTemplate;

private void sendTest() {
//topic 会自动创建
kafkaTemplate.send("topic1", "hello kafka");
}

@Scheduled(fixedRate = 1000 * 10)
public void testKafka() {
log.info("send message...");
sendTest();
}
}

KafkaConsumer 消费者

@Slf4j
@Component
public class KafkaConsumer {

@KafkaListener(topics = {"topic1"})
public void processMessage(String spuId) {
log.warn("process spuId ={}", spuId);
}

}

运行效果:


微服务同时接入多个Kafka_spring_02

多Kafka配置
配置稍微复杂了一点,灵魂就是手动创建,同样引入依赖
pom.xml

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

application.properties

server.port=8090
spring.application.name=kafka-server

#kafka1
#服务器地址
spring.kafka.one.bootstrap-servers=localhost:9092
spring.kafka.one.consumer.group-id=default_group


#kafka2
spring.kafka.two.bootstrap-servers=localhost:9092
spring.kafka.two.consumer.group-id=default_group2

第一个Kafka配置,需要区分各Bean的名称
KafkaOneConfig

@Configuration
public class KafkaOneConfig {

@Value("${spring.kafka.one.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.one.consumer.group-id}")
private String groupId;

@Bean
public KafkaTemplate<String, String> kafkaOneTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean(name = "kafkaOneContainerFactory")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}

private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

private ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}

private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

}

kafkaOneTemplate 定义第一个Kafka的高级模板,用来发送消息
kafkaOneContainerFactory 消费监听容器,配置在@KafkaListener中,
producerFactory 生产者工厂
consumerFactory 消费者工厂
producerConfigs 生产者配置
consumerConfigs 消费者配置

同样创建第二个Kafka,配置含义,同第一个Kafka
KafkaTwoConfig

@Configuration
public class KafkaTwoConfig {

@Value("${spring.kafka.two.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.two.consumer.group-id}")
private String groupId;

@Bean
public KafkaTemplate<String, String> kafkaTwoTemplate() {
return new KafkaTemplate<>(producerFactory());
}

@Bean(name = "kafkaTwoContainerFactory")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}

private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}

private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

}

创建一个测试的消费者,注意配置不同的监听容器containerFactory
KafkaConsumer

@Slf4j
@Component
public class KafkaConsumer {

@KafkaListener(topics = {"topic1"}, containerFactory = "kafkaOneContainerFactory")
public void oneProcessItemcenterSpuMessage(String spuId) {
log.warn("one process spuId ={}", spuId);
}

@KafkaListener(topics = {"topic2"},containerFactory = "kafkaTwoContainerFactory")
public void twoProcessItemcenterSpuMessage(String spuId) {
log.warn("two process spuId ={}", spuId);
}
}

创建一个测试的生产者,定时往两个topic中发送消息
KafkaProducer

@Slf4j
@Component
public class KafkaProducer {

@Resource
private KafkaTemplate kafkaOneTemplate;
@Resource
private KafkaTemplate kafkaTwoTemplate;

private void sendTest() {
kafkaOneTemplate.send("topic1", "hello kafka one");
kafkaTwoTemplate.send("topic2", "hello kafka two");
}

@Scheduled(fixedRate = 1000 * 10)
public void testKafka() {
log.info("send message...");
sendTest();
}
}

最后运行效果:


微服务同时接入多个Kafka_bootstrap_03

标签:多个,接入,private,kafka,props,put,Kafka,CONFIG,class
From: https://blog.51cto.com/u_15733182/5928107

相关文章

  • function~排序多个班级的成绩
    题目描述把m个班级的学生成绩由高到底进行排序。输入第1行是一个整数m(0<m<100),表示需要排序的班级数。 后面有m组数,每组数分两行:第一行是一个整数n(0<n<50),表示一个班级......
  • 华普物联 HP-ERS-T200的 MQTT工作模式接入阿里云示例教程
    一、需准备事项:1、注册阿里云平台账号https://account.aliyun.com/login/login.htm2、设备接入阿里MQTT物联网平台参数计算工具http://www.hpiot.cn/index/Download/do......
  • 华普物联HP-E10的MQTT工作模式接入华为云示例教程
    示例操作流程1、注册并登录华为云https://auth.huaweicloud.com/authui/login.html#/login2、登录后,选择产品-->IoT物联网-->设备接入IoTDA  点击设备接入IoTDA3......
  • Cookie_原理分析以及发送多个Cookie
    Cookie_原理分析实现原理:基于响应头set-cookie和请求头cookie实现      发送多个Cookiecookie的细节:1.一次可不可以发送多个cookie?可......
  • element-ui:多个el-dialog弹框切换会出现闪烁
    场景使用多个element-ui组件el-dialog弹框切换打开A弹框,点击关闭,紧接着打开B弹框会出现一个明显的闪烁解决给第一个弹框关闭加一点延迟//先打开另一个对话框this.BD......
  • SDK 接入|游戏语音之“范围语音”接入实践
    语音是线上游戏用户的主要交流方式,大多数用户会通过游戏中的内置语音功能与其他玩家沟通,而一些用户在游戏没有内置语音功能的情况下,通过其他语音软件与玩家沟通。并且,游戏......
  • KubeSphere 接入外部 Elasticsearch 最佳实践
    作者:张坚,科大讯飞开发工程师,云原生爱好者。大家好,我是张坚。今天来聊聊如何在KubeSphere中集成外置的ES组件。KubeSphere在安装完成时候可以启用日志组件,这样会安装ES......
  • 【生产】kafka 统计脚本
    kafka根据时间统计topic是否有数据#!/bin/shforxin`cat/home/kafka/kafka_2.11-1.0.2/bin/topic.list_bak`dobrokers="192.168.27.13:9092"#topic="cmp_ddr_topic"pat......
  • 【生产】kafka 调优
    适用于es和kafkajstat-gcutilpidjstat-gcutilpid1000通过jstat-gcutil1000查看到kafka进程GC情况主要看YGC,YGCT,FGC,FGCT这几个参数,如果这几个值不是......
  • KubeSphere 接入外部 Elasticsearch 最佳实践
    作者:张坚,科大讯飞开发工程师,云原生爱好者。大家好,我是张坚。今天来聊聊如何在KubeSphere中集成外置的ES组件。KubeSphere在安装完成时候可以启用日志组件,这样会安......