首页 > 其他分享 >模拟spring-kafka实现kafka的consumer监听

模拟spring-kafka实现kafka的consumer监听

时间:2022-12-29 22:13:41浏览次数:41  
标签:ConsumerConfig CONFIG spring kafka props put new consumer class

背景:因为某些原因,无法直接使用springboot提供的@KafkaListener,改为模拟springboot注解的方式搬过来实现

首先创建一个业务处理的service,这个service主要用于消费下来的消息的处理

@Service
public class MsgProcessService {
    public void consumeMsg(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
            System.out.println("receive msg:{}", records);         
            ack.acknowledge();        
    }
}

然后创建消费者的监听配置类,从afterPropertiesSet()开始执行,注解和常用的套路一样

@Configuration
@EnableKafka
public class KafkaClient implements InitializingBean, Closeable {

    @Resource
    private MsgProcessService msgProcessService;
    /**
     * 消费者监听者容器
     */
    private KafkaMessageListenerContainer<String, String> listenerContainer;

    @Override
    public void afterPropertiesSet() {
        new Thread(this::initListeners, "kafkaInitThread").start();
    }

    @Override
    public void close() {
        if (listenerContainer != null) {
            listenerContainer.stop();
        }
    }

    private void initListeners() throws NoSuchMethodException {
        //第一步:构造BatchMessagingMessageListenerAdaper
        Method method = MsgProcessService.class.getMethod("consumeMsg", List.class, Acknowledgment.class);
        InvocableHandlerMethod handlerMethod = new InvocableHandlerMethod(msgProcessService, method);
        handlerMethod.setMessageMethodArgumentResolvers(new HandlerMethodArgumentResolverComposite());
        BatchMessagingMessageListenerAdapter<String, String> messageListener = new BatchMessagingMessageListenerAdapter<>(msgProcessService, method);
        messageListener.setHandlerMethod(new HandlerAdapter(handlerMethod));          
        //第二步:构造ConsumerFactory
        DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory<>(buildConsumerConfigs());
        //第三步:构造ContainerProperties
        ContainerProperties containerProperties = new ContainerProperties("myTopic");
        containerProperties.setMessageListener(messageListener);
        containerProperties.setGroupId("group007");
        containerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL); 
        //第四步:构造KafkaMessageListenerContainer
        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        this.listenerContainer = container;
        //第五步:启动container
        container.start();
    }

    public Map<String, Object> buildConsumerConfigs() {
        Map<String, Object> props = new HashMap<>(16);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        //从哪儿开始消费,默认为latest
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");       
        //每次只处理一条消息
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        //两次poll的最大时间间隔设置为5s
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,5000);
        //取消自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //设置数据key和value的序列化处理类
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }

}

 

标签:ConsumerConfig,CONFIG,spring,kafka,props,put,new,consumer,class
From: https://www.cnblogs.com/yb38156/p/17013648.html

相关文章

  • [Spring] Spring 中bean的生命周期
    在平时的工作中,我们的很多项目都是利用Spring进行搭建的。最近有空,基于源码好好分析一下,Bean在Spring中的生命周期这里我们先写一个简单的小例子<?xmlversion="1.0"e......
  • Spring AOP源码(四):具体执行流程 - 责任链模式
    1、AOP动态代理的字节码文件1.1、代理对象class的核心伪代码1publicclassMathCalculator$$EnhancerBySpringCGLIB$$9bfe5203extendsMathCalculatorimplementsS......
  • Springoot - 整合MyBatis
    1.导入JDBC驱动因为我的是Mysql数据库版本是8.0.20导入对应版本的驱动即可<!--mysql依赖--><dependency><groupId>mysql</groupId>......
  • 从Spring中学到的【2】--容器类
    容器类我们在实际编码中,常常会遇到各种容器类,他们有时叫做POJO,有时又叫做DTO,VO,DO等,这些类只具有容器的作用,具有完全的get,set方法,作为信息载体,作数据传输用。其实,很多地......
  • SpringBoot - WebMvcConfigurer 配置类
    WebMvcConfigurer:1.publicvoidconfigurePathMatch(PathMatchConfigurerconfigurer)路径匹配规则一般不用修改2.publicvoidconfigureContentNegotiation(ContentNe......
  • SpringBoot - 内容协商机制
    1.内容协商机制根据客户端接收能力不同,SpringBoot返回不同媒体类型的数据比如:客户端Http请求Accept:application/xml则返回xml数据,客户端Http请求Accept:a......
  • Spring声明式事务配置管理方法
    事务配置首先在/WEB-INF/applicationContext.xml添加以下内容:<!--配置事务管理器--><beanid="transactionManager"class="org.springframework.orm.hibernate3.Hibernat......
  • spring aop学习记录文档
    ProxyFactory:https://www.cnblogs.com/5207/p/6055152.htmlProxyConfig:https://www.cnblogs.com/mayang2465/p/12141814.htmlProxyFactory:https://juejin.cn/post/704079......
  • SpringBoot 的属性配置文件
    0、概述本文内容会解答下面几个问题:1、SpringBoot默认配置文件的名称是什么?配置文件默认存放位置是什么?2、如何指定配置文件名称?如何指定配置文件存放位置?3、如何使用pro......
  • Spring源码:ConfigFileApplicationListener
    /**Copyright2012-2017theoriginalauthororauthors.**LicensedundertheApacheLicense,Version2.0(the"License");*youmaynotusethisfileexcept......