首页 > 其他分享 >【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)

【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)

时间:2024-01-13 14:32:32浏览次数:24  
标签:join val 示例 flink currency new Array Types Euro




文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、示例:时态表的join(scala版本)
  • 1)、统计需求对应的SQL
  • 2)、Without connnector 实现代码
  • 3)、With CSVConnector 实现代码



本文给以scala的语言给出来Table API 针对时态表的join操作。


本文除了maven依赖外,没有其他依赖。

本文需要有kafka的运行环境。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)17、Flink 之Table API: Table API 支持的操作(2)

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、示例:时态表的join(scala版本)

该示例来源于:https://developer.aliyun.com/article/679659

假设有一张订单表Orders和一张汇率表Rates,那么订单来自于不同的地区,所以支付的币种各不一样,那么假设需要统计每个订单在下单时候Yen币种对应的金额。

【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)_大数据

1)、统计需求对应的SQL

SELECT o.currency, o.amount, r.rate
  o.amount * r.rate AS yen_amount
FROM
  Orders AS o,
  LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency

2)、Without connnector 实现代码

object TemporalTableJoinTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setParallelism(1)
// 设置时间类型是 event-time  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 构造订单数据
    val ordersData = new mutable.MutableList[(Long, String, Timestamp)]
    ordersData.+=((2L, "Euro", new Timestamp(2L)))
    ordersData.+=((1L, "US Dollar", new Timestamp(3L)))
    ordersData.+=((50L, "Yen", new Timestamp(4L)))
    ordersData.+=((3L, "Euro", new Timestamp(5L)))

    //构造汇率数据
    val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]
    ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))
    ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))
    ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))
    ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))

// 进行订单表 event-time 的提取
    val orders = env
      .fromCollection(ordersData)
      .assignTimestampsAndWatermarks(new OrderTimestampExtractor[Long, String]())
      .toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)

// 进行汇率表 event-time 的提取
    val ratesHistory = env
      .fromCollection(ratesHistoryData)
      .assignTimestampsAndWatermarks(new OrderTimestampExtractor[String, Long]())
      .toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)

// 注册订单表和汇率表
    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("RatesHistory", ratesHistory)
    val tab = tEnv.scan("RatesHistory");
// 创建TemporalTableFunction
    val temporalTableFunction = tab.createTemporalTableFunction('rowtime, 'currency)
//注册TemporalTableFunction
tEnv.registerFunction("Rates",temporalTableFunction)

    val SQLQuery =
      """
        |SELECT o.currency, o.amount, r.rate,
        |  o.amount * r.rate AS yen_amount
        |FROM
        |  Orders AS o,
        |  LATERAL TABLE (Rates(o.rowtime)) AS r
        |WHERE r.currency = o.currency
        |""".stripMargin

    tEnv.registerTable("TemporalJoinResult", tEnv.SQLQuery(SQLQuery))

    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
    // 打印查询结果
    result.print()
    env.execute()
  }

}
  • OrderTimestampExtractor 实现如下
import java.SQL.Timestamp

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

class OrderTimestampExtractor[T1, T2]
  extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
  override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
    element._3.getTime
  }
}

3)、With CSVConnector 实现代码

在实际的生产开发中,都需要实际的Connector的定义,下面我们以CSV格式的Connector定义来开发Temporal Table JOIN Demo。

1、genEventRatesHistorySource

def genEventRatesHistorySource: CsvTableSource = {

    val csvRecords = Seq(
      "ts#currency#rate",
      "1#US Dollar#102",
      "1#Euro#114",
      "1#Yen#1",
      "3#Euro#116",
      "5#Euro#119",
      "7#Pounds#108"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts","currency","rate"),
      Array(
        Types.LONG,Types.STRING,Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

2、genRatesOrderSource

def genRatesOrderSource: CsvTableSource = {

    val csvRecords = Seq(
      "ts#currency#amount",
      "2#Euro#10",
      "4#Euro#10"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts","currency", "amount"),
      Array(
        Types.LONG,Types.STRING,Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

3、主程序

import java.io.File

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.book.utils.{CommonUtils, FileUtils}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row

object CsvTableSourceUtils {

  def genWordCountSource: CsvTableSource = {
    val csvRecords = Seq(
      "words",
      "Hello Flink",
      "Hi, Apache Flink",
      "Apache FlinkBook"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("words"),
      Array(
        Types.STRING
      ),
      fieldDelim = "#",
      rowDelim = "$",
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }


  def genRatesHistorySource: CsvTableSource = {

    val csvRecords = Seq(
      "rowtime ,currency   ,rate",
    "09:00:00   ,US Dollar  , 102",
    "09:00:00   ,Euro       , 114",
    "09:00:00  ,Yen        ,   1",
    "10:45:00   ,Euro       , 116",
    "11:15:00   ,Euro       , 119",
    "11:49:00   ,Pounds     , 108"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("rowtime","currency","rate"),
      Array(
        Types.STRING,Types.STRING,Types.STRING
      ),
      fieldDelim = ",",
      rowDelim = "$",
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

  def genEventRatesHistorySource: CsvTableSource = {

    val csvRecords = Seq(
      "ts#currency#rate",
      "1#US Dollar#102",
      "1#Euro#114",
      "1#Yen#1",
      "3#Euro#116",
      "5#Euro#119",
      "7#Pounds#108"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts","currency","rate"),
      Array(
        Types.LONG,Types.STRING,Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }

  def genRatesOrderSource: CsvTableSource = {

    val csvRecords = Seq(
      "ts#currency#amount",
      "2#Euro#10",
      "4#Euro#10"
    )
    // 测试数据写入临时文件
    val tempFilePath =
      FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")

    // 创建Source connector
    new CsvTableSource(
      tempFilePath,
      Array("ts","currency", "amount"),
      Array(
        Types.LONG,Types.STRING,Types.LONG
      ),
      fieldDelim = "#",
      rowDelim = CommonUtils.line,
      ignoreFirstLine = true,
      ignoreComments = "%"
    )
  }


  /**
    * Example:
    * genCsvSink(
    *   Array[String]("word", "count"),
    *   Array[TypeInformation[_] ](Types.STRING, Types.LONG))
    */
  def genCsvSink(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
    val tempFile = File.createTempFile("csv_sink_", "tem")
    if (tempFile.exists()) {
      tempFile.delete()
    }
    new CsvTableSink(tempFile.getAbsolutePath).configure(fieldNames, fieldTypes)
  }

}

4、运行结果

【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)_flink 流批一体化_02

以上,本文给以scala的语言给出来Table API 针对时态表的join操作。

标签:join,val,示例,flink,currency,new,Array,Types,Euro
From: https://blog.51cto.com/alanchan2win/9232420

相关文章

  • 【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、
    文章目录Flink系列文章一、maven依赖二、示例:表的聚合操作1、示例代码公共部分2、groupby3、GroupByWindowAggregation4、OverWindowAggregation5、DistinctAggregation6、Distinct本文给出了关于表数据的聚合操作示例,比如groupby、distinct、以及groupby、over、distin......
  • 【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以
    文章目录Flink系列文章一、maven依赖二、示例:表的join操作(内联接、外联接以及联接自定义函数等)本文介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。本文除了maven依赖外,没有其他依赖。一、maven依赖本文maven依赖参考文章:【flink番外篇】9、FlinkTableAPI支......
  • 【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch
    文章目录Flink系列文章一、maven依赖二、表的OrderBy,Offset和Fetch操作三、表的insert操作本文介绍了表的OrderBy、Offset和Fetch、insert操作,以示例形式展示每个操作的结果。本文除了maven依赖外,没有其他依赖。一、maven依赖本文maven依赖参考文章:【flink番外篇】9、Flin......
  • 【CMake】5. 单项目多模块添加第三方依赖示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple-mod-deps这里引入C++gRPC依赖,进行示例1.多模块工程+第三方依赖CMake多模块工程,这是一个示例工程simple-mod-deps,项目名称de......
  • 【CMake】3.单项目单模块添加第三方依赖包示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块-添加第三方依赖示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple-deps1.单模块工程+第三方依赖CMake单模块工程,这是一个示例工程simple-deps,项目名称cmake,第三方依赖demo......
  • 【CMake】2. 单项目单模块示例工程
    CMake示例工程代码https://github.com/LABELNET/cmake-simple单项目单模块示例工程https://github.com/LABELNET/cmake-simple/tree/main/simple1.单模块工程CMake单模块工程,这是一个示例工程simple,项目名称cmake,第三方依赖demo,主模块main2.目录结构$.SIMPLE......
  • 【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersec
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • thinkphp6的join连表查询
    说明join要关联的(完整)表名以及别名,支持三种写法:写法1:['完整表名或者子查询'=>'别名']写法2:'完整表名别名'写法3:'不带数据表前缀的表名'condition关联条件,可以为字符串或数组,为数组时每一个元素都是一个关联条件。type关联类型,可以为:INNER、LEFT、RIGHT、FULL,......
  • 京东商品详情API实现实时数据获取的Java代码示例
    在电商行业中,商品详情页是用户了解商品信息、进行购买决策的重要页面。为了提高用户体验和促进销售,电商平台通常会提供商品详情的API接口,以便第三方应用能够实时获取商品数据。本文将介绍如何使用京东获得的JD商品详情API实现实时数据获取,并提供相应的Java代码示例。一、JD商品详......
  • 天拓四方5G边缘计算网关在工业领域的应用示例
    在工业领域,5G边缘计算网关的应用正在逐渐普及,为工业4.0和智能制造的实现提供了强大的技术支持。通过将数据分析和处理能力从中心节点转移至网络的边缘,5G边缘计算网关为工业生产带来了前所未有的机遇。本文将通过举例实际应用,探讨5G边缘计算网关在工业领域的具体应用。案例一:智能制......