首页 > 其他分享 >springboot整合kafka

springboot整合kafka

时间:2022-11-19 17:37:09浏览次数:51  
标签:springboot spring springframework kafka 整合 import msg org

1.引入kakka依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.修改配置文件

# 应用名称
spring.application.name=springboot-kafka

# 应用服务 WEB 访问端口
server.port=8080

# 连接kafka集群
spring.kafka.bootstrap-servers=8.8.80.8:9092

#key value的序列化
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# key value 的反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 消费者组id
spring.kafka.consumer.group-id=kafka-consumer-group

3.创建生产者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/kafka")
    public String data(String msg){
        // topic:“kafka”,msg:发送的消息
        kafkaTemplate.send("kafka", msg);

        return "ok";
    }
}

4. 创建消费者

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumer {
    // 监听topics = "kafka"主题
    @KafkaListener(topics = "kafka")
    public void consumerTopic(String msg){
        System.out.println("收到消息:" + msg);
    }
}

5. 启动服务并发送消息

http://localhost:8080/kafka?msg=spring-boot-kafka

标签:springboot,spring,springframework,kafka,整合,import,msg,org
From: https://www.cnblogs.com/huangdh/p/16906583.html

相关文章

  • Kafka硬件配置选择
    场景:100万日活(中型公司),每人每天100条日志,每天总共的日志条数是100万*100条=1亿条。其日志处理速度为:1亿条/24小时/60分/60秒≈1157条/每秒钟。 假设每条日志大小......
  • Springboot整合RabbitMQ基本使用
    1、依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2、rabbitmq链接配置......
  • 【Java】 Springboot+Vue 大文件断点续传
     同事在重构老系统的项目时用到了这种大文件上传 第一篇文章是简书的这个:https://www.jianshu.com/p/b59d7dee15a6 是夏大佬写的vue-uploader组件:https://www......
  • SpringBoot2.x 事务
    1.事务管理方式 事务管理对于企业应用来说是至关重要的,即使出现异常情况,它也可以保证数据的一致性。spring支持编程式事务管理和声明式事务管理两种方式。 编程......
  • springboot controller
    传值@RestControllerpublicclassHelloController{@GetMapping("/hello")publicStringhello(Stringnickname){return"我是帅哥"+nickname;}}http://l......
  • springboot热部署
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><optional>true</optional>......
  • Springboot 配置 Security流程
    装入依赖引入spring-sercurity<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-security</artifactId></dependen......
  • 十二天----->阿里云短信服务的整合:
    开通短信服务的整合一、新建短信微服务整合阿里云短信服务,注册时候发送手机验证码在service创建子模块service_msm创建包的结构,创建controller和service等结构......
  • SpringBoot整合JUnit
    这个不用自己整合,在创建好SpringBoot项目后在Test文件夹中就能找到,相关的依赖也导入进去了。这里只是进行一个详细的说明。  @SpringBootTest类型:测试类注解位置:测......
  • Kafka的使用
    一、启动Zookeeper服务在Windows系统中打开第1个cmd窗口,启动Zookeeper服务:>cdc:\kafka_2.12-2.4.0>.\bin\windows\zookeeper-server-start.bat.\config\zookeeper.......