首页 > 其他分享 >springboot项目配置多个kafka

springboot项目配置多个kafka

时间:2023-04-24 14:46:34浏览次数:42  
标签:ConsumerConfig springboot 多个 configProps CONFIG factory kafka consumer

1.spring-kafka

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>

2.配置文件相关信息


kafka.bootstrap-servers=localhost:9092
kafka.consumer.group.id=20230321
#可以并发消费的线程数 (通常与partition数量一致)
kafka.consumer.concurrency=10
kafka.consumer.enable.auto.commit=false

kafka.bootstrap-servers.pic=localhost:29092
kafka.consumer.group.id.pic=20230322_pic
kafka.consumer.concurrency.pic=10
kafka.consumer.enable.auto.commit.pic=false

3.kafka配置类

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Value("${kafka.consumer.enable.auto.commit}")
    private String autoCommit;

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServer;


    @Value("${kafka.consumer.group.id.pic}")
    private String groupIdPic;

    @Value("${kafka.consumer.concurrency.pic}")
    private int concurrencyPic;

    @Value("${kafka.consumer.enable.auto.commit.pic}")
    private String autoCommitPic;

    @Value("${kafka.bootstrap-servers.pic}")
    private String bootstrapServerPic;


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        String bootstrapServers = bootstrapServer;
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }




    @Bean
    public ConsumerFactory<String, String> consumerFactoryPic() {
        String bootstrapServers = bootstrapServerPic;
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryPic());
        factory.setConcurrency(concurrencyPic);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

4.消费主题消息

@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic")
    public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
        try {
            String jsonString = message.value();
            if (StringUtils.isNoneBlank(jsonString)) {
                log.info("消费:{}",jsonString);
                //TODO ....
            }
        } catch (Exception e) {
            log.error(" receive topic error ", e);
        } finally {
            ack.acknowledge();
        }
    }

@KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory")
    public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
        try {
            if (StringUtils.isNoneBlank(message.value())) {
                  //TODO ....
            }
        } catch (Exception e) {
            logger.error(" receive topic error ", e);
        } finally {
            ack.acknowledge();
        }
    }

 

标签:ConsumerConfig,springboot,多个,configProps,CONFIG,factory,kafka,consumer
From: https://www.cnblogs.com/128-cdy/p/17293059.html

相关文章

  • springboot集成redis时总报错Connection refused: no further information: localhost
    nacos上配置的关于redis的key值不是springboot需要的固定写法如:sping.redis.host=spring.redis.port=sping.redis.password=spring.redis.database=我写的是一个自定义的key如com.dream.redis.host改为springboot认识的即可其他和springboot集成的组件类似,切记......
  • elasticsearch+filebeat+kafka+kibana——filbeat篇章——overview
    filbeat篇章——overviewhttps://www.elastic.co/guide/en/beats/filebeat/8.7/filebeat-overview.html#filebeat-overview Filebeatisalightweightshipperforforwardingandcentralizinglogdata.Installedasanagentonyourservers,Filebeatmonitorsthelog......
  • kafka设计理念解析
    一.引言kafka是广泛使用的流处理组件,我们知道怎么使用它,也知道它的实现原理。但是更重要的部分是它的设计理念,即kafka设计者当时是如何考量各种方案的,了解这些,对提升我们的设计能力非常有帮助。二.动机我们将Kafka设计为一个统一平台,来处理大型公司可能拥有的所有实时数据流......
  • SpringBoot 文件打包zip,浏览器下载出去
    本地文件打包@GetMapping("/downloadZip")publicvoiddownloadZip(HttpServletResponseresponse)throwsIOException{try{response.setContentType("application/octet-stream");response.setHeader("......
  • Git 将代码推送到多个远程仓库
    如果使用Git管理代码(例如使用git命令或GitGUI客户端),可以将代码推送到多个远程仓库,包括Gitee和GitHub。具体做法是:首先在GitHub上创建一个新的空仓库。将GitHub仓库的URL添加为一个新的远程仓库,可以为这个远程仓库指定一个别名如github。例如,如果你的GitH......
  • SpringBoot+React 前后端分离
    SpringBoot+React前后端分离写个转发数据的小工具,本来只想开个SpringBoot服务带个页面,但感觉有点难受,正好之前研究了React,尝试一下前后端分离。后端简单用SpringBoot起个服务,写个接口处理请求:@RestController@RequestMapping("/data")publicclassDataController{......
  • 基于SpringBoot+Vue的音乐网站
    本次项目是基于SpringBoot+Vue的前后端分离项目,旨在熟练相关框架,掌握相关技术,拓展个人知识面。音乐来源:本地用户页面:Web项目亮点:根据歌词、音乐旋律、定位时间线(老师的意见)确定好方向,开始项目、收集资料、准备相关的开发环境和软件等。了解项目的结构与逻辑,确定基本功能,需求......
  • springboot~关于md5签名引发的问题
    事实是这样的,我有个接口,这个接口不能被篡改,于是想到了比较简单的md5对url地址参数进行加密,把这个密码当成是sign,然后服务端收到请求后,使用相同算法也生成sign,两个sign相同就正常没有被篡改过。问题的出现接口中的参数包括userId,extUserId,时间,其中extUserId字符编码,中间会有+......
  • springboot集成JWT token验证
    登录模式基于session登录基于session的登录(有回话状态),用户携带账号密码发送请求向服务器,服务器进行判断,成功后将用户信息放入session,用户发送请求判断session中是否有用户信息,有的话放行,没有的话进行拦截,但是考虑到时App产品,牵扯到要判断用户的session,需要sessionID,还要根据sess......
  • Springboot yml配置参数加密 ,jasypt自定义解密器
    原文链接:https://www.cnblogs.com/JCcccit/p/16868137.html前言 最近项目组开始关注一些敏感数据的明文相关的事宜,其实这些东西也是都有非常成熟的解决方案。既然最近着手去解决这些事情,那么也顺便给还未了解的大伙普及一下。Springbootyml配置参数数据加密(数据加密篇......