首页 > 其他分享 >Flink(五)DataStream流处理算子

Flink(五)DataStream流处理算子

时间:2024-09-27 16:50:22浏览次数:1  
标签:keyedStream DataStream Flink dataStream 算子 path 数据

DataStream流处理算子

Source算子(数据读入)

  • Flink可以使用StreamExecutionEnvironment.addSource(source) 来为我们的程序添加数据来源

基于本地集合的source

DataStream<String> words = env.fromElements("hello", "flink", "stream");

基于文件的source

readTextFile(path)
  • 读取指定路径的文本文件
readFile(fileInputFormat, path)
  • 按指定的文件输入格式(fileInputFormat)读取指定路径path的文件
readFile(fileInputFormat, path, watchType, interval, pathFilter) 
  • watchType:定义了文件源的行为,可以是FileProcessingMode.PROCESS_CONTINUOUSLY,定期监听路径下的新数据;或者是FileProcessingMode.PROCESS_ONCE,只处理一次当前在路径中的数据并退出
  • interval:当watchType设置为PROCESS_CONTINUOUSLY时,这个参数定义了文件系统被监控的间隔时间(毫秒)
  • pathFilter:可选的PathFilter实例,用于进一步过滤要处理的文件
DataStream<String> text = env.readTextFile(inputPath);

基于网络套接字的source

  • socketTextStream(host, port): 从指定的主机和端口读取文本行,返回一个字符串数据流
DataStream<String> text = env.socketTextStream("hadoop102", 7777);

从Hadoop兼容的文件系统读取

  • readTextFile(FileSystemPath path): 从Hadoop文件系统读取文本文件
DataStream<String> text = env.readTextFile(new Path("hdfs:///path/to/your/file.txt"));

自定义source

  • addSource(new RichParallelSourceFunction()): 实现自定义的Source功能
DataStream<String> customSource = env.addSource(new RichParallelSourceFunction<String>() {
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // 自定义数据生成逻辑
    }
});

Transform转换算子(数据处理)

转换算子 描述
map 将DataSet中的每一个元素转换为另外一个元素
FlatMap 采用一个数据元并生成零个,一个或多个数据元,将句子分割为单词的flatmap函数
FlatMap 采用一个数据元并生成零个,一个或多个数据元
Filter 计算每个数据元的布尔函数,并保存函数返回true的数据元
KeyBy 逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区
Reduce 将当前数据元与最后一个Reduce的值组合并发出新值
Fold 将当前数据元与最后折叠的值组合并发出新值
Aggregations 在被Keys化数据流上滚动聚合,min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元
Window 可以在已经分区的KeyedStream上定义Windows
Union 两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流
connect “连接”两个保存其类型的数据流
Split 根据某些标准将流拆分为两个或更多个流
Select 从拆分流中选择一个或多个流
dataStream.map { x => x * 2 }
//将字符串按空格分割
dataStream.flatMap { str => str.split(" ") }
//过滤零值
dataStream.filter { _ != 0 }
//按照数据流中每个元素的第一个字段进行分组,比如Tuple<t1,t2>,按照t1分组
dataStream.keyBy(0) 
//对每个分组的数据流中的元素进行累加
keyedStream.reduce { _ + _ }  
// 序列(1,2,3,4,5),输出结果“start-1”,“start-1-2”,“start-1-2-3”,...
val result: DataStream[String] =  keyedStream.fold("start")((str, i) => { str + "-" + i }) 
keyedStream.sum(0);
keyedStream.min(0);
keyedStream.max(0);
keyedStream.minBy(0);
keyedStream.maxBy(0);

Sink算子(数据输出)

print()

  • 将数据输出到标准输出(控制台)
dataStream.print();

writeAsText()

  • 将数据以文本形式写入到文件系统中的文件,包括本地文件,HDFS文件
dataStream.writeAsText("output.txt");

writeAsCsv()

  • 将数据以逗号分隔值(CSV)的格式写入到文件系统中的文件
dataStream.writeAsCsv("output.csv");

addSink(SinkFunction)

  • 使用自定义的SinkFunction将数据输出到外部系统

标签:keyedStream,DataStream,Flink,dataStream,算子,path,数据
From: https://www.cnblogs.com/shihongpin/p/18435742

相关文章

  • Flink(二)搭建Maven工程实现WordCount
    开发环境编写WordCountpom文件<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation=&qu......
  • Flink-Yarn模式修改Task Slot的数量
    1.修改Flink配置文件(flink-conf.yaml)Flink中的TaskManager是根据slots来分配任务的,默认情况下,一个TaskManager可以有多个slots。你可以通过调整flink-conf.yaml中的以下配置来控制每个TaskManager的slot数量:taskmanager.numberOfTaskSlots:<number_of_slots......
  • Flink(二)集群安装
    集群安装Standalone模式安装解压缩[user@hadoop102software]$tar-zxvfflink-1.10.1-bin-scala_2.12.tgz-C/opt/module/修改flink/conf/flink-conf.yaml文件jobmanager.rpc.address:hadoop102修改/conf/slaves文件hadoop103hadoop104分发给其他两台虚拟......
  • ShiftAddAug:基于乘法算子训练的最新无乘法网络方案 | CVPR'24
    不包含乘法的运算符,如移位和加法,因其与硬件的兼容性而日益受到重视。然而,采用这些运算符的神经网络(NNs)通常表现出比具有相同结构的传统NNs更低的准确性。ShiftAddAug利用成本较高的乘法来增强高效但功能较弱的无乘法运算符,从而在没有任何推理开销的情况下提高性能。将一个ShiftAd......
  • flink的发展历程
    ApacheSpark和ApacheFlink都是开源的分布式大数据处理框架,它们各自有着不同的特点和发展历程。ApacheSpark:起始时间:2009年,由加州大学伯克利分校AMPLab开发。开源时间:2010年,MateiZaharia将其开源。主要发展:2013年,成为Apache基金项目。2014年,成为Apache顶级项目。2016年......
  • 可微TopK算子
    形式及推导形式:前向计算如下所示,\[\text{TopK}(\vec{x},k)=\sigma(\vec{x}+\Delta(\vec{x},k))\]注意\(\Delta(\cdot)\)满足限制条件\(\sum\Delta(\vec{x},k)=k\),并且\(\sigma(x)=\frac{1}{1+\exp\{-x\}}\)梯度推导:令\(f(\vec{x},k)=\sigma(\vec{x}+\De......
  • Flink(一)概述
    Flink概述ApacheFlink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算Flink特点事件驱动(Event-driven)事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作比较典型的就是以kafka为代表的消......
  • 利用 Flink CDC 实现实时数据同步与分析
    1.概述1.1简要介绍什么是FlinkCDC(ChangeDataCapture)FlinkCDC(ChangeDataCapture)是一种用于实时捕获和处理数据库中数据变更的技术。它通过监控数据库的变更事件,将这些事件转化为流式数据,使得数据处理系统(如ApacheFlink)能够以流的方式实时处理和分析数据。FlinkC......
  • Flink CDC介绍:基于流的数据集成工具
    FlinkCDC是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API)。该工具使得用户能够以YAML配置文件的形式,优雅地定义其ETL(Extract,Transform,Load)流程,并协助用户自动化生成定制化的Flink算子并且提交Flink作业。FlinkCDC在任务提交过程中......
  • 尚硅谷-flink
    一、介绍1.简介flink是一个开源的分布式流处理框架优势:高性能处理、高度灵活window操作、有状态计算的Exactly-once等详情简介,参考官网:https://flink.apache.org/flink-architecture.html中文参考:https://flink.apache.org/zh/flink-architecture.......