首页 > 其他分享 >Flink如何使用DataStreamAPI消费Kafka

Flink如何使用DataStreamAPI消费Kafka

时间:2022-10-22 17:13:00浏览次数:85  
标签:flink 消费 提交 offsets Flink kafka DataStreamAPI OffsetsInitializer Kafka

1、到官网查询所在版本的依赖,导入pom.xml(在此用Flink1.13) 官网->教程->connectors->datastream->kafka

网址:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-connector-kafka_2.11</artifactId>
     <version>1.13.6</version>
 </dependency>

2.在此页面找到Kafka source 示例代码,将此代码填充至类中并将其具体参数修改即可

  //如果方法在返回值的位置声明了泛型,此时在调用这个方法时,需要在方法名前补充泛型KafkaSource.<String>builder()
         KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop102:9092") //集群地址,写一个也行,多个也行
                .setTopics("first")//消费的主题
                .setGroupId("my-group")//消费者组id
                 /*设置起始偏移量有以下几种情况:
                   1.从指定的位置消费:OffsetsInitializer.offsets(Map< TopicPartition, Long> offsets)
                   2.从最新位置消费(最后一条处):OffsetsInitializer.latest()
                   3.从最早位置消费(第一条处):OffsetsInitializer.earliest()
                   4.从上次提交的位置消费:OffsetsInitializer.committedOffsets()
                   5.新的组,从来没有提交过,再指定一个消费方式:                   OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)
                  */
                  .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))//设置起始偏移量,也就是从哪里消费
                 //由于大多数情况下key列没有值,所以只设置value的反序列化器即可
                .setValueOnlyDeserializer(new SimpleStringSchema()) //消费必须设置的Key-value的反序列化器
                .build();
          //用设置好的组件获取source
         env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

 

需注意!

1、flink的kafkaSource默认是把消费的offsets提交到当前Task的状态中,并不会主动提交到kafka的——consumer_offsets中

所以,上述代码无论运行多少次消费的都是一样的内容,想要达到这次消费起始位置是上次消费的最后一条的情况

需要手动设置,把offsets提交到kafka一份

 //设置额外的消费者参数
 .setProperty("enable.auto.commit","true")//允许consumer自动提交offsets
 .setProperty("auto.commit.interval.ms","1000")//每次提交的时间间隔

2、Job重启时,如果开启了Checjpoint,默认从哪Checkpoint中获取之前提交的offsets

获取不到时,才会从kafka的_consumer_offsets中获取

标签:flink,消费,提交,offsets,Flink,kafka,DataStreamAPI,OffsetsInitializer,Kafka
From: https://www.cnblogs.com/CYan521/p/16816514.html

相关文章

  • 103-windows 安装kafka
    下载地址:http://mirrors.hust.edu.cn/apache/zookeeper/2)下载后解压到一个目录:eg:D:\Java\Tool\zookeeper-3.4.103)在zookeeper-3.4.10目录下,新建文件夹,并命名(......
  • flink入门学习
    一:为什么使用flink1.jdk实现流式处理packagenet.xdclass.app;importnet.xdclass.model.VideoOrder;importjava.util.Arrays;importjava.util.List;importjav......
  • kafka日常维护
     1.列出topics[yeemiao@elk1bin]$./kafka-topics.sh--zookeeper10.26.41.102:2181,10.26.41.60:2181,10.27.181.169:2181--list__consumer_offsetsbusiness-logsngi......
  • Kafka在centOS7下的安装
    单机模式#下载Kafka[root@kafka~]#wgethttps://archive.apache.org/dist/kafka/2.2.1/kafka_2.11-2.2.1.tgz#解压[root@kafka~]#tarzxfkafka_2.11-2.2.1.tgz[......
  • Kafka概览
    主要概念和术语Event(事件)是一个现实世界或业务中发生的事,也叫消息或记录,说白了就是你写入Kafka的消息。Event具有key、value和timestamp,这和其它的消息系统有点儿不一样。......
  • flink udaf demo
    flinkudafdemo之前一个小伙伴留言说想看TableAggregateFunction的例子吗?以及自定义函数如何使用sql的方式调用?FlinkSQL我都是用开发的sqlSubmit工具做的提交,很多......
  • 利用kafka自带的zookeeper搭建kafka集群
    利用kafka自带的zookeeper搭建kafka集群  搭建kafka集群是需要zookeeper的,可是kafka自身就已经带了一个zookeeper,所以不需要额外搭建zookeeper的集群,只需要将kafka自......
  • Helm部署Zookeeper+Kafka集群
    三、Helm部署Zookeeper集群3.1、helm准备#Helm客户端安装文档https://helm.sh/docs/intro/install/#添加bitnami和官方helm仓库:helmrepoaddbitnamihttps://cha......
  • Flink-CDC
    flink-cdcflink-cdc概述    flink-cdc    文档地址:https://ververica.github.io/flink-cdc-connectors/master/content/about.html#依赖    <dependency......
  • kafka随记
    一、概述1.定义传统定义:kafka是一个分布式的基于发布/订阅模式的消息队列最新定义:kafka是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集......