首页 > 其他分享 >springboot集成kafka

springboot集成kafka

时间:2023-06-15 17:12:17浏览次数:38  
标签:集成 springboot kafka user apache org message public

  1. 导入spring-kafka依赖信息
点击查看代码
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>
2. 在resources下创建文件application.yml
点击查看代码
server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. 消息生产者
点击查看代码
@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("wzh-topic","hello-kafka");
        return "ok";
    }
}
4. 消息消费者
点击查看代码
@Component
public class HelloListener {

    @KafkaListener(topics = "wzh-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            System.out.println(message);
        }
    }
}

传递消息为对象
目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强。

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可。

步骤

  1. 发送消息
点击查看代码
@GetMapping("/hello")
public String hello(){
    User user = new User();
    user.setUsername("xiaowang");
    user.setAge(18);
    kafkaTemplate.send("user-topic", JSON.toJSONString(user));
    return "ok";
}
2. 接收消息
点击查看代码
@Component
public class HelloListener {

    @KafkaListener(topics = "user-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            User user = JSON.parseObject(message, User.class);
            System.out.println(user);
        }
    }
}

标签:集成,springboot,kafka,user,apache,org,message,public
From: https://www.cnblogs.com/wzh-Official/p/17483447.html

相关文章

  • Spring boot集成Redis实现sessions共享时,sessions过期时间问题分析
    Springboot鼓励零配置的方式,帮你做好大部分重复劳动的事,好到不能再好;具体的Redis安装方法和Springboot集成Redis方法,可以去搜索相关文章或参考该文。 当做用户权限管理时,一般都设置一个session过期时间,以确保用户长时间不操作时自动退出系统。在springboot中设置該值的方法如下: 1......
  • springboot文件上传
    前言需要实现文件上传功能需要MultipartResolver接口的实现类的实例,这样才可以实现文件上传而MultipartResolver有两个实现类:StandardServletMultipartResolver:springboot中以自动配置完成,直接使用即可CommonsMultipartResolver:springmvc中使用的是CommonsMultipartReso......
  • kafka消费模式
    消费者消费方式:订阅与分配1、KafkaConsumer.subscribe():为consumer自动分配partition,有内部算法保证topic-partition以最优的方式均匀分配给相同group下的不同consumer。2、KafkaConsumer.assign():为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制,相当与......
  • Kafka概述
    定义Kafka传统定义:Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。Kafka最新定义:Kafka是一个开源的分布式事件......
  • Kafka安装
    环境说明在安装Kafka之前,请确保已经安装了JDK和Zookeeper。运行Kafka,首先保证Java环境能正常使用,可执java-version查看。安装JDK环境下载jdk安装包curlhttps://download.oracle.com/java/20/latest/jdk-20_linux-x64_bin.tar.gz-ojdk-20_linux-x64_bin.tar.gz新建JDK......
  • DBeaver Ultimate Edtion 23.1 Multilingual (macOS, Linux, Windows) - 通用数据库工
    DBeaverUltimateEdtion23.1Multilingual(macOS,Linux,Windows)-通用数据库工具,现已集成ChatGPTOnetoolforalldatasources请访问原文链接:https://sysin.org/blog/dbeaver-23/,查看最新版。原创作品,转载请保留出处。作者主页:sysin.org通用数据库工具DBeaver是......
  • springboot 自定义listener 添加环境变量。 抄的springboot项目去掉了一些不用的ja
    1.自定义listener实现 ApplicationListener<ApplicationEnvironmentPreparedEvent>,Ordered(如果要设置优先级可以实现Ordered接口,注意order值越小优先级越高)publicclassMyListenerimplementsApplicationListener<ApplicationEnvironmentPreparedEvent>,Ordered{......
  • ajax请求springboot
    老是忘记ajax请求格式,记录一下吧后面看自己的functionsave(){varURL="/reconciliation/wzglWzMaterilApplicationDetail/saveSupplier";varcc=JSON.stringify({data:"world",p:{pag:0,pageS......
  • springboot项目启动失败之 org.springframework.boot.env.OriginTrackedYamlLoader.cr
    1、检查一下父项目的module的依赖是否,以及版本是否一致。例如<dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId><version>1.30</version></dependency>2、如何知道自己的org.yaml的版本,可以双击Shift键,输入如图所示,注意划红线的地方就是......
  • mysql和neo4j集成多数据源和事务
    在微服务大行其道的今天,按理说不应该有多数据源这种问题(嗯,主从库算是一个多数据源的很常见的场景。),但是也没人规定不能这样做。就算有人规定的,曾经被奉为圭臬的数据库三大范式现在被宽表冲得七零八落,在很多场景下,其实是鼓励建立冗余字段的。话说项目中需要用到图数据库,我们选用......