首页 > 其他分享 >71、Flink 的 Hybrid Source 详解

71、Flink 的 Hybrid Source 详解

时间:2024-07-17 09:58:25浏览次数:9  
标签:读取 Flink Hybrid Source build KafkaSource HybridSource

Hybrid Source
1.概述

Hybrid Source 解决了从异构数据源顺序读取输入以生成单个输入流的问题。

示例:从 S3 读取前几天的有界输入,然后使用 Kafka 的最新无界输入,当有界文件输入完成而不中断应用程序时 Hybrid Source 会从 FileSource 切换到 KafkaSource。

在 Hybrid Source 出现之前,需要创建一个具有多个源的拓扑结构,并由用户定义切换机制;使用 HybridSource 之后,从 DataStream API 的角度看,多个源在 Flink 作业图中显示为单个源。

需要依赖如下:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.19.0</version>
</dependency>
2.下一个源的起始位置

要在一个 Hybrid Source 中排列多个源,除最后一个源外的所有源都需要有界;因此通常需要为源分配一个开始和结束位置。

a)固定起始位置

示例:从文件中读取到预先确定的切换时间,然后继续从 Kafka 中读取,每个源都覆盖了预先已知的范围,可以像直接使用一样预先创建包含的源。

long switchTimestamp = ...; // derive from file input paths

FileSource<String> fileSource =
  FileSource.forRecordStreamFormat(new TextLineInputFormat(), Path.fromLocalFile(testDir)).build();

KafkaSource<String> kafkaSource =
          KafkaSource.<String>builder()
                  .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                  .build();

HybridSource<String> hybridSource =
          HybridSource.builder(fileSource)
                  .addSource(kafkaSource)
                  .build();
b)动态其实位置

示例:文件源需要读取的数据量很大,可能比下一个源可用的保留时间更长,切换需要在 “当前时间-X” 发生。

因此要将下一个源的启动时间设置为切换时间,需要从以前的文件枚举器中转移结束位置,以便通过实现 SourceFactory 来延迟构建KafkaSource。

注意:枚举器需要支持获取结束时间戳。

FileSource<String> fileSource = CustomFileSource.readTillOneDayFromLatest();

HybridSource<String> hybridSource =
    HybridSource.<String, CustomFileSplitEnumerator>builder(fileSource)
        .addSource(
            switchContext -> {
              CustomFileSplitEnumerator previousEnumerator =
                  switchContext.getPreviousEnumerator();
              
              // how to get timestamp depends on specific enumerator
              long switchTimestamp = previousEnumerator.getEndTimestamp();
              
              KafkaSource<String> kafkaSource =
                  KafkaSource.<String>builder()
                      .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                      .build();
              
              return kafkaSource;
            },
            Boundedness.CONTINUOUS_UNBOUNDED)
        .build();

标签:读取,Flink,Hybrid,Source,build,KafkaSource,HybridSource
From: https://blog.csdn.net/m0_50186249/article/details/140486441

相关文章

  • 69、Flink 的 DataStream Connector 之 Kafka 连接器详解
    1.概述Flink提供了Kafka连接器使用精确一次(Exactly-once)的语义在Kafkatopic中读取和写入数据。目前还没有Flink1.19可用的连接器。2.KafkaSourcea)使用方法KafkaSource提供了构建类来创建KafkaSource的实例。以下代码片段展示了如何构建KafkaSource来消......
  • WPF ListBox's itemsource depend on another listbox's selecteditem
    //xaml<ListBoxGrid.Row="1"Grid.Column="0"ItemsSource="{Binding}"x:Name="countryLbx"DisplayMemberPath="CountryName"/><ListBoxGrid.Row="1"Grid.Column="1&......
  • 多源谱修复学习算法(Multi-source Spectral Repair Learning Algorithm, MSRL)
    多源谱修复学习算法(Multi-sourceSpectralRepairLearningAlgorithm,MSRL)是一种针对非完备多源数据的处理方法,旨在解决因数据缺失而导致的多源数据学习问题。非完备多源数据是指在数据采集过程中,由于各种原因(如数据源多样性带来的质量差异或数据获取能力限制),导致某些样......
  • 多源谱嵌入融合学习算法(Multi-source Spectral Embedding Fusion Learning Algorithm,
    多源谱嵌入融合学习算法(Multi-sourceSpectralEmbeddingFusionLearningAlgorithm,简称MSEF)是一种专门设计用于处理多源数据的高级学习方法,其目标是在不同数据源之间建立一致的表示,从而提高聚类性能和数据理解的全面性。这种算法的核心在于利用全局和局部谱嵌入的融合,以......
  • 【vue深入学习第2章】Vue.js 中的 Ajax 处理:vue-resource 库的深度解析
    Vue.js中的Ajax处理:vue-resource库的深度解析在现代前端开发中,Ajax请求是与后端进行数据交互的关键技术。Vue.js作为一个渐进式JavaScript框架,提供了多种方式来处理Ajax请求,其中vue-resource是一个较为常用的库。尽管vue-resource在Vue2.x之后不再是官方推荐的......
  • 【译】The danger of TaskCompletionSource class
    来自SergeyTepliakov的另一篇https://devblogs.microsoft.com/premier-developer/the-danger-of-taskcompletionsourcet-class/#comments当使用async/await时,如果您想手动控制任务的生存期,TaskCompletionSource<T>类是一个非常有用的工具。下面是TaskCompletionSource的一个......
  • Flink滚动滑动窗口的区别
    1滚动窗口(TumblingWindows)滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子......
  • User 'red' has exceeded the 'max_updates' resource (current value: 500)
    错误记录:User'red'hasexceededthe'max_updates'resource(currentvalue:500)错误原因:在mysql数据库的下有一个库为mysql,它其中有一个表为user这里面的纪录每一条都对应为一个mysql用户的授权。其中字段max_questionsmax_updatesmax_connections分别记录着最大查询次......
  • Windows11系统System.Resources.Writer.dll文件丢失问题
    其实很多用户玩单机游戏或者安装软件的时候就出现过这种问题,如果是新手第一时间会认为是软件或游戏出错了,其实并不是这样,其主要原因就是你电脑系统的该dll文件丢失了或没有安装一些系统软件平台所需要的动态链接库,这时你可以下载这个System.Resources.Writer.dll文件(挑选合适......
  • Vue.js Ajax(vue-resource)
     Vue要实现异步加载需要使用到vue-resource库。Vue.js2.0版本推荐使用 axios 来完成ajax请求。<scriptsrc="https://cdn.staticfile.org/vue-resource/1.5.1/vue-resource.min.js"></script>Get请求以下是一个简单的Get请求实例,请求地址是一个简单的txt文......