首页 > 其他分享 >Flink核心API之DataStream

Flink核心API之DataStream

时间:2023-06-03 11:23:30浏览次数:53  
标签:DataStream flink String val Int 分区 Flink API 算子

image

Flink中提供了4种不同层次的API,每种API在简洁和易表达之间有自己的权衡,适用于不同的场景。目前上面3个会用得比较多。

  • 低级API(Stateful Stream Processing):提供了对时间和状态的细粒度控制,简洁性和易用性较差,主要应用在一些复杂事件处理逻辑上。
  • 核心API(DataStream/DataSet API):主要提供了针对流数据和批数据的处理,是对低级API进行了一些封装,提供了filter、sum、max、min等高级函数,简单易用,所以这些API在工作中应用还是比较广泛的。
  • Table API:一般与DataSet或者DataStream紧密关联,可以通过一个DataSet或DataStream创建出一个Table,然后再使用类似于filter, join,或者 select这种操作。最后还可以将一个Table对象转成DataSet或DataStream。
  • SQL:Flink的SQL底层是基于Apache Calcite,Apache Calcite实现了标准的SQL,使用起来比其他API更加灵活,因为可以直接使用SQL语句。Table API和SQL可以很容易地结合在一块使用,因为它们都返回Table对象。

针对这些API我们主要学习下面这些

image

DataStream API

DataStream API主要分为3块:DataSource、Transformation、DataSink。
DataSource是程序的输入数据源。
Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,例如map、flatMap和filter等操作。
DataSink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。

DataStream API之DataSoure

DataSource是程序的输入数据源,Flink提供了大量内置的DataSource,也支持自定义DataSource,不过目前Flink提供的这些已经足够我们正常使用了。
Flink提供的内置输入数据源:包括基于socket、基于Collection
还有就是Flink还提供了一批Connectors,可以实现读取第三方数据源,

Flink内置 Apache Bahir
Kafka ActiveMQ
Kinesis Streams Netty
RabbitMQ
NiFi
Twitter Streaming API
Google PubSub
  • Flink 内置:表示Flink中默认自带的。
  • Apache Bahir:表示需要添加这个依赖包之后才能使用的。

针对source的这些Connector,我们在实际工作中最常用的就是Kafka
当程序出现错误的时候,Flink的容错机制能恢复并继续运行程序,这种错误包括机器故障、网络故障、程序故障等
针对Flink提供的常用数据源接口,如果程序开启了checkpoint快照机制,Flink可以提供这些容错性保证

DataSource 容错保证 备注
Socket at most once
Collection exactly once
Kafka exactly once 需要使用0.10及以上版本

针对这些常用的DataSouce,基于socket的我们之前已经用过了,下面我们来看一下基于Collection集合的。
针对Kafka的这个我们在后面会详细分析,在这里先不讲。
由于我们后面还会学到批处理的功能,所以在项目里面创建几个包,把流处理和批处理的代码分开,后期看起来比较清晰。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 基于collection的source的使用
  * 注意:这个source的主要应用场景是模拟测试代码流程的时候使用
  */
object StreamCollectionSourceScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //使用collection集合生成DataStream
    val text = env.fromCollection(Array(1, 2, 3, 4, 5))
    text.print().setParallelism(1)
    env.execute("StreamCollectionSource")
  }

}

DataStream API之Transformation

transformation是Flink程序的计算算子,负责对数据进行处理,Flink提供了大量的算子,其实Flink中的大部分算子的使用和spark中算子的使用是一样的,下面我们来看一下:

map           输入一个元素进行处理,返回一个元素
flatMap       输入一个元素进行处理,可以返回多个元素
filter        对数据进行过滤,符合条件的数据会被留下
keyBy         根据key分组,相同key的数据会进入同一个分区
reduce        对当前元素和上一次的结果进行聚合操作
aggregations  sum(),min(),max()等

这里面的算子的用法其实和spark中对应算子的用法是一致的,这里面的map、flatmap、keyBy、reduce、sum这些算子我们都用过了。所以这里面的算子就不再单独演示了。

union           合并多个流,多个流的数据类型必须一致
connect         只能连接两个流,两个流的数据类型可以不同
split           根据规则把一个数据流切分为多个流
shuffle         随机分区
rebalance       对数据集进行再平衡,重分区,消除数据倾斜
rescale         重分区
partitionCustom 自定义分区

这里面的算子我们需要分析一下。

union:表示合并多个流,但是多个流的数据类型必须一致

多个流join之后,就变成了一个流
应用场景:多种数据源的数据类型一致,数据处理规则也一致

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 合并多个流,多个流的数据类型必须一致
  * 应用场景:多种数据源的数据类型一致,数据处理规则也一致
  */
object StreamUnionScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //第1份数据流
    val text1 = env.fromCollection(Array(1, 2, 3, 4, 5))
    //第2份数据流
    val text2 = env.fromCollection(Array(6, 7, 8, 9, 10))
    //合并流
    val unionStream = text1.union(text2)
    //打印流中的数据
    unionStream.print().setParallelism(1)
    env.execute("StreamUnionScala")
  }

}

connect:只能连接两个流,两个流的数据类型可以不同

两个流被connect之后,只是被放到了同一个流中,它们内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
connect方法会返回connectedStream,在connectedStream中需要使用CoMap、CoFlatMap这种函数,类似于map和flatmap

import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 只能连接两个流,两个流的数据类型可以不同
  * 应用:可以将两种不同格式的数据统一成一种格式
  */
object StreamConnectScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    //第1份数据流
    val text1 = env.fromElements("user:tom,age:18")
    //第2份数据流
    val text2 = env.fromElements("user:jack_age:20")
    //连接两个流
    val connectStream = text1.connect(text2)
    connectStream.map(new CoMapFunction[String, String, String] {
      //处理第1份数据流中的数据
      override def map1(value: String): String = {
        value.replace(",", "-")
      }

      //处理第2份数据流中的数据
      override def map2(value: String): String = {
        value.replace("_", "-")
      }
    })
      .print()
      .setParallelism(1)
    env.execute("StreamConnectScala")
  }
}

split:根据规则把一个数据流切分为多个流

注意:split只能分一次流,切分出来的流不能继续分流

split需要和select配合使用,选择切分后的流
应用场景:将一份数据流切分为多份,便于针对每一份数据使用不同的处理逻辑

import java.{lang, util}

import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 根据规则把一个数据流切分为多个流
  * 注意:split只能分一次流,切分出来的流不能继续分流
  * split需要和select配合使用,选择切分后的流
  * 应用场景:将一份数据流切分为多份,便于针对每一份数据使用不同的处理逻辑
  */
object StreamSplitScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    //按照数据的奇偶性对数据进行分流
    val splitStream = text.split(new OutputSelector[Int] {
      override def select(value: Int): lang.Iterable[String] = {
        val list = new util.ArrayList[String]()
        if (value % 2 == 0) {
          list.add("even") //偶数
        } else {
          list.add("odd") //奇数
        }
        list
      }
    })
    //选择流
    val evenStream = splitStream.select("even")
    evenStream.print().setParallelism(1)
    //二次切分流会报错
    //Consecutive multiple splits are not supported. Splits are deprecated. Pl
    /*val lowHighStream = evenStream.split(new OutputSelector[Int] {
    override def select(value: Int): lang.Iterable[String] = {
    val list = new util.ArrayList[String]()
    if(value <= 5){
    list.add("low");
    }else{
    list.add("high")
    }
    list
    }
    })
    val lowStream = lowHighStream.select("low")
    lowStream.print().setParallelism(1)*/
    env.execute("StreamSplitScala")
  }
}

目前split切分的流无法进行二次切分,并且split方法已经标记为过时了,官方不推荐使用,现在官方推荐使用side output的方式实现。下面来看一下使用side output如何实现流的多次切分

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

/**
  * 使用sideoutput切分流
  */
object StreamSideOutputScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    //按照数据的奇偶性对数据进行分流
    //首先定义两个sideoutput来准备保存切分出来的数据
    val outputTag1 = new OutputTag[Int]("even") {} //保存偶数
    val outputTag2 = new OutputTag[Int]("odd") {} //保存奇数
    //注意:process属于Flink中的低级api
    val outputStream = text.process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
        if (value % 2 == 0) {
          ctx.output(outputTag1, value)
        } else {
          ctx.output(outputTag2, value)
        }
      }
    })
    //获取偶数数据流
    val evenStream = outputStream.getSideOutput(outputTag1)
    //获取奇数数据流
    val oddStream = outputStream.getSideOutput(outputTag2)
    //evenStream.print().setParallelism(1)
    //对evenStream流进行二次切分
    val outputTag11 = new OutputTag[Int]("low") {} //保存小于等五5的数字
    val outputTag12 = new OutputTag[Int]("high") {} //保存大于5的数字
    val subOutputStream = evenStream.process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
        if (value <= 5) {
          ctx.output(outputTag11, value)
        } else {
          ctx.output(outputTag12, value)
        }
      }
    })
    //获取小于等于5的数据流
    val lowStream = subOutputStream.getSideOutput(outputTag11)
    //获取大于5的数据流
    val highStream = subOutputStream.getSideOutput(outputTag12)
    lowStream.print().setParallelism(1)
    env.execute("StreamSideOutputScala")
  }

}

最后针对这几个算子总结一下:
首先是union和connect的区别,如图所示:

image

union可以连接多个流,最后汇总成一个流,流里面的数据使用相同的计算规则
connect值可以连接2个流,最后汇总成一个流,但是流里面的两份数据相互还是独立的,每一份数据使用一个计算规则
然后是流切分
如果是只需要切分一次的话使用split或者side output都可以
如果想要切分多次,就不能使用split了,需要使用side output

image

接下来看一下这几个和分区相关的算子

算子              解释
random           随机分区
rebalance        对数据集进行再平衡,重分区,消除数据倾斜
rescale          重分区
custom partition 自定义分区
  • random:随机分区,它表示将上游数据随机分发到下游算子实例的每个分区中,在代码层面体现是调用shuffle()函数
    查看源码 ,shuffle底层对应的是ShufflePartitioner这个类
    这个类里面有一个selectChannel函数,这个函数会计算数据将会被发送给哪个分区,里面使用的是random.nextInt,所以说是随机的。
  • rebalance:重新平衡分区(循环分区),我觉得叫循环分区更好理解,它表示对数据集进行再平衡,消除数据倾斜,为每个分区创建相同的负载,其实就是通过循环的方式给下游算子实例的每个分区分配数据,在代码层面体现是调用rebalance()函数
    查看源码,rebalance底层对应的是RebalancePartitioner这个类
    这个类里面有一个 setup 和 selectChannel 函 数,setup 函数会根据分区数初始化一个随机值
    nextChannelToSendTo ,然后selectChannel函数会使用nextChannelToSendTo 加1和分区数取模,把计算的值再赋给nextChannelToSendTo ,后面以此类推,其实就可以实现向下游算子实例的多个分区循环发送数据了,这样每个分区获取到的数据基本一致。
  • rescale:重分区
    查看源码,rescale底层对应的是RescalePartitioner这个类
    这个类里面有一个selectChannel函数,这里面的numberOfChannels是分区数量,其实也可以认为是我们所说的算子的并行度,因为一个分区是由一个线程负责处理的,它们两个是一一对应的。
  • broadcast:广播分区,将上游算子实例中的数据输出到下游算子实例的每个分区中,适合用于大数据集Join小数据集的场景,可以提高性
  • custom partition:自定义分区,可以按照自定义规则实现
    自定义分区需要实现Partitioner接口

注意:rescale与rebalance的区别是rebalance会产生全量重分区,而rescale不会。

image

image

最后使用代码演示一下它们具体的用法

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

/**
  * 分区规则的使用
  */
object StreamPartitionOpScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //注意:默认情况下Fink任务中算子的并行度会读取当前机器的CPU个数
    //fromCollection的并行度为1,由源码可知
    val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    //注意:在这里建议将这个隐式转换代码放到类上面
    //因为默认它只在main函数生效,针对下面提取的shuffleOp是无效的,否则也需要在shuffleOp函数中添加
    //import org.apache.flink.api.scala._
    //使用shuffle分区规则
    shuffleOp(text)
    //使用rebalance分区规则
    //rebalanceOp(text)
    //使用rescale分区规则
    //rescaleOp(text)
    //使用broadcast分区规则,此代码一共会打印40条数据,因为print的并行度为4
    //broadcastOp(text)
    //自定义分区规则:根据数据的奇偶性进行分区
    //注意:此时虽然print算子的并行度是4,但是自定义的分区规则只会把数据分发给2个并行度,所以有两个是不干活的
    //custormPartitionOp(text)
    env.execute("StreamPartitionOpScala")
  }

  private def custormPartitionOp(text: DataStream[Int]) = {
    text.map(num => num)
      .setParallelism(2) //设置map算子的并行度为2
      //.partitionCustom(new MyPartitionerScala,0)//这种写法已经过期
      .partitionCustom(new MyPartitionerScala, num => num) //官方建议使用keySelector
      .print()
      .setParallelism(4) //设置print算子的并行度为4
  }

  private def broadcastOp(text: DataStream[Int]) = {
    text.map(num => num)
      .setParallelism(2) //设置map算子的并行度为2
      .broadcast
      .print()
      .setParallelism(4) //设置print算子的并行度为4
  }

  private def rescaleOp(text: DataStream[Int]) = {
    text.map(num => num)
      .setParallelism(2) //设置map算子的并行度为2
      .rescale
      .print()
      .setParallelism(4) //设置print算子的并行度为4
  }

  private def rebalanceOp(text: DataStream[Int]) = {
    text.map(num => num)
      .setParallelism(2) //设置map算子的并行度为2
      .rebalance
      .print()
      .setParallelism(4) //设置print算子的并行度为4
  }

  private def shuffleOp(text: DataStream[Int]) = {
    //由于fromCollection已经设置了并行度为1,所以需要再接一个算子之后才能修改并行度
    text.map(num => num)
      .setParallelism(2) //设置map算子的并行度为2
      .shuffle
      .print()
      .setParallelism(4) //设置print算子的并行度为4
  }

  /**
    * 自定义分区规则:按照数字的奇偶性进行分区
    * Created by xuwei
    */
  class MyPartitionerScala extends Partitioner[Int] {
    override def partition(key: Int, numPartitions: Int): Int = {
      println("分区总数:" + numPartitions)
      if (key % 2 == 0) { //偶数分到0号分区
        0
      } else { //奇数分到1号分区
        1
      }
    }
  }

}

DataStream API之DataSink

DataSink是 输出组件,负责把计算好的数据输出到其它存储介质中
Flink支持把流数据输出到文件中,不过在实际工作中这种场景不多,因为流数据处理之后一般会存储到一些消息队列里面,或者数据库里面,很少会保存到文件中的。
还有就是print,直接打印,这个其实我们已经用了很多次了,这种用法主要是在测试的时候使用的,方便查看输出的结果信息
Flink提供了一批Connectors,可以实现输出到第三方目的地

Flink内置           Apache Bahir
Kafka              ActiveMQ
Cassandra          Flume
Kinesis Streams    Redis
Elasticsearch      Akka
Hadoop FileSysterm
RabbitMQ
NiFi
JDBC

针对sink的这些connector,我们在实际工作中最常用的是kafka、redis
针对Flink提供的常用sink组件,可以提供这些容错性保证

DataSink          容错保证                     备注
Redis             at least once
Kafka             at least once/exactly once  Kafka0.9和0.10提供at least once,Kafka0.11及以上提供exactly once

针对kafka这个sink组件的使用,我们在后面会统一分析,现在我们来使用一下redis这个sink组件
需求:接收Socket传输过来的数据,把数据保存到Redis的list队列中。

注意:redis sink是在Bahir这个依赖包中,所以在pom.xml中需要添加对应的依赖

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
  * 需求:接收Socket传输过来的数据,把数据保存到Redis的list队列中。
  */
object StreamRedisSinkScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //连接socket获取输入数据
    val text = env.socketTextStream("bigdata01", 9001)
    import org.apache.flink.api.scala._
    //组装数据,这里组装的是tuple2类型
    //第一个元素是指list队列的key名称
    //第二个元素是指需要向list队列中添加的元素
    val listData = text.map(word => ("l_words_scala", word))
    //指定redisSink
    val conf = new FlinkJedisPoolConfig.Builder().setHost("xxx").setPort(6379).setPassword("szz123").build()
    val redisSink = new RedisSink[Tuple2[String, String]](conf, new MyRedisMapper())
    listData.addSink(redisSink)
    env.execute("StreamRedisSinkScala")
  }

  class MyRedisMapper extends RedisMapper[Tuple2[String, String]] {
    //指定具体的操作命令
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.LPUSH)
    }

    //获取key
    override def getKeyFromData(data: (String, String)): String = {
      data._1
    }

    //获取value
    override def getValueFromData(data: (String, String)): String = {
      data._2
    }
  }

}

注意: 针对 List 数据类型,我们在定义 getCommandDescription 方法的时候,使用 new
RedisCommandDescription(RedisCommand.LPUSH);。如果是 Hash 数据类型,在定义 getCommandDescription 方法的时候,需要使用 new RedisCommandDescription(RedisCommand.HSET,“hashKey”);在构造函数中需要直接指定Hash数据类型的key的名称。

注意:执行代码之前,需要先开启socket和redis服务,通过socket传递单词

[root@bigdata01 ~]# nc -l 9001
hello
flink

最终到redis中查看结果

[root@bigdata01 redis-5.0.9]# redis-cli
ip:6379> lrange l_words_scala 0 -1
1) "flink"
2) "hello"

标签:DataStream,flink,String,val,Int,分区,Flink,API,算子
From: https://www.cnblogs.com/strongmore/p/17373911.html

相关文章

  • react配置API请求代理
    需求当请求http://10.1.1.1:3131/v1/*接口时,需要代理到8181端口。如果只需要代理匹配到/v1路径的请求,可以在package.json中使用http-proxy-middleware进行自定义代理配置。以下是一个示例:首先,确保已经安装了http-proxy-middleware包。如果没有安装,可以使用以下命令进行安......
  • Flink详解
    什么是FlinkApacheFlink是一个开源的分布式,高性能,高可用,准确的流处理框架。分布式:表示flink程序可以运行在很多台机器上,高性能:表示Flink处理性能比较高高可用:表示flink支持程序的自动重启机制。准确的:表示flink可以保证处理数据的准确性。Flink支持流处理和批处理,虽然我们......
  • Flink安装部署
    Flink集群安装部署Flink支持多种安装部署方式StandaloneONYARNMesos、Kubernetes、AWS…这些安装方式我们主要讲一下standalone和onyarn。如果是一个独立环境的话,可能会用到standalone集群模式。在生产环境下一般还是用onyarn这种模式比较多,因为这样可以综合利用集群......
  • CentOS 7.x安装微服务网关Apache APISIX
    阅读文本大概需要3分钟。    APISIX是一个云原生、高性能、可扩展的微服务API网关。它是基于OpenResty和etcd来实现,和传统API网关相比,APISIX具备动态路由和插件热加载,特别适合微服务体系下的API管理。APISIX通过插件机制,提供动态负载平衡、身份验证、限流限速等功能,并且......
  • 实时数据治理—当Atlas遇见Flink
    Atlas是Hadoop的数据治理和元数据框架。Atlas是一组可扩展和可扩展的核心基础治理服务,使企业能够有效,高效地满足Hadoop中的合规性要求,并允许与整个企业数据生态系统集成。ApacheAtlas为组织提供了开放的元数据管理和治理功能,以建立其数据资产的目录,对这些资产进行分类和治理,并为数......
  • .NET Core WebAPI 认证授权之JWT
    @@.NETCoreWebAPI认证授权之JWT--google from --->NETCoreWebAPI认证授权之JWT(二)  在上一篇 《.NET缓存系列(一):缓存入门》中实现了基本的缓存,接下来需要对缓存进行改进,解决一些存在的问题。一、缓存过期策略问 题:当源数据更改或删除时,服务器程序并不知道,导......
  • NET Core WebAPI 认证授权之JWT
    @@.netcoretoken非对称加密 --Google-->@@wepapi认证授权之jwt NETCoreWebAPI认证授权之JWT(二):HMAC算法实操  一、前言在上一篇 《.NETCoreWebAPI认证授权之JWT(一):JWT介绍》中讲到了JWT的组成,分为三部分,其中标头(header)和载荷(payload)都只是简单的将json......
  • 利用jira及confluence的API进行批量操作(查找/更新/导出/备份/删除等)
    前言:近期因为某些原因需要批量替换掉jira和confluence中的特定关键字,而且在替换前还希望进行备份(以便后续恢复)和导出(方便查看)atlassian官方的api介绍文档太简陋,很多传参都没有进一步的描述说明,过程中踩了不少的坑...故现将相关代码分享下,希望有类似需求的朋友能用得上,直接上代......
  • Elasticsearch专题精讲—— REST APIs —— Document APIs —— Delete by query API
    RESTAPIs——DocumentAPIs——DeletebyqueryAPIhttps://www.elastic.co/guide/en/elasticsearch/reference/8.8/docs-delete-by-query.htmlDeletesdocumentsthatmatchthespecifiedquery.删除与指定查询匹配的文档。curl-XPOS......
  • Elasticsearch专题精讲—— REST APIs —— Document APIs —— Delete API
    RESTAPIs——DocumentAPIs——DeleteAPIRemovesaJSONdocumentfromthespecifiedindex.从指定的索引中移除JSON文档。1、Request(请求)https://www.elastic.co/guide/en/elasticsearch/reference/8.8/docs-delete.......