首页 > 其他分享 >Spring整合Kafaka(二十六)

Spring整合Kafaka(二十六)

时间:2022-10-28 22:14:02浏览次数:40  
标签:二十六 Spring springframework kafka Kafaka import test org consumer

1 引入依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2 配置Kafka

配置server、consumer
默认组id在config/consumer.properties文件下

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# consumer group id
group.id=test-consumer-group

application.yml

spring:
  # kafaka
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-consumer-group
      enable-auto-commit: true # 自动提交
      auto-commit-interval: 3000 # 自动提交的频率(ms)

3 访问Kafaka

生产者主动发送消息,消费者被动处理消息。

KafkaTests.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@SpringBootTest
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() {
        kafkaProducer.sendMessage("test", "你好");
        kafkaProducer.sendMessage("test", "在吗");

        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

// 生产者
@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }
}

// 消费者
@Component
class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }
}

标签:二十六,Spring,springframework,kafka,Kafaka,import,test,org,consumer
From: https://www.cnblogs.com/dalelee/p/16837582.html

相关文章

  • Spring Boot
    SpringBootSpringBoot基础SpringBoot简介SpringBoot是由Pivotal团队提供的全新框架,其设计目的是用来简化Spring应用的初始搭建以及开发过程,简单讲就是牺牲项目的自由......
  • Spring
    SpringSpring简介Spring是什么?Spring是分层的JavaSE/EE应用full-stack轻量级开源框架,以IoC(InverseOfControl:反转控制)和AOP(AspectOrientedProgramming:面向切面编......
  • Spring 注解开发
    使用Spring注解前的配置<?xmlversion="1.0"encoding="UTF8"?><beansxmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/20......
  • Kafaka安装与配置(二十五)
    1.简介Kafka是一个分布式的流媒体平台。Kafka可以应用于消息系统、日志收集、用户行为追踪、流式处理等多种场景。Kafka具有高吞吐量、消息持久化、高可靠性、高扩展性......
  • 16. Spring的基本使用
    一、引入Spring的依赖  通过maven的方式导入jar包,打包方式设置为jar包即可。<!--https://mvnrepository.com/artifact/org.springframework/spring-context-->......
  • springboot启动默认连接数据库问题
    这里是给新手介绍的springboot默认启动是连接数据库的经常很多人蒙蔽,//无数据源启动@SpringBootApplication(exclude={DataSourceAutoConfiguration.class,DataSo......
  • springboot像springnvc那样访问视图
    1.SpringBoot访问静态资源的位置(优先级按以下顺序)classpath默认就是resources,所以classpath:/static/就是resources/static/classpath:/META‐INF/resources/cl......
  • Spring Boot 学习笔记
    SpringBoot简介为什么要使用SpringBoot因为Spring,SpringMVC需要使用的大量的配置文件(xml文件)还需要配置各种对象,把使用的对象放入到spring容器中才能使用对象......
  • 【SpringBoot】引入mybatis及连接Mysql数据库
    创建一个SpringBoot项目其他不赘叙了,引入MyBaties、MySql依赖编辑 创建mysql表CREATETABLEsp_users(`id`INTPRIMARYKEY,`username`VARCHAR(30),`age`INT);刚......
  • jwt+token,springsecurity认证方式总结
    基于redis的认证方式分析redis解决短信验证码时效性,以及使用token的方式判断是否登录的问题。(没用jwt)这里面使用两个拦截器的方式解决:1.给token有效期刷新2.判断用户......