首页 > 其他分享 >kafka代码示例

kafka代码示例

时间:2023-10-29 14:44:06浏览次数:43  
标签:示例 spring 代码 kafka 发送 org consumer public

安装kafka:

Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

添加依赖包:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.10.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

kafka配置:

在 application.properties 添加以下配置:

### kafka生产者
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

### kafka消费者
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5

生产者代码:

  • KafkaProducerService :

生产者发送消息。

@Component
public class KafkaProducerService {


    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 发送消息,处理回调。
     * 在发送消息时会自动创建你设置的 topic。
     *
     */
    public void send()  {
        MyMsg myMsg = new MyMsg();
        myMsg.setName("lin");
        myMsg.setId("1234");

        //发送消息
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("myTopic1", "key", JSON.toJSONString(myMsg));
        //处理回调的结果,比如消息发送失败的处理。如果不需要回调,也可以不处理。
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("消息发送失败." + ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                ProducerRecord<String, String> producerRecord = result.getProducerRecord();
                RecordMetadata recordMetadata = result.getRecordMetadata();
                System.out.println("消息发送成功.producerRecord:"+ JSON.toJSONString(producerRecord)
                                + ",recordMetadata:" + JSON.toJSONString(recordMetadata));

            }

        });

    }


}

  • 调用生产者发送消息:
@RestController
@RequestMapping("/")
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @PostMapping(value = "/kafka/send")
    public void send()  {
        kafkaProducerService.send();
    }

}

消费者代码:

  • KafkaConsumerService:
@Component
public class KafkaConsumerService {


    /**
     * Kafka监听器,可以监听消息。
     * 指定需要监听的 kafka 主题 topics,可以是多个topic.
     * 指定消费者群组 groupId,可以不写.
     *
     */
    @KafkaListener( topics = {"myTopic1"} , groupId ="myGroup")
    public void consume(ConsumerRecord<String, String> consumerRecord)  {
        System.out.println("消费者接收到信息,内容为:" + consumerRecord.value());
        System.out.println("偏移量:" +  consumerRecord.offset());

    }


}

测试结果 :

调用生产者发送消息,消费者成功接收到消息,类似如下:

消费者接收到信息,内容为:{"id":"1234","name":"lin"}
偏移量:19

标签:示例,spring,代码,kafka,发送,org,consumer,public
From: https://www.cnblogs.com/expiator/p/17795875.html

相关文章

  • 【RuoYi移动端】HbuilderX实现底部弹窗示例
    一、单选样式弹窗选择1、View页面代码<uni-popupref="textBox"type="bottom"> <viewclass="select_box"> <viewclass="select_row"v-for="(item,index)instatus"@click="selectClick(item.id)"&g......
  • c语言代码练习41
    问:实现在另一个数组中查找子字符串#define_CRT_SECURE_NO_WARNINGS1#include<stdio.h>#include<assert.h>#include<string.h>intmain(){char*p1="abcdefgdef";char*p2="def";char*ret=strstr(p1,p2);if(r......
  • idea 插件 checkstyle 规则示例和说明
    idea安装插件idea配置插件checkstyle.xml示例和说明<?xmlversion="1.0"?><!DOCTYPEmodulePUBLIC"-//Checkstyle//DTDCheckstyleConfiguration1.3//EN""https://checkstyle.org/dtds/configuration_1_3.dtd"><m......
  • TouchGFX界面开发 | 图像控件应用示例
    图像控件应用示例TouchGFX中的图像会绘制关联图像文件中的像素数据。使用图像文件前,必须将其导入到项目中。TouchGFXDesigner内置了五种类型的图像部件:固定图像:图像大小是由关联的图像文件定义的,不能在运行时改动。若要将图像显示为不同大小,需调整导入图像的大小缩放图像:能够绘制......
  • Java 静态代码块、代码块、构造方法和多态继承的代码执行顺序
    测试代码importlombok.Getter;publicclassExecutionOrder{{System.out.println("ExecutionOrdercode0");}static{System.out.println("ExecutionOrderstaticcode");}{System.out.println(&......
  • 无代码平台的表单平台 JAVA开源项目 毕业设计
    https://gf.bilibili.com/item/detail/1104045029为了帮助小白入门Java,博主录制了本项目配套的《项目手把手启动教程》,希望能给同学们带来帮助。一、摘要基于Vue+SpringBoot+MySQL的无代码平台的表单平台,包括了系统数据中心模块,用来存放管理系统通用的模块,另外分别设计了动态类型......
  • 安信可小安派AiPi 代码下载
    安信可小安派AiPi代码下载笔记记录AiPi代码下载(直接使用命令行操作,仅需要Type-C接口线即可)在完成环境搭建,和代码编写前提下,使用Type-C接口线下载代码,当然可以自己使用usb-ttl串口线下载程序,但是感觉麻烦,没有直接一根线舒服。以大佬的基于小安派AiPi-Eye-S1的小霸王工程代码为......
  • JavaScript代码,鼠标放上去显示一张图片
     <!DOCTYPEhtml> <html>  <head>    <metacharset="utf-8">    <title>FirstC</title>     </head>  <body>    <h1 >helloworld</h1>    <inputtype=&q......
  • 代码随想录第四天 | 24. 两两交换链表中的节点 19.删除链表的倒数第N个节点 面试题
    question1:SwapNodesinPairshttps://leetcode.cn/problems/swap-nodes-in-pairs/IwasalittleconfusedatfirstbecauseI'mthinkingwhethershouldIcreatanewhead,butsoonIcameupwiththeideaofcreatpre=Noneandwithan'if-els......
  • 学习笔记7+代码
    一、苏格拉底挑战二、遇见的问题三、实践和代码代码:#include<stdio.h>#include<pthread.h>//线程函数,接受一个void*参数,返回一个void*指针void*thread_function(void*arg){intthread_arg=*((int*)arg);printf("Threadreceivedargument......