流处理说明
Transformation
基本操作
map/flatMap/filter/keyBy/sum/reduce...
和之前学习的Scala/Spark里面的一样的意思
map方法、flatmap方法、keyBy方法、reduce方法
map方法
- map:将函数作用在集合中的每一个元素上,并返回作用后的结果
flatmap方法
- flatMap:将集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果
keyBy方法
-
按照指定的key来对流中的数据进行分组
-
并且对keyby后的数据还提供了一些简单的聚合方法:sum(),max(),min()...
filter
- filter:按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素
reduce
- reduce:对集合中的元素进行聚合
案例用法
-
对流数据中的单词进行统计,排除敏感词TMD(Theater Missile Defense 战区导弹防御)
package com.pzb.transformation; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * 演示DataStream-Transformation-基本操作 */ public class TransformationDemo01 { public static void main(String[] args) throws Exception { //TODO 0.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); //TODO 1.source DataStream<String> lines = env.socketTextStream("hadoop-001", 9999); //TODO 2.transformation DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { String[] arr = value.split(" "); for (String word : arr) { out.collect(word); } } }); DataStream<String> filted = words.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { return !value.equals("TMD");//如果是TMD则返回false表示过滤掉 } }); SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filted.map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String value) throws Exception { return Tuple2.of(value, 1); } }); //第二个String为返回key的数据类型 KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0); //SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1); SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception { //Tuple2<String, Integer> value1 :进来的(单词,历史值) //Tuple2<String, Integer> value2 :进来的(单词,1) //需要返回(单词,数量) return Tuple2.of(value1.f0, value1.f1 + value2.f1); //_+_ } }); //TODO 3.sink result.print(); //TODO 4.execute env.execute(); } }
合并和连接
union和connect
- union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并。
package com.pzb.transformation;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
/**
* 演示DataStream-Transformation-合并和连接操作
*/
public class TransformationDemo02 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);
//TODO 2.transformation
DataStream<String> result1 = ds1.union(ds2);//注意union能合并同类型
//ds1.union(ds3);//注意union不可以合并不同类型
ConnectedStreams<String, String> result2 = ds1.connect(ds2);//注意:connet可以连接同类型
ConnectedStreams<String, Long> result3 = ds1.connect(ds3);//注意connet可以连接不同类型
/*
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
OUT map1(IN1 value) throws Exception;
OUT map2(IN2 value) throws Exception;
}
*/
// map 传的是CoMapFunction
SingleOutputStreamOperator<String> result = result3.map(new CoMapFunction<String, Long, String>() {//第一个String是第一个数据流的数据类型,Long为第二个数据流的数据类型,第二个String为连接后的数据类型
@Override
public String map1(String value) throws Exception {//这个map是对ds1进行处理
return "String:" + value;//提取出ds1的数据(value),将其变成返回值
}
@Override
public String map2(Long value) throws Exception {//这个map是对ds3进行处理
return "Long:" + value;//提取出ds2的数据(value),将其变成返回值
}
});
//TODO 3.sink
result1.print();
//result2.print();//注意:connect之后需要做其他的处理,不能直接输出
//result3.print();//注意:connect之后需要做其他的处理,不能直接输出
result.print();
//TODO 4.execute
env.execute();
}
}
合并和连接的区别:
- 合并的数据类型必须相同,连接没有要求
- 连接后的数据不能直接print,只能execution
连接后的数据要想输出,需要进行相应的process操作,里面传的是对应的new CoProcessFunction<>参数。
另外,值得一提的是,ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:
connectedStreams.keyBy(keySelector1, keySelector2);
这里传入两个参数 keySelector1
和 keySelector2
,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的 keyBy 用法完全一致。ConnectedStreams 进行 keyBy 操作,其实就是把两条流中 key 相同的数据放到了一起, 然后针对来源的流再做各自处理,这在一些场景下非常有用。另外,我们也可以在合并之前就将两条流分别进行 keyBy,得到的 KeyedStream 再进行连接(connect)操作,效果是一样的。
拆分和选择
•API
Split就是将一个流分成多个流
Select就是获取分流后对应的数据
注意:split函数已过期并移除
Side Outputs:可以使用process方法对流中数据进行处理,并针对不同的处理结果将数据收集到不同的OutputTag中
重点掌握其标签的概念
package com.pzb.transformation;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* 演示DataStream-Transformation-拆分(split)和选择(select)操作
* 注意split和select在flink1.12中已经过期并移除了
* 所以得使用outPutTag和process来实现
* 需求:对流中的数据按照奇数和偶数拆分并选择
*/
public class TransformationDemo03 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//TODO 2.transformation
//需求:对流中的数据按照奇数和偶数拆分并选择
OutputTag<Integer> oddTag = new OutputTag<>("奇数", TypeInformation.of(Integer.class));
OutputTag<Integer> evenTag = new OutputTag<>("偶数",TypeInformation.of(Integer.class));
/*
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
public abstract void processElement(I value, ProcessFunction.Context ctx, Collector<O> out) throws Exception;
}
*/
SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {//第一个为输入数据类型,第二个为输出数据类型
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
//out收集完的还是放在一起的,ctx可以将数据放到不同的OutputTag
if (value % 2 == 0) {//if处开始编写拆分策略
ctx.output(evenTag, value);
} else {
ctx.output(oddTag, value);
}
}
});
DataStream<Integer> oddResult = result.getSideOutput(oddTag);
DataStream<Integer> evenResult = result.getSideOutput(evenTag);
//TODO 3.sink
System.out.println(oddTag);//OutputTag(Integer, 奇数)
System.out.println(evenTag);//OutputTag(Integer, 偶数)
oddResult.print("奇数:");
evenResult.print("偶数:");
//TODO 4.execute
env.execute();
}
}
rebalance重平衡分区
•API
类似于Spark中的repartition,但是功能更强大,可以直接解决数据倾斜
Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况,出现了数据倾斜,其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成;
解决数据倾斜的问题
package com.pzb.transformation;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 演示DataStream-Transformation-rebalance-重平衡分区
*/
public class TransformationDemo04 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<Long> longDS = env.fromSequence(0, 100);
//下面的操作相当于将数据随机分配一下,有可能出现数据倾斜
DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long num) throws Exception {
return num > 10;//筛选出大于10的数据
}
});
//TODO 2.transformation
//没有经过rebalance有可能出现数据倾斜
SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = filterDS
.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {//第一个泛型为传入的数据类型,第二个泛型为输出的数据类型
@Override
public Tuple2<Integer, Integer> map(Long value) throws Exception {
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();//子任务id/分区编号
return Tuple2.of(subTaskId, 1);
}
//按照子任务id/分区编号分组,并统计每个子任务/分区中有几个元素
}).keyBy(t -> t.f0).sum(1);
//调用了rebalance解决了数据倾斜
SingleOutputStreamOperator<Tuple2<Integer, Integer>> result2 = filterDS.rebalance()
.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> map(Long value) throws Exception {
int subTaskId = getRuntimeContext().getIndexOfThisSubtask();//子任务id/分区编号
return Tuple2.of(subTaskId, 1);
}
//按照子任务id/分区编号分组,并统计每个子任务/分区中有几个元素
}).keyBy(t -> t.f0).sum(1);
//TODO 3.sink
result1.print("result1");
result2.print("result2");
//TODO 4.execute
env.execute();
}
}
其他分区操作
说明:
recale分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。
举例:
上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。
partitionCustom
有两个参数:第一个参数是自定义的Partitioner
,我们需要重写里面的partition
函数;第二个参数是对数据流哪个字段使用partiton
逻辑。partition
函数的返回一个整数,表示该元素将被路由到下游第几个实例。
package com.pz.transformation;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 演示DataStream-Transformation-各种分区
*/
public class TransformationDemo05 {
public static void main(String[] args) throws Exception {
//TODO 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 1.source
DataStream<String> linesDS = env.readTextFile("data/input/words.txt");
SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
});
//TODO 2.transformation
DataStream<Tuple2<String, Integer>> result1 = tupleDS.global();
DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();
DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();
DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();
DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();
DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();
// 自定义分区
DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new MyPartitioner(), t -> t.f0);
//TODO 3.sink
result1.print("result1");
result2.print("result2");
result3.print("result3");
result4.print("result4");
result5.print("result5");
result6.print("result6");
result7.print("result7");
//TODO 4.execute
env.execute();
}
public static class MyPartitioner implements Partitioner<String>{//String 为source阶段的数据类型
@Override
public int partition(String key, int numPartitions) {
//if(key.equals("北京")) return 0; 这里写自己的分区逻辑即可
return 0;
}
}
}
标签:DataStream,Flink,flink,转换方法,api,import,apache,org
From: https://www.cnblogs.com/Mr-Sponge/p/17037170.html