首页 > 编程语言 >kafka消费消息-java版-demo

kafka消费消息-java版-demo

时间:2023-03-21 18:01:58浏览次数:39  
标签:java demo kafka auto put poll consumer properties


@SpringBootApplication
public class CcApplication {

public static void main(String[] args) {
SpringApplication.run(CcApplication.class, args);

/**
* 主要参数:
* 1.bootstrap.servers,group.id,key.deserializer,value.deserializer
* 2.session.timeout.ms coordinator检测失败的时间,设置为比较小的值
* 3. max.poll.interval.ms consumer处理逻辑最大时间
* 4. auto.offset.reset [earliest,lastest,none]
* 5. enable.auto.commit 是否自动提交位移,设置为false,由用户自行提交位移
* 6. fetch.max.bytes 指定consumer单次获取数据的最大字节数
* 7. max.poll.records 单次调用poll的最大返回消息数,默认500
* 8. heartbeat.interval.ms 越小越好
* 9. connections.max.idle.ms Kafka定期关闭空闲Socket的时间
*/
String topic = "ty_analysis";
String groupId = "analysis";

Properties properties = new Properties();
properties.put("bootstrap.servers","172.16.9.10:9092");
//必须指定有业务意义的名字
properties.put("group.id",groupId);
properties.put("enable.auto.commit","true");
properties.put("auto.commit.interval.ms","1000");
//从最早的消息开始读取
properties.put("auto.offset.reset","earliest");
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
//订阅主题,可以订阅多个主题,还可以使用正则表达式订阅主题
//注意:多次订阅,会覆盖前面的
consumer.subscribe(Arrays.asList(topic));

try{
while(true){
//1000是超时设定,如果有定时要求,可设置,否则建议设置个比较大的值
//通常consumer拿到足够多的数据,会立即返回,否则会阻塞
//poll返回则认为是成功消费了消息,如果发现消费慢需要分析是poll慢还是本身业务逻辑处理慢
ConsumerRecords<String,String> records = consumer.poll(1000);
for(ConsumerRecord<String,String> record : records){
System.out.printf("offset=%d, key=%s,value= %s%n",record.offset(),record.key(),record.value());
}
}
}finally {
consumer.close();
}
}

}
# 主要的pom依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>

标签:java,demo,kafka,auto,put,poll,consumer,properties
From: https://blog.51cto.com/u_16021118/6140753

相关文章

  • Java SPI机制详解
    一、什么是SPI机制1、SPI(ServiceProviderInterface),是JDK内置的一种服务提供发现机制,可以用来启用框架扩展和替换组件,主要被框架的开发人员使用,比如Java.sql.Driver接口......
  • javaweb-Cookie、Kaptcha、正则表达式
    资料来源于:B站尚硅谷JavaWeb教程(全新技术栈,全程实战),本人才疏学浅,记录笔记以供日后回顾由于是多个视频内容混合在一起,因此只放了第一个链接视频链接知识点1.Cook......
  • java 中的intern()方法
    https://www.bilibili.com/video/BV1PJ411n7xZ?p=127&vd_source=d52fb7546f3e6962911bc7cc32990c21           前言最近遇到一个Intern()方......
  • 在javascript的文章中sink指什么?
    在介绍xss的英文文章中常看到sink这个词语。查了下:AsinkisapotentiallydangerousJavaScriptfunctionthatcancausedundesirableeffectsifattackercontrolle......
  • Java中的字符串是常量
    Java中的字符串是常量publicclassStringtest{ publicstaticvoidmain(String[]args){ Strings1="hello"; Strings2="world"; Strings3="hello"......
  • Java ThreadPoolTaskExecutor 线程池的常见问题
    JavaThreadPoolTaskExecutor线程池的常见问题 https://blog.csdn.net/weixin_43611528/article/details/123083314 重要参数corePoolSize:核心线程数,常开的线程数,默......
  • algrothm_java命名
    ......
  • algrothm_ Demo
    ......
  • corejava_基础
    首先来个问题:什么是对象?     宽泛:可以用形容词修饰的名词都称作对象     程序:具有什么(属性)和能做什么(功能)的特点称作对象对于知识点的理解:   ......
  • 4-springboot多数据源配置报错Cause: java.lang.IllegalArgumentException: jdbcUrl i
    springboot2.0版本以上的多数据源配置改成:spring.datasource.refunddb.url=jdbc:mysql://refund地址spring.datasource.refunddb.username=uatspring.datasource.refundd......