首页 > 其他分享 >Flink的转换方法

Flink的转换方法

时间:2023-01-09 15:22:53浏览次数:41  
标签:DataStream Flink flink 转换方法 api import apache org

流处理说明

Transformation

基本操作

map/flatMap/filter/keyBy/sum/reduce...

和之前学习的Scala/Spark里面的一样的意思

map方法、flatmap方法、keyBy方法、reduce方法

map方法

  • map:将函数作用在集合中的每一个元素上,并返回作用后的结果

flatmap方法

  • flatMap:将集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果

keyBy方法

  • 按照指定的key来对流中的数据进行分组

  • 并且对keyby后的数据还提供了一些简单的聚合方法:sum(),max(),min()...

filter

  • filter:按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素

reduce

  • reduce:对集合中的元素进行聚合

案例用法

  • 对流数据中的单词进行统计,排除敏感词TMD(Theater Missile Defense 战区导弹防御)

    package com.pzb.transformation;
    
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * 演示DataStream-Transformation-基本操作
     */
    public class TransformationDemo01 {
        public static void main(String[] args) throws Exception {
            //TODO 0.env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    
            //TODO 1.source
            DataStream<String> lines = env.socketTextStream("hadoop-001", 9999);
    
    
            //TODO 2.transformation
            DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String value, Collector<String> out) throws Exception {
                    String[] arr = value.split(" ");
                    for (String word : arr) {
                        out.collect(word);
                    }
                }
            });
    
            DataStream<String> filted = words.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    return !value.equals("TMD");//如果是TMD则返回false表示过滤掉
                }
            });
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filted.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    return Tuple2.of(value, 1);
                }
            });
    
            //第二个String为返回key的数据类型
            KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);
    
            //SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                    //Tuple2<String, Integer> value1 :进来的(单词,历史值)
                    //Tuple2<String, Integer> value2 :进来的(单词,1)
                    //需要返回(单词,数量)
                    return Tuple2.of(value1.f0, value1.f1 + value2.f1); //_+_
                }
            });
    
            //TODO 3.sink
            result.print();
    
            //TODO 4.execute
            env.execute();
        }
    }
    
    

合并和连接

union和connect

  • union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并。

package com.pzb.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * 演示DataStream-Transformation-合并和连接操作
 */
public class TransformationDemo02 {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
        DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);

        //TODO 2.transformation
        DataStream<String> result1 = ds1.union(ds2);//注意union能合并同类型
        //ds1.union(ds3);//注意union不可以合并不同类型
        ConnectedStreams<String, String> result2 = ds1.connect(ds2);//注意:connet可以连接同类型
        ConnectedStreams<String, Long> result3 = ds1.connect(ds3);//注意connet可以连接不同类型

        /*
        public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
            OUT map1(IN1 value) throws Exception;
            OUT map2(IN2 value) throws Exception;
        }
         */
        // map 传的是CoMapFunction
        SingleOutputStreamOperator<String> result = result3.map(new CoMapFunction<String, Long, String>() {//第一个String是第一个数据流的数据类型,Long为第二个数据流的数据类型,第二个String为连接后的数据类型
            @Override
            public String map1(String value) throws Exception {//这个map是对ds1进行处理
                return "String:" + value;//提取出ds1的数据(value),将其变成返回值
            }

            @Override
            public String map2(Long value) throws Exception {//这个map是对ds3进行处理
                return "Long:" + value;//提取出ds2的数据(value),将其变成返回值
            }
        });


        //TODO 3.sink
        result1.print();
        //result2.print();//注意:connect之后需要做其他的处理,不能直接输出
        //result3.print();//注意:connect之后需要做其他的处理,不能直接输出
        result.print();

        //TODO 4.execute
        env.execute();
    }
}

合并和连接的区别

  • 合并的数据类型必须相同,连接没有要求
  • 连接后的数据不能直接print,只能execution

连接后的数据要想输出,需要进行相应的process操作,里面传的是对应的new CoProcessFunction<>参数。

另外,值得一提的是,ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:

 connectedStreams.keyBy(keySelector1, keySelector2);

这里传入两个参数 keySelector1keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的 keyBy 用法完全一致。ConnectedStreams 进行 keyBy 操作,其实就是把两条流中 key 相同的数据放到了一起, 然后针对来源的流再做各自处理,这在一些场景下非常有用。另外,我们也可以在合并之前就将两条流分别进行 keyBy,得到的 KeyedStream 再进行连接(connect)操作,效果是一样的。

拆分和选择

•API

​ Split就是将一个流分成多个流

​ Select就是获取分流后对应的数据

注意:split函数已过期并移除

Side Outputs:可以使用process方法对流中数据进行处理,并针对不同的处理结果将数据收集到不同的OutputTag中

重点掌握其标签的概念

package com.pzb.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * 演示DataStream-Transformation-拆分(split)和选择(select)操作
 * 注意split和select在flink1.12中已经过期并移除了
 * 所以得使用outPutTag和process来实现
 * 需求:对流中的数据按照奇数和偶数拆分并选择
 */
public class TransformationDemo03 {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        //TODO 2.transformation
        //需求:对流中的数据按照奇数和偶数拆分并选择
        OutputTag<Integer> oddTag = new OutputTag<>("奇数", TypeInformation.of(Integer.class));
        OutputTag<Integer> evenTag = new OutputTag<>("偶数",TypeInformation.of(Integer.class));

        /*
        public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
            public abstract void processElement(I value, ProcessFunction.Context ctx, Collector<O> out) throws Exception;
        }
         */
        SingleOutputStreamOperator<Integer> result = ds.process(new ProcessFunction<Integer, Integer>() {//第一个为输入数据类型,第二个为输出数据类型
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                //out收集完的还是放在一起的,ctx可以将数据放到不同的OutputTag
                if (value % 2 == 0) {//if处开始编写拆分策略
                    ctx.output(evenTag, value);
                } else {
                    ctx.output(oddTag, value);
                }
            }
        });

        DataStream<Integer> oddResult = result.getSideOutput(oddTag);
        DataStream<Integer> evenResult = result.getSideOutput(evenTag);

        //TODO 3.sink
        System.out.println(oddTag);//OutputTag(Integer, 奇数)
        System.out.println(evenTag);//OutputTag(Integer, 偶数)
        oddResult.print("奇数:");
        evenResult.print("偶数:");

        //TODO 4.execute
        env.execute();
    }
}

rebalance重平衡分区

•API

类似于Spark中的repartition,但是功能更强大,可以直接解决数据倾斜

Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况,出现了数据倾斜,其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成;

解决数据倾斜的问题

package com.pzb.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 演示DataStream-Transformation-rebalance-重平衡分区
 */
public class TransformationDemo04 {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<Long> longDS = env.fromSequence(0, 100);
        //下面的操作相当于将数据随机分配一下,有可能出现数据倾斜
        DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long num) throws Exception {
                return num > 10;//筛选出大于10的数据
            }
        });

        //TODO 2.transformation
        //没有经过rebalance有可能出现数据倾斜
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = filterDS
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {//第一个泛型为传入的数据类型,第二个泛型为输出的数据类型
                    @Override
                    public Tuple2<Integer, Integer> map(Long value) throws Exception {
                        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();//子任务id/分区编号
                        return Tuple2.of(subTaskId, 1);
                    }
                    //按照子任务id/分区编号分组,并统计每个子任务/分区中有几个元素
                }).keyBy(t -> t.f0).sum(1);

        //调用了rebalance解决了数据倾斜
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> result2 = filterDS.rebalance()
                .map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Long value) throws Exception {
                        int subTaskId = getRuntimeContext().getIndexOfThisSubtask();//子任务id/分区编号
                        return Tuple2.of(subTaskId, 1);
                    }
                    //按照子任务id/分区编号分组,并统计每个子任务/分区中有几个元素
                }).keyBy(t -> t.f0).sum(1);


        //TODO 3.sink
        result1.print("result1");
        result2.print("result2");


        //TODO 4.execute
        env.execute();
    }
}

其他分区操作

说明:
recale分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。
举例:
上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。

partitionCustom有两个参数:第一个参数是自定义的Partitioner,我们需要重写里面的partition函数;第二个参数是对数据流哪个字段使用partiton逻辑。partition函数的返回一个整数,表示该元素将被路由到下游第几个实例。

package com.pz.transformation;

        import org.apache.flink.api.common.RuntimeExecutionMode;
        import org.apache.flink.api.common.functions.FlatMapFunction;
        import org.apache.flink.api.common.functions.Partitioner;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.util.Collector;

/**
 * 演示DataStream-Transformation-各种分区
 */
public class TransformationDemo05 {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> linesDS = env.readTextFile("data/input/words.txt");
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        //TODO 2.transformation
        DataStream<Tuple2<String, Integer>> result1 = tupleDS.global();
        DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();
        DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();
        DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();
        DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();
        DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();
        // 自定义分区
        DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new MyPartitioner(), t -> t.f0);


        //TODO 3.sink
        result1.print("result1");
        result2.print("result2");
        result3.print("result3");
        result4.print("result4");
        result5.print("result5");
        result6.print("result6");
        result7.print("result7");


        //TODO 4.execute
        env.execute();
    }
    public static class MyPartitioner implements Partitioner<String>{//String 为source阶段的数据类型
        @Override
        public int partition(String key, int numPartitions) {
            //if(key.equals("北京")) return 0;  这里写自己的分区逻辑即可
            return 0;
        }
    }
}

标签:DataStream,Flink,flink,转换方法,api,import,apache,org
From: https://www.cnblogs.com/Mr-Sponge/p/17037170.html

相关文章

  • Flink处理函数
    ProcessFlink提供了8个不同的处理函数:(1)ProcessFunction最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。(2)KeyedProcessFunction对流按键分区......
  • 基于Flink CDC的现代数据栈 (Modern Data Stack)实现
    ......
  • 基于SpringBoot 使用 Flink 收发Kafka消息
    前言这周学习下Flink相关的知识,学习到一个读写Kafka消息的示例,自己动手实践了一下,别人示例使用的是普通的JavaMain方法,没有用到springboot.我们在实际工作中会使用spr......
  • 为什么巨头的 Flink 作业运行都在 YARN 上?(附源码)
    曾有人调侃:HBase没有资源什么事情也做不了,Spark占用了资源却没有事情可做? 那YARN了解一下?01YARN!伴随着Hadoop生态的发展,不断涌现了多种多样的技术组件Hive、HBase、Spa......
  • Flink:CEP
    基本概念CEPCEP就是复杂事件处理(ComplexEventProcessing)的缩写。而FlinkCEP就是Flink实现的一个用于复杂事件处理的库。具体的处理过程是,把事件流中的一个个简......
  • Flink:CEP
    基本概念CEPCEP就是复杂事件处理(ComplexEventProcessing)的缩写。而FlinkCEP就是Flink实现的一个用于复杂事件处理的库。具体的处理过程是,把事件流中的一个个简......
  • Flink Spark jdbc读写数据库导致oom和提升性能解决办法
     fetchsize=Integer.MIN_VALUE 作用如果不设置上述值,默认读取jdbc数据时,会默认获取所有的行到resultset中,数据量大会导致oom和占用大量内存reWriteBatchedInserts=t......
  • Flink:TableAPI 和 SQL
    快速上手引入依赖要在代码中使用TableAPI,必须引入相关的依赖。这里的依赖是一个Java的“桥接器”,主要就是负责TableAPI和下层DataStreamAPI的连接支持,按照不同的......
  • Flink mini-batch "引发" 的乱序问题
    问题描述近期业务反馈,开启了mini-batch之后,出现了数据不准的情况,关掉了mini-batch之后,就正常了,因此业务方怀疑,是不是Flink的mini-batch存在bug?问题排查......
  • flink orc hive 2.1.1 源码bug处理
    先说一下我们公司的线上集群配置: CDH6.3.1,hive2.1.1 ,由于公司是做车联网业务方向的,所以数据量很大,同事小A,在往集群写数据,发现写入的数据不能在hive表里查询,他写往......