首页 > 其他分享 >kafka代码实践

kafka代码实践

时间:2023-10-29 15:11:29浏览次数:53  
标签:代码 实践 private kafka props put CONFIG public

安装kafka:

Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

添加依赖包:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.10.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

kafka配置:

在实际开发中,会有多种不同的消息,服务器也不一定一样。需要根据不同的需求,进行不同的配置。

  • KafkaConfig:

kafka 配置类。如下:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${bootstrap.servers:127.0.0.1:9092}")
    private String servers;

    @Value("${batch.size:16384}")
    private Integer batchSize;

    @Value("${buffer.memory:33554432}")
    private Integer bufferMemory;

    @Value("${group.id:myGroup}")
    private String consumerGroupId;

    @Value("${auto.commit.interval.ms:100}")
    private String commitInterval;

    @Value("${session.timeout.ms:15000}")
    private String sessionTimeout;


    /**
     * 想直接操作kafka发送消息可以用 kafkaTemplateService 注入
     */
    @Bean("kafkaTemplateService")
    public KafkaTemplate<String, String> kafkaTemplateService() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * 生产者 factory
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * 生产者配置
     * @return
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        //服务器ip和端口,多个用逗号隔开
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        //批量处理个数
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        //等待时间
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        //序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * 消费者 factory
     *
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    /**
     * 消费者配置
     *
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //消费者群组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, commitInterval);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    /**
     * kafka监听器工厂,使用 @KafkaListener 注解时可以指定,比如  containerFactory = "kafkaListenerContainerFactory"
     *
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(5);
        return factory;
    }


}

生产者代码:

  • bean对象:
public class MyMsg {

    private String id;
    
    private String name;
    
	//忽略getter、setter
}
  • KafkaConfigProducerService:
@Service
public class KafkaConfigProducerService {

    @Resource(name = "kafkaTemplateService")
    private KafkaTemplate<String, String> kafkaTemplateService;


    public void send()  {
        MyMsg myMsg = new MyMsg();
        myMsg.setName("xu");
        myMsg.setId("5678");

        //发送消息
        kafkaTemplateService.send("myTopic", JSON.toJSONString(myMsg));

    }

}

消费者代码:

@Service
public class KafkaConfigConsumerService {


    /**
     * Kafka监听器,可以监听消息。
     * 指定需要监听的 kafka 主题 topics,可以是多个topic.
     * 指定消费者群组 groupId,可以不写.
     * 消费者的配置,在对应的 KafkaConfig类的 containerFactory 里面。
     *
     */
    @KafkaListener(containerFactory = "kafkaListenerContainerFactory",
            topics = {"myTopic"} )
    public void consume(ConsumerRecord<String, String> consumerRecord)  {
        System.out.println("消费者接收到信息,内容为:" + consumerRecord.value());
        System.out.println("偏移量:" +  consumerRecord.offset());

    }


}

测试结果 :

调用生产者发送消息,消费者成功接收到消息,类似如下:

消费者接收到信息,内容为:{"id":"5678","name":"xu"}
偏移量:1

标签:代码,实践,private,kafka,props,put,CONFIG,public
From: https://www.cnblogs.com/expiator/p/17795897.html

相关文章

  • kafka代码示例
    安装kafka:Windows安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851Linux安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353添加依赖包:<dependency><groupId>org.springframework.k......
  • c语言代码练习41
    问:实现在另一个数组中查找子字符串#define_CRT_SECURE_NO_WARNINGS1#include<stdio.h>#include<assert.h>#include<string.h>intmain(){char*p1="abcdefgdef";char*p2="def";char*ret=strstr(p1,p2);if(r......
  • Java 静态代码块、代码块、构造方法和多态继承的代码执行顺序
    测试代码importlombok.Getter;publicclassExecutionOrder{{System.out.println("ExecutionOrdercode0");}static{System.out.println("ExecutionOrderstaticcode");}{System.out.println(&......
  • 无代码平台的表单平台 JAVA开源项目 毕业设计
    https://gf.bilibili.com/item/detail/1104045029为了帮助小白入门Java,博主录制了本项目配套的《项目手把手启动教程》,希望能给同学们带来帮助。一、摘要基于Vue+SpringBoot+MySQL的无代码平台的表单平台,包括了系统数据中心模块,用来存放管理系统通用的模块,另外分别设计了动态类型......
  • XenDesktop 7.15 LTSR交付桌面和应用实践
    名称IP组件ops192.168.0.218sr、xcenterdc192.168.0.210/10.0.0.1ad、dns、dhcp、实验lan网关ddc10.0.0.2dc、licens、studio、storefontpvs10.0.0.3pvs服务、pvs控制台windows10/serverappdhcpvda、xenservertools、receiverwin10-pvsdh......
  • 安信可小安派AiPi 代码下载
    安信可小安派AiPi代码下载笔记记录AiPi代码下载(直接使用命令行操作,仅需要Type-C接口线即可)在完成环境搭建,和代码编写前提下,使用Type-C接口线下载代码,当然可以自己使用usb-ttl串口线下载程序,但是感觉麻烦,没有直接一根线舒服。以大佬的基于小安派AiPi-Eye-S1的小霸王工程代码为......
  • JavaScript代码,鼠标放上去显示一张图片
     <!DOCTYPEhtml> <html>  <head>    <metacharset="utf-8">    <title>FirstC</title>     </head>  <body>    <h1 >helloworld</h1>    <inputtype=&q......
  • 深入理解Java IO流: 包括字节流和字符流的用法、文件读写实践
    (文章目录)......
  • 代码随想录第四天 | 24. 两两交换链表中的节点 19.删除链表的倒数第N个节点 面试题
    question1:SwapNodesinPairshttps://leetcode.cn/problems/swap-nodes-in-pairs/IwasalittleconfusedatfirstbecauseI'mthinkingwhethershouldIcreatanewhead,butsoonIcameupwiththeideaofcreatpre=Noneandwithan'if-els......
  • 学习笔记7+代码
    一、苏格拉底挑战二、遇见的问题三、实践和代码代码:#include<stdio.h>#include<pthread.h>//线程函数,接受一个void*参数,返回一个void*指针void*thread_function(void*arg){intthread_arg=*((int*)arg);printf("Threadreceivedargument......