首页 > 编程语言 >java——spring boot集成kafka——spring boot集成kafka

java——spring boot集成kafka——spring boot集成kafka

时间:2023-04-02 14:22:56浏览次数:38  
标签:监听器 spring boot ListenerConsumer kafka 提交 org

引入依赖:

 

 

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
</dependency>

 

 

 

 

 

 

 

 

 

 

编写配置文件:

 

 

 

 

erver:
port: 8080
spring:
kafka:
bootstrap-servers: 172.16.253.21:9093
producer: # ⽣产者
retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息体的编解码⽅式
key-serializer:
org.apache.kafka.common.serialization.StringSerializer
value-serializer:
org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer:
org.apache.kafka.common.serialization.StringDeserializer
value-deserializer:
org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500
listener:
# 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上
次提交时间⼤于TIME时提交
# TIME
# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理
record数量⼤于等于COUNT时提交
# COUNT
# TIME | COUNT 有⼀个条件满⾜时提交
# COUNT_TIME
# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调
⽤Acknowledgment.acknowledge()后提交
# MANUAL
# ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种
# MANUAL_IMMEDIATE
ack-mode: MANUAL_IMMEDIATE
redis:
host: 172.16.253.21

 

 

 

 

 

 

 

 

 

 

生产者:

 

 

@RestController
public class KafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public void send() {
kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
}
}

 

 

 

 

 

 

 

 

 

消费者:

 

 

 

 

@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record,
Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//⼿动提交offset
ack.acknowledge();
}

 

 

 

 

 

 

 

 

 

 

 

 

 

==========================================================

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

标签:监听器,spring,boot,ListenerConsumer,kafka,提交,org
From: https://www.cnblogs.com/xiaobaibailongma/p/17280401.html

相关文章

  • SpringBoot的@Async注释的用法并例子
    在SpringBoot中,@Async注解用于将一个方法标记为异步执行的方法。使用该注解的方法将在一个单独的线程中异步执行,而不会阻塞调用线程。这对于处理需要长时间运行的任务或需要异步处理的任务非常有用。下面是一个简单的示例:@ServicepublicclassMyService{@Asyncpu......
  • springboot-监听器
    监听器ApplicationListener可以实现这个接口时传入对应的监听器,用于监听该事件比如:实现ApplicationListener<ContextRefreshedEvent>接口,重写onApplicationEvent方法,将ContextRefreshedEvent对象传进去。如果我们想在加载或刷新应用上下文时,也重新刷新下我们预加载的资源......
  • SpringBoot中Mybatis的应用
    创建一个SpringBoot项目,然后如下操作:(1)添加Lombok插件--简化实体类 (2)添加Mybatis框架和MySQL驱动---访问Mysql和使用Mybatis (3)创建pojo包和实体类在项目中创建pojo包,并在其中创建实体类。实体类上可以使用Lombok注解。首次使用时,需要在Ieda中安装该插件。L......
  • SpringBoot定时任务
    使用注解@Scheduled1.在启动类上添加注解@EnableScheduling开启定时任务2.创建定时任务@ComponentpublicclassStatisticsComp{/***日统计(每日0点1分触发)*/@Scheduled(cron="010?**")publicvoiddailyStatistics(){}/**......
  • 记录一: Spring Cloud Alibaba 2021.X 搭建
    一》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》》下载nacos  https://github.com/alibaba/nacos   nacos-server-2.0.3.zip  Windows版 解压后,数据库新建nacos库,将X:\nacos\conf目录下的 nacos-mysql.s......
  • kafka(java客户端)生产者消费者不能连接虚拟机kafka
    报错如下:...:localhost:9092...java.net.ConnectException:Connectionrefused:nofurtherinformationatsun.nio.ch.SocketChannelImpl.checkConnect(NativeMethod)atsun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)atorg.apac......
  • springboot-自己开发start
    步骤命名规范第三方在建立自己的Starter的时候命名规则统一用xxx-spring-boot-starter,官方提供的Starter统一命名方式为spring-boot-starter-xxx。步骤新建一个Maven项目,在pom.xml文件中定义好所需依赖;新建配置类,写好配置项和默认值,使用@ConfigurationProperties指明......
  • 这可能是最全面的Spring面试题总结了
    Spring是什么?Spring是一个轻量级的控制反转(IoC)和面向切面(AOP)的容器框架。Spring的优点通过控制反转和依赖注入实现松耦合。支持面向切面的编程,并且把应用业务逻辑和系统服务分开。通过切面和模板减少样板式代码。声明式事务的支持。可以从单调繁冗的事务管理代码中解脱......
  • vue+webSocket+springCloud消息推送交互
    一、后台代码:1、pom里面加上依赖;<!--webSocket坐标依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId><version>2.2.4.RE......
  • Spring Boot 集成 GRPC
    技术:SpringBoot2.0.5.RELEASE+Grpc1.15.0  运行环境:JDK1.8 概述:SpringBoot框架中集成Grpc服务详细一.背景SpringBoot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定......