首页 > 其他分享 >【Flink从入门到精通 02】DataStream API

【Flink从入门到精通 02】DataStream API

时间:2024-02-26 17:36:00浏览次数:26  
标签:02 DataStream Flink value 算子 Override new public

【Flink从入门到精通 02】DataStream API

在之前的文章中,我们介绍了Flink的安装部署、基础概念,今天我们来一起学习Flink的核心之一DataStream API

01 分布式流处理基础

在这里插入图片描述

上图中,我们将整个代码分为了三个部分,即分布式流处理的基本模型

  • Source
  • Transformation
  • Sink

从而,我们可以给出Flink编程框架

// 1. 获取运行环境
final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

// 2. 加载数据源
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));

// 3. 数据处理操作
DataStream<Person> adults = flintstones.filter(new
FilterFunction<Person>() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});

// 4. 写出到Sink
adults.print();

// 5. 提交任务执行
env.execute();

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

02 Flink DataStream API概览

对于编程模型,这里我们重点关注第二步Transformations,对DataStream的处理。下图中,给出了DataStream到各个Stream流的转换过程。

image-20220122223510168

首先来对各个Stream之间的转换算子做一个总结。

DataStream --> DataStream

fliter算子

对每条记录执行过滤函数,返回为true的结果。

SingleOutputStreamOperator<String> filter = kafkaDStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                if (s.contains("actions")) {
                    return true;
                }
                return false;
            }
        });
filter.print();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
map算子

一个输入对应一个输出,对每条记录执行MapFunction。

SingleOutputStreamOperator<String> map = kafkaDStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                return "大数据干货杨" + s;
            }
        });
map.print();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
flatMap算子

一个输入对应多个输出。

SingleOutputStreamOperator<String> flatMap = kafkaDStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.split("},");
                for (String s1 : split) {
                    collector.collect(s1);
                }
            }
        });
flatMap.print();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

DataStream --> KeyedStream

keyBy算子

按key对数据进行分组,相同key的数据被分配到同一个分区,内部使用的是hashcode。

下列情况的数据类型无法作为key:

  • POJO类型没有重写hashcode方法并且依赖Object.hashcode()实现
  • 任何类型的数组
dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);
  • 1
  • 2
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = jsonDS.map(new MapFunction<JSONObject, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(JSONObject jsonObject) throws Exception {
                return new Tuple2<>(jsonObject.getJSONObject("common").getString("mid"), 1);
            }
        }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2.f0;
            }
        }).sum(1);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

KeyedStream --> DataStream

Reduce算子

只有keyedStream可以执行reduce,将上次reduce的值和当前值聚合并提交新的结果

注意,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果

SingleOutputStreamOperator<JSONObject> reduce = jsonDS.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getJSONObject("common").getString("mid");
            }
        }).reduce(new ReduceFunction<JSONObject>() {
            @Override
            public JSONObject reduce(JSONObject value1, JSONObject value2) throws Exception {
                Integer num = Integer.parseInt(value1.getJSONObject("common").getString("is_new")) +
                        Integer.parseInt(value2.getJSONObject("common").getString("is_new"));
                value1.getJSONObject("common").put("is_new", num);
                return value1;
            }
        });
reduce.print();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

KeyedStream --> WindowedStream

window算子

在上一篇《Flink流式处理概念》的文章中,我们介绍了窗口的概念,即逻辑上将数据流划分成一个个的“桶”,对桶进行数据计算。

窗口定义在keyedStream上,窗口将每个key的数据按照某种规则进行分组,如最近5秒到达的数据。

下面只给出了window算子的使用样例,具体的使用方法会在后续文章中给出。

SingleOutputStreamOperator<JSONObject> reduceWindow = jsonDS.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getJSONObject("common").getString("mid");
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(10))) // 指定滚动窗口,窗口大小10s
                .reduce(new ReduceFunction<JSONObject>() {
                    @Override
                    public JSONObject reduce(JSONObject value1, JSONObject value2) throws Exception {
                        Integer num = Integer.parseInt(value1.getJSONObject("common").getString("is_new")) +
                                Integer.parseInt(value2.getJSONObject("common").getString("is_new"));
                        value1.getJSONObject("common").put("is_new", num);
                        return value1;
                    }
                });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

DataStream --> AllWindowedStream

windowAll算子

窗口定义在通用DataStream上,对整个流中的事件按照某种规则分组,没有并行操作,所有的数据会被聚合到一个任务中。

SingleOutputStreamOperator<JSONObject> reduceWindowAll = jsonDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .reduce(new ReduceFunction<JSONObject>() {
                    @Override
                    public JSONObject reduce(JSONObject value1, JSONObject value2) throws Exception {
                        Integer num = Integer.parseInt(value1.getJSONObject("common").getString("is_new")) +
                                Integer.parseInt(value2.getJSONObject("common").getString("is_new"));
                        value1.getJSONObject("common").put("is_new", num);
                        return value1;
                    }
                });
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

WindowedStream/AllWindowedStream --> DataStream

apply算子

对窗口应用一个通用函数。

windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, window>(){
	public void apply(Tuple tuple,Window window,Iterable<Tuple2<String,Integer>> values, Collector<Integer> out) throws Exception(){
		int sum=0;
		for(value t : values){
			sum += t.f1;
		}
		out.collect(new Integer(sum));
	}
});

allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, Tuple, window>(){
public void apply(Tuple tuple,Window window,Iterable<Tuple2<String,Integer>> values, Collector<Integer> out) throws Exception(){
int sum=0;
for(value t : values){
sum += t.f1;
}
out.collect(new Integer(sum));
}
});

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
WindowReduce算子

对窗口应用一个reduce函数,并将结果返回,调用方法桶KeyedStream的reduce算子。

windowedStream.reduce(new ReduceFunction<Tuple2<String,Integer>>(){
	public Tuple2<String,Integer> reduce(Tuple2<String,Integer> value1,Tuple2<String,Integer> value2) throws Exception{
		return Tuple2<String,Integer>(value1.f0,value.f1+value2.f1);
	}
});
  • 1
  • 2
  • 3
  • 4
  • 5

DataStream* --> DataStream

union算子

创建一个新的DataStream,包含多个流的所有元素,要求流的数据类型相同。

dataStream.union(otherStream1,OtherStream2,...);
  • 1

DataStream,DataStream --> DataStream

join算子

将两个DataStream根据给定的key进行join。

dataStream.join(otherStream)
	.where(<key selector>).equalTo(<key selector>)
	.window(TumblingEventTimeWindows.of(Time.seconds(3)))
	.apply(new JObFunction(){...});
  • 1
  • 2
  • 3
  • 4
CoGroup算子

在一个窗口内将两个DataStream按照给定的key进行组合

dataStream.coGroup(otherStream)
	.where(0).equalTo(1)
	.window(TumblingEventTimeWindows.of(Time.seconds(3)))
	.apply(new CoGroupFunction(){...});
  • 1
  • 2
  • 3
  • 4

KeyedStream,KeyedStream --> DataStream

Interval Join算子

将两个KeyedStream在一个给定时间间隔内进行join

keyedStream.intervalJoin(otherKeyedStream)
	.between(Time.milliseconds(-2),Time.milliseconds(2))
	.upperBoundExclusive(true)
	.lowerBoundExclusive(true)
	.process(new IntervalJoinFunction(){...});
  • 1
  • 2
  • 3
  • 4
  • 5

DataStream,DataStream --> ConnectedStream

connect算子

连接两个保持类型的数据流,两个数据流被connect后,只是被放在了同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
  • 1
  • 2
  • 3
  • 4

ConnectedStream --> DataStream

CoMap,CoFlatMap算子

类似map、flatMap,不过可以对两个流分别应用不同的处理逻辑

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>(){
		@Override
    public Boolean map1(Integer value) {
        return true;
    }
    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, Boolean>(){
	 @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }
   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

DataStream --> IterativeStream -->ConnectedStream

Iterate算子

通过将一个运算符的输出重定向到某个先前的运算符,在流中创建一个“反馈”循环。 这对于定义不断更新模型的算法特别有用。 下面的代码从一个流开始,不断地应用迭代体。 大于0的元素被送回反馈通道,其余元素向下游转发

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map(...);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
	@Override
	public boolean filter(Long value) throws Exception{
		return value>0;
	}
});
iteration.colseWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
	@Override
	public boolean filter(Long value) throws Exception{
		return value = 0;
	}
});
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

03 物理分组

Flink程序可以对每个算子设置并行度

使用物理分组可以对数据进行更细粒度的分区,常用的物理分组如下所示:

Custom Partitioning

使用用户自定义的分区函数来选择每个数据的目标。

dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
  • 1
  • 2
Random Partitioning

随机均匀分区,即随机分组。

dataStream.shuffle();
  • 1
Rescaling

本地的轮流分配。
对于需要从源的每个并行实例输出到多个映射器的子集以分配负载但不希望 rebalance() 会导致完全重新平衡的情况,这个算子很有用。

同时只需要本地数据传输,而不是通过网络传输数据,具体取决于其他配置值,例如 TaskManager 的插槽数。

上游操作向其发送元素的下游操作子集取决于上游和下游操作的并行度。例如,如果上游操作的并行度为 2,下游操作的并行度为 6,则一个上游操作会将元素分配给三个下游操作,而另一个上游操作将分配给其他三个下游操作。另一方面,如果下游操作的并行度为 2,而上游操作的并行度为 6,则三个上游操作将分配给一个下游操作,而其他三个上游操作将分配给另一个下游操作。

在不同并行度不是彼此的倍数的情况下,一个或多个下游操作将具有来自上游操作的不同数量的输入。

在这里插入图片描述

jsonDS.keyBy(new KeySelector<JSONObject, String>() {
            @Override
            public String getKey(JSONObject value) throws Exception {
                return value.getJSONObject("common").getString("mid");
            }
        }).map(new MapFunction<JSONObject, JSONObject>() {
            @Override
            public JSONObject map(JSONObject value) throws Exception {
                return null;
            }
        }).setParallelism(6).rescale();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
Broadcasting

广播每一个元素到所有的分区,即将每个元素都会发送到所有分区。

dataStream.broadcast();
  • 1

04 类型系统

Flink支持的数据类型如下:

类型说明
基本类型Java基本类型(包装类)以及void、String、Date、BigDecimal、BigInteger
复合类型Tuple和Scala case class(不支持null)、ROW、POJO
辅助、集合类型Option、Either、List、Map等
上述类型的数组
其他类型自定义类型

05 任务链和资源组

任务链:将两个算子链接在一起能使得它们在同一个线程中执行,从而提升性能。

Flink 默认会将能链接的算子尽可能地进行链接(例如, 两个 map 转换操作),还提供了对链接更细粒度控制的 API ,这些API只能在 DataStream 转换操作后才能被调用,因为它们只对前一次数据转换生效。

资源组:一个资源组对应着 Flink 中的一个 slot 槽,可以根据需要手动地将各个算子隔离到不同的 slot 中

Start New Chain

someStream.filter(...).map(...).startNewChain().map(...);
  • 1

Disable Chaining

不开启链接操作。

someStream.map(...).disableChaining();
  • 1

Set Slot Sharing Group

设置操作的槽共享组。 Flink 会将具有相同 slot 共享组的操作放在同一个 slot 中,而将没有 slot 共享组的操作保留在其他 slot 中。 这可用于隔离插槽。

如果所有输入操作都在同一个槽共享组中,则槽共享组继承自输入操作。 默认槽共享组的名称是“default”,可以通过调用 slotSharingGroup(“default”) 将操作显式放入该组。

someStream.filter(...).slotSharingGroup("name");
  • 1

关于Flink的DataStream API就介绍到这里了,学习过程中还是需要自己去写代码才能领悟其中的精髓,文章中的代码可以去flink-learning的项目中获取,希望能给你带来帮助。
在这里插入图片描述

原文链接:https://blog.csdn.net/qq_36369061/article/details/123028297

标签:02,DataStream,Flink,value,算子,Override,new,public
From: https://www.cnblogs.com/sunny3158/p/18034800

相关文章

  • Exception in thread "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThr
    这个问题集合遍历修改了集合结构,这样是不被允许的,需要换种方式报错示意图 第一可以采用for(inti=0;i<registryList.size();i++)解决第二采用迭代处理Iterator<XxlJobRegistry>iterator=registryList.iterator();while(iterator.hasNext()){XxlJobRegist......
  • 蓝桥杯2022年第十三届省赛真题-矩形拼接
    目录题目分析代码题目分析情况1:三个矩形有一边相等。(完全匹配:4边)情况2:三个矩形中有前两个矩形的边等于第三个矩形的边,而且前两个矩形的另一条边相等。(完全匹配:4边)情况3:三个矩形中有前两个矩形的边等于第三个矩形的边,而且前两个矩形的另一条边不相等。(部分匹配:6边)......
  • 邀请函 | 2024年数据技术嘉年华集结号已吹响,期待您参会!
    龙腾四海内,风云际会时,2024年中国数据嘉年华如约而至。从起初小范围的网友聚会,到如今面向全国各地从业者、爱好者的年度集会,纵使岁月更迭,我们初心依旧。我们在各自最好的年华里共同见证了中国数据库行业的蓬勃发展,感恩所有同行者!由墨天轮数据社区及中国数据库联盟(ACDU)主办的 第......
  • 2024年Apache DolphinScheduler RoadMap:引领开源调度系统的未来
    非常欢迎大家来到ApacheDolphinScheduler社区!随着开源技术在全球范围内的快速发展,社区的贡献者“同仁”一直致力于构建一个强大而活跃的开源调度系统社区,为用户提供高效、可靠的任务调度和工作流管理解决方案。在过去的一段时间里,我们取得了一些重要的成就,但我们的愿景远未实......
  • 2024-02-26 闲话
    Course不是UndergraduateResearch.Plug-and-PlayKnowledgeInjectionforPre-trainedLanguageModels建议以后写完文章拿ChatGPT跑一遍语法错误metioned不是mentions谢谢。设计了“plug-and-play”的paradigm。下文记作pap范式主打map-tuning。有一......
  • Ncast盈可视高清智能录播系统RCE漏洞(CVE-2024-0305)复现
    0x00漏洞简介Ncast盈可视高清智能录播系统是广东盈科电子公司的一款产品。该系统2017及之前版本/classes/common/busiFacade.php接口存在RCE漏洞。0x01资产测绘:zoomeye-query:title:"高清智能录播系统"fofa-query:app="Ncast-产品"&&title=="高清智能录播系统"0x02漏......
  • 【李宏毅机器学习2021】(二)Tips for training
    这一节主要讲解机器学习、类神经网络训练不起来怎么办?讲解一些训练的tips。先来回顾下机器学习的步骤:接下来将介绍在机器学习过程中,遇到问题时如何判断原因并解决:在训练数据上Loss值很大ModelBias在训练数据上Loss值很大,有可能是发生了Model问题。问题原因:模型太......
  • SketchUp Pro 2023:颠覆传统,重塑设计世界mac/win版
    SketchUpPro2023是一款强大的三维建模软件,专为设计师、建筑师和创意专业人士打造。这款软件以其直观易用的界面和强大的功能而著称,为用户提供了无限的创意空间。→→↓↓载SketchUpPro2023mac/win版 SketchUpPro2023在用户体验方面进行了全面的优化,界面更加简洁明了,操......
  • GDOI2024 游记
    加训睡觉/fendou。Day-10|2024.2.20早上打了icpc2022hangzhou。拷打钱哥怎么没过计算几何板子题。研究模拟赛某题的凸包,感觉增删的凸包还是太困难了,即使条件弱化很多了也不太好做。nmd。晚上看lpl,怎么IG把BLG给虐了。和网友聊八卦,激情输出观点,得出的结论是恋爱太......
  • 【李宏毅机器学习2021】(一)引入机器学习和深度学习
    引入机器学习MachineLearning概括来说就是LookingforFunction,即让机器具备找一个函数的能力这些函数显然非常复杂,要依靠机器自动找出该函数。随着要找的函数不同,机器学习有不同的类别:Regression,回归:函数输出的是数值。Classification,分类:函数从给定选项(类别)中选择一个......