首页 > 其他分享 >Flink核心API之DataSet

Flink核心API之DataSet

时间:2023-06-03 11:34:05浏览次数:40  
标签:val Flink DataSet second API ._ import 数据 first

DataSet API

DataSet API主要可以分为3块来分析:DataSource、Transformation、Sink。
DataSource是程序的数据源输入。
Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,例如map、flatMap、filter等操作。
DataSink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。

DataSet API之DataSource

针对DataSet批处理而言,其实最多的就是读取HDFS中的文件数据,所以在这里我们主要介绍两个DataSource组件。

  • 基于集合
    fromCollection(Collection),主要是为了方便测试使用。它的用法和DataStreamAPI中的用法一样,我们已经用过很多次了。
  • 基于文件
    readTextFile(path),读取hdfs中的数据文件。这个前面我们也使用过了。

DataSet API之Transformation

map             输入一个元素进行处理,返回一个元素
mapPartition    类似map,一次处理一个分区的数据
flatMap         输入一个元素进行处理,可以返回多个元素
filter          对数据进行过滤,符合条件的数据会被留下
reduce          对当前元素和上一次的结果进行聚合操作
aggregate       sum(),min(),max()等

这里面的算子我们都是比较熟悉的,在前面DatatreamAPI中都用过,用法都是一样的,所以在这就不再演示了
mapPartition这个算子我们在Flink中还没用过,不过在Spark 中是用过的,用法也是一样的
其实mapPartition就是一次处理一批数据,如果在处理数据的时候想要获取第三方资源连接,建议使用mapPartition,这样可以一批数据获取一次连接,提高性能。

下面来演示一下Flink中mapPartition的使用

import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

/**
  * MapPartition的使用:一次处理一个分区的数据
  */
object BatchMapPartitionScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //生成数据源数据
    val text = env.fromCollection(Array("hello you", "hello me"))
    //每次处理一个分区的数据
    text.mapPartition(it => {
      //可以在此处创建数据库连接,建议把这块代码放到try-catch代码块中
      //注意:此时是每个分区获取一个数据库连接,不需要每处理一条数据就获取一次连接,
      val res = ListBuffer[String]()
      it.foreach(line => {
        val words = line.split(" ")
        for (word <- words) {
          res.append(word)
        }
      })
      res
      //关闭数据库连接
    }).print()
    //No new data sinks have been defined since the last execution.
    //The last execution refers to the latest call to 'execute()', 'count()',
    //注意:针对DataSetAPI,如果在后面调用的是count、collect、print,则最后不需要指
    //env.execute("BatchMapPartitionScala")
  }

}

下面还有一些transformation算子

distinct        返回数据集中去重之后的元素
join            内连接
outerJoin       外连接
cross           获取两个数据集的笛卡尔积
union           返回多个数据集的总和,数据类型需要一致
first-n         获取集合中的前N个元素
  • distinct算子比较简单,就是对数据进行全局去重。
  • join:内连接,可以连接两份数据集

下面来演示一下join的用法

import org.apache.flink.api.scala.ExecutionEnvironment

/**
  * join:内连接
  */
object BatchJoinScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //初始化第一份数据 Tuple2<用户id,用户姓名>
    val text1 = env.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mick")))
    //初始化第二份数据 Tuple2<用户id,用户所在城市>
    val text2 = env.fromCollection(Array((1, "bj"), (2, "sh"), (4, "gz")))
    //对两份数据集执行join操作
    text1.join(text2)
      //注意:这里的where和equalTo实现了类似于on fieldA=fieldB的效果
      //where:指定左边数据集中参与比较的元素角标
      .where(0)
      //equalTo指定右边数据集中参与比较的元素角标
      .equalTo(0) { (first, second) => {
      //equalTo指定右边数据集中参与比较的元素角标
      (first._1, first._2, second._2)
    }
    }.print()
  }

}
  • outerJoin:外连接
import org.apache.flink.api.scala.ExecutionEnvironment

/**
  * outerJoin:外连接
  * 一共有三种情况
  * 1:leftOuterJoin
  * 2:rightOuterJoin
  * 3:fullOuterJoin
  */
object BatchOuterJoinScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //初始化第一份数据 Tuple2<用户id,用户姓名>
    val text1 = env.fromCollection(Array((1, "jack"), (2, "tom"), (3, "mick")))
    //初始化第二份数据 Tuple2<用户id,用户所在城市>
    val text2 = env.fromCollection(Array((1, "bj"), (2, "sh"), (4, "gz")))
    //对两份数据集执行leftOuterJoin操作
    text1.leftOuterJoin(text2)
      .where(0)
      .equalTo(0) {
        (first, second) => {
          //注意:second中的元素可能为null
          if (second == null) {
            (first._1, first._2, "null")
          } else {
            (first._1, first._2, second._2)
          }
        }
      }.print()
    println("========================================")
    //对两份数据集执行rightOuterJoin操作
    text1.rightOuterJoin(text2)
      .where(0)
      .equalTo(0) {
        (first, second) => {
          //注意:first中的元素可能为null
          if (first == null) {
            (second._1, "null", second._2)
          } else {
            (first._1, first._2, second._2)
          }
        }
      }.print()
    println("========================================")
    //对两份数据集执行rightOuterJoin操作
    text1.fullOuterJoin(text2)
      .where(0)
      .equalTo(0) {
        (first, second) => {
          //注意:first和second中的元素都有可能为null
          if (first == null) {
            (second._1, "null", second._2)
          } else if (second == null) {
            (first._1, first._2, "null")
          } else {
            (first._1, first._2, second._2)
          }
        }
      }.print()
  }

}
  • cross:获取两个数据集的笛卡尔积
import org.apache.flink.api.scala.ExecutionEnvironment

/**
  * cross:获取两个数据集的笛卡尔积
  */
object BatchCrossScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //初始化第一份数据
    val text1 = env.fromCollection(Array(1, 2))
    //初始化第二份数据
    val text2 = env.fromCollection(Array("a", "b"))
    //执行cross操作
    text1.cross(text2).print()
  }
}

union:返回两个数据集的总和,数据类型需要一致
和DataStreamAPI中的union操作功能一样
first-n:获取集合中的前N个元素

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer

/**
  * first-n:获取集合中的前N个元素
  */
object BatchFirstNScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = ListBuffer[Tuple2[Int, String]]()
    data.append((2, "zs"))
    data.append((4, "ls"))
    data.append((3, "ww"))
    data.append((1, "aw"))
    data.append((1, "xw"))
    data.append((1, "mw"))
    import org.apache.flink.api.scala._
    //初始化数据
    val text = env.fromCollection(data)
    //获取前3条数据,按照数据插入的顺序
    text.first(3).print()
    println("==================================")
    //根据数据中的第一列进行分组,获取每组的前2个元素
    text.groupBy(0).first(2).print()
    println("==================================")
    //根据数据中的第一列分组,再根据第二列进行组内排序[倒序],获取每组的前2个元素
    //分组排序取TopN
    text.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
  }
}

DataSet API之DataSink

Flink针对DataSet提供了一些已经实现好的数据目的地
其中最常见的是向HDFS中写入数据
writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自对象的toString()方法
还有一个是print:打印每个元素的toString()方法的值,测试时使用。

标签:val,Flink,DataSet,second,API,._,import,数据,first
From: https://www.cnblogs.com/strongmore/p/17377639.html

相关文章

  • Intersection Observer API 交叉观察器 API vue3 antd table 滚动加载 使用过程
    需求:表格滚动加载做法:步骤一:给表格最后一行添加特定标识,类名或者id等组件库https://www.antdv.com/components/table-cn#APIwebApihttps://developer.mozilla.org/zh-CN/docs/Web/API/Intersection_Observer_API组件名table添加类名的组件方法rowClassName使用例子::......
  • Flink核心API之Table API和SQL
    TableAPI&SQL注意:TableAPI和SQL现在还处于活跃开发阶段,还没有完全实现Flink中所有的特性。不是所有的[TableAPI,SQL]和[流,批]的组合都是支持的。TableAPI和SQL的由来:Flink针对标准的流处理和批处理提供了两种关系型API,TableAPI和SQL。TableAPI允许用户以一种很直......
  • Flink核心API之DataStream
    Flink中提供了4种不同层次的API,每种API在简洁和易表达之间有自己的权衡,适用于不同的场景。目前上面3个会用得比较多。低级API(StatefulStreamProcessing):提供了对时间和状态的细粒度控制,简洁性和易用性较差,主要应用在一些复杂事件处理逻辑上。核心API(DataStream/DataSetAP......
  • react配置API请求代理
    需求当请求http://10.1.1.1:3131/v1/*接口时,需要代理到8181端口。如果只需要代理匹配到/v1路径的请求,可以在package.json中使用http-proxy-middleware进行自定义代理配置。以下是一个示例:首先,确保已经安装了http-proxy-middleware包。如果没有安装,可以使用以下命令进行安......
  • Flink详解
    什么是FlinkApacheFlink是一个开源的分布式,高性能,高可用,准确的流处理框架。分布式:表示flink程序可以运行在很多台机器上,高性能:表示Flink处理性能比较高高可用:表示flink支持程序的自动重启机制。准确的:表示flink可以保证处理数据的准确性。Flink支持流处理和批处理,虽然我们......
  • Flink安装部署
    Flink集群安装部署Flink支持多种安装部署方式StandaloneONYARNMesos、Kubernetes、AWS…这些安装方式我们主要讲一下standalone和onyarn。如果是一个独立环境的话,可能会用到standalone集群模式。在生产环境下一般还是用onyarn这种模式比较多,因为这样可以综合利用集群......
  • CentOS 7.x安装微服务网关Apache APISIX
    阅读文本大概需要3分钟。    APISIX是一个云原生、高性能、可扩展的微服务API网关。它是基于OpenResty和etcd来实现,和传统API网关相比,APISIX具备动态路由和插件热加载,特别适合微服务体系下的API管理。APISIX通过插件机制,提供动态负载平衡、身份验证、限流限速等功能,并且......
  • 实时数据治理—当Atlas遇见Flink
    Atlas是Hadoop的数据治理和元数据框架。Atlas是一组可扩展和可扩展的核心基础治理服务,使企业能够有效,高效地满足Hadoop中的合规性要求,并允许与整个企业数据生态系统集成。ApacheAtlas为组织提供了开放的元数据管理和治理功能,以建立其数据资产的目录,对这些资产进行分类和治理,并为数......
  • .NET Core WebAPI 认证授权之JWT
    @@.NETCoreWebAPI认证授权之JWT--google from --->NETCoreWebAPI认证授权之JWT(二)  在上一篇 《.NET缓存系列(一):缓存入门》中实现了基本的缓存,接下来需要对缓存进行改进,解决一些存在的问题。一、缓存过期策略问 题:当源数据更改或删除时,服务器程序并不知道,导......
  • NET Core WebAPI 认证授权之JWT
    @@.netcoretoken非对称加密 --Google-->@@wepapi认证授权之jwt NETCoreWebAPI认证授权之JWT(二):HMAC算法实操  一、前言在上一篇 《.NETCoreWebAPI认证授权之JWT(一):JWT介绍》中讲到了JWT的组成,分为三部分,其中标头(header)和载荷(payload)都只是简单的将json......