首页 > 其他分享 >Flink合流操作

Flink合流操作

时间:2023-01-09 15:58:24浏览次数:37  
标签:flink org Flink value Tuple2 import apache 操作 合流

合流

1、概念

将不同流中的数据汇聚在一起,然后可以进行一个统计等相关操作。

2、基本合流操作

union和connect

  • union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并。

package com.pzb.transformation;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * 演示DataStream-Transformation-合并和连接操作
 */
public class TransformationDemo02 {
    public static void main(String[] args) throws Exception {
        //TODO 0.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //TODO 1.source
        DataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");
        DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");
        DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);

        //TODO 2.transformation
        DataStream<String> result1 = ds1.union(ds2);//注意union能合并同类型
        //ds1.union(ds3);//注意union不可以合并不同类型
        ConnectedStreams<String, String> result2 = ds1.connect(ds2);//注意:connet可以连接同类型
        ConnectedStreams<String, Long> result3 = ds1.connect(ds3);//注意connet可以连接不同类型

        /*
        public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
            OUT map1(IN1 value) throws Exception;
            OUT map2(IN2 value) throws Exception;
        }
         */
        // map 传的是CoMapFunction
        SingleOutputStreamOperator<String> result = result3.map(new CoMapFunction<String, Long, String>() {//第一个String是第一个数据流的数据类型,Long为第二个数据流的数据类型,第二个String为连接后的数据类型
            @Override
            public String map1(String value) throws Exception {//这个map是对ds1进行处理
                return "String:" + value;//提取出ds1的数据(value),将其变成返回值
            }

            @Override
            public String map2(Long value) throws Exception {//这个map是对ds3进行处理
                return "Long:" + value;//提取出ds2的数据(value),将其变成返回值
            }
        });


        //TODO 3.sink
        result1.print();
        //result2.print();//注意:connect之后需要做其他的处理,不能直接输出
        //result3.print();//注意:connect之后需要做其他的处理,不能直接输出
        result.print();

        //TODO 4.execute
        env.execute();
    }
}

合并和连接的区别

  • 合并的数据类型必须相同,连接没有要求
  • 连接后的数据不能直接print,只能execution

连接后的数据要想输出,需要进行相应的process操作,里面传的是对应的new CoProcessFunction<>参数。

将不同流中的数据按相同key值合并:

另外,值得一提的是,ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:

 connectedStreams.keyBy(keySelector1, keySelector2);

这里传入两个参数 keySelector1keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的 keyBy 用法完全一致。ConnectedStreams 进行 keyBy 操作,其实就是把两条流中 key 相同的数据放到了一起, 然后针对来源的流再做各自处理,这在一些场景下非常有用。另外,我们也可以在合并之前就将两条流分别进行 keyBy,得到的 KeyedStream 再进行连接(connect)操作,效果是一样的。

package com.peng.union_connect;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * @author 海绵先生
 * @Description TODO
 * @date 2022/11/12-14:36
 */
public class KeyedStreamToConnection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        // source
        DataStreamSource<Tuple2<String, Integer>> dataStream1 = env.fromElements(new Tuple2<String, Integer>("北京", 1),
                new Tuple2<String, Integer>("南京", 2),
                new Tuple2<String, Integer>("广东", 3));

        DataStreamSource<Tuple2<String, Integer>> dataStream2 = env.fromElements(new Tuple2<String, Integer>("北京", 2),
                new Tuple2<String, Integer>("广东", 2),
                new Tuple2<String, Integer>("北京", 3));

        dataStream1.print("dataStream1:");
        dataStream2.print("dataStream2:");


        ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectedDataStream = dataStream1.keyBy(data -> data.f0)
                .connect(dataStream2.keyBy(data -> data.f0));

        connectedDataStream.map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map1(Tuple2<String, Integer> value) throws Exception {
                return Tuple2.of("dataStream1的" + value.f0, value.f1);
            }

            @Override
            public Tuple2<String, Integer> map2(Tuple2<String, Integer> value) throws Exception {
                return Tuple2.of("dataStream2的" + value.f0, value.f1);
            }
        }).print("key连接后的数据");

        env.execute();
    }
}

3、 双流联结

双流联结(Join)是基于时间的合流。

如果我们希望统计固定时间内两条流数据的匹配情况,那就需要设置定时器、自定义触发逻辑来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了两种内置的 join 算子,以及coGroup 算子。

窗口联结(Windown Join)

标签:flink,org,Flink,value,Tuple2,import,apache,操作,合流
From: https://www.cnblogs.com/Mr-Sponge/p/17037254.html

相关文章

  • Flink的状态
    State-理解原理即可Flink中状态的自动管理之前写的Flink代码中其实已经做好了状态自动管理,如发送hello,得出(hello,1)再发送hello,得出(hello,2)说明Flink已经自动......
  • chrome究极暗夜操作
    有时候由黑色背景切换到白色背景会莫名的刺眼,尤其是晚上的时候,这种场景主要体现在由编辑器到百度(面相cv编程)所以拥有一个全黑的浏览器,应该是非常炫酷的,之前尝试过不同......
  • Flink设置Source数据源
    流处理说明有边界的流boundedstream:批数据无边界的流unboundedstream:真正的流数据Source基于集合packagecom.pzb.source;importorg.apache.flink.api.co......
  • Flink的高级应用watermake理论
    Time/Watermarker时间分类EventTime的重要性和Watermarker的引入代码演示-开发版-掌握https://ci.apache.org/projects/flink/flink-docs-release-1.12/de......
  • CAD参照缩放怎么操作?CAD参照缩放教程
    CAD外部参照指可以将整个图形作为参照图形附着到当前的图形中,当插入的CAD外部参照尺寸不合适时,该如何调整呢?这个问题其实很简单,下面小编来给大家分享一下CAD参照缩放怎么操......
  • Flink的转换方法
    流处理说明Transformation基本操作map/flatMap/filter/keyBy/sum/reduce...和之前学习的Scala/Spark里面的一样的意思map方法、flatmap方法、keyBy方法、reduce方法m......
  • MySQL入门之查询操作
    单表查询SELECT[DISTINCT]*|<字段名1,字段名2,字段名3,...> FROM<表名> [WHERE<条件查询表达式1>] [GROUPBY<字段名>[HAVING<条件表达式2>]] [ORDERBY<字段......
  • MySQL入门之表和数据的操作
    数据表的操作创建数据表时,要先use数据库名;来确定要操作的数据库。创建数据表CREATE[TEMPORARY]TABLE[IFNOTEXISTS]表名(字段名字段类型[字段属性]...)[表选项]......
  • Flink处理函数
    ProcessFlink提供了8个不同的处理函数:(1)ProcessFunction最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。(2)KeyedProcessFunction对流按键分区......
  • 基于Flink CDC的现代数据栈 (Modern Data Stack)实现
    ......