首页 > 其他分享 >Flink 合流操作——Union

Flink 合流操作——Union

时间:2022-09-03 15:22:29浏览次数:61  
标签:Union Flink 合并 union 水位 split 合流 public

应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。

Union

最简单的合流操作就是直接将多条流合在一起,这种操作称作为流的 “联合”(union) ,如下图所示,进行联合操作的流的数据类型需要保持一致,合并之后的流会包含所有流种的元素,数据类型保持不变,这种合流操作简单除暴。

在代码中,我们只要基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合了;得到的依然是一个DataStream:

stream1.union(stream2,stream3...)

注意:union()的参数可以是多个DataStream,所以联合操作可以实现多条流的合并。这里需要考虑一个问题。在事件时间语义下,水位线是时间的进度标志;不同的流中可能水位线的进展快慢完全不同,如果它们合并在一起,水位线又该以哪个为准呢?还以要考虑水位线的本质含义,是“之前的所有数据已经到齐了”;所以对于合流之后的水位线,也是要以最小的那个为准,这样才可以保证所有流都不会再传来之前的数据。换句话说,多流合并时处理的时效性是以最慢的那个流为准的。我们自然可以想到,这与之前介绍的并行任务水位线传递的规则是完全一致的;多条流的合并,某种意义上也可以看作是多个并行任务向同一个下游任务汇合的过程。

参考代码

public class UnionStreamTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> eventStream = env.socketTextStream("hadoop103", 9999)
                .map(data -> {
                    String[] split = data.split(",");
                    return new Event(split[0].trim(), split[1].trim(), Long.valueOf(split[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.
                        <Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp * 10000;
                            }
                        }));

        eventStream.print("eventStream   ");

        SingleOutputStreamOperator<Event> eventStream1 = env.socketTextStream("hadoop103", 8888)
                .map(data -> {
                    String[] split = data.split(",");
                    return new Event(split[0].trim(), split[1].trim(), Long.valueOf(split[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.
                        <Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp * 10000;
                            }
                        }));

        eventStream1.print("eventStream1   ");
        /**
         * union 可以合并多个流,数据类型需要一样,合并后水位线 取最小的那个
         */
        eventStream.union(eventStream1).process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                out.collect("水位线 " + ctx.timerService().currentWatermark());
            }
        }).print();

          env.execute();

    }
}

这里为了更清晰地看到水位线的进展,我们创建了两条流来读取socket文本数据,并从数据中提取时间戳作为生成水位线的依据。用union将两条流合并后,用一个ProcessFunction来进行处理,获取当前的水位线进行输出。我们会发现两条流中每输入一个数据,合并之后的流中都会有数据出现;而水位线只有在两条流中水位线最小值增大的时候,才会真正向前推进。我们可以来分析一下程序的运行:在合流之后的ProcessFunction对应的算子任务中,逻辑时钟的初始状态如下图所示

 

由于Flink会在流的开始处,插入一个负无穷大(Long.MIN_VALUE)的水位线,所以合流后的ProcessFunction对应的处理任务,会为合并的每条流保存一个“分区水位线”,初始值都是Long.MIN_VALUE;而此时算子任务的水位线是所有分区水位线的最小值,因此也是Long.MIN_VALUE。我们在第一条socket文本流输入数据[Alice, ./home, 1000] 时,水位线不会立即改变,只有到水位线生成周期的时间点(200ms一次)才会推进到1000 -1 = 999毫秒;这与我们在7.3.2小节中对事件时间定时器的测试是一致的。不过即使第一条水位线推进到了999,由于另一条流没有变化,所以合流之后的Process任务水位线仍然是初始值。如下图所示。

如果这时我们在第二条socket文本流输入数据[Alice, ./home, 2000],那么第二条流的水位线会随之推进到2000 –1 = 1999毫秒,Process任务所保存的第二条流分区水位线更新为1999;这样两个分区水位线取最小值,Process任务的水位线也就可以推进到999了。如下图所示。

 进而如果我们继续在第一条流中输入数据[Alice, ./home, 3000],Process任务的第一条流分区水位线就会更新为2999,同时将算子任务的时钟推进到1999。状态如下图所示

标签:Union,Flink,合并,union,水位,split,合流,public
From: https://www.cnblogs.com/wdh01/p/16643064.html

相关文章

  • Java 使用flink读写kafka中的数据(windows下)
    一、启动服务(网上查)1、启动zookeeper2、启动kafka3、启动flink二、写producerpublicvoidkafkaProducer(List<ResultBean>opcValue)throwsException{......
  • prometheus监控flink
    背景很久没写博客了,今天也算完成了一个小测试。由于flink没有监控的平台,只是自己写了python脚本去监控发报警。flink自己的ui界面其实已经有很多的指标可以看了,但是......
  • 使用docker-compose搭建flink集群
    第一步:安装docker和docker-compose并赋予权限第二步:利用docker-compose构建容器1version:"2.1"2services:3jobmanager:4image:flink:1.9.2-scal......
  • Flink常见面试题总结
     1、面试题一:应用架构问题:公司怎么提交的实时任务,有多少JobManager、TaskManager?解答:(1)我们使用yarnsession模式提交任务;另一种方式是每次提交都会创建一个新......
  • 01-Flink概述
    1.源起和设计理念https://flink.apache.org/在Flink官网主页的顶部可以看到,项目的核心目标,是“数据流上的有状态计算”(StatefulComputationsoverDataStreams)。......
  • Flink面试题
    1.什么是ApacheFlink(为什么使用Flink替代Spark?)        ApacheFlink是一个开源的基于流的有状态计算框架。它是分布式地执行的,具备低延迟、高吞吐的优秀性......
  • Flink CDC 高频面试题
      1cdc简介CDC(ChangeDataCapture)是一种用于捕捉数据库变更数据的技术,Flink从1.11版本开始原生支持CDC数据(changelog)的处理,目前已经是非常成熟的变更......
  • 5分钟搞定 关系型数据库 到 Flink 数据同步
    简述实时数据处理领域中,使用Flink方式,除了从日志服务订阅埋点数据外,总离不开从关系型数据库订阅并处理相关业务数据,这时就需要监测并捕获数据库增量数据,将变更按发生的......
  • Flink自定义MySQLSink批量写入出现死锁解决
    一、错误日志Cause:java.sql.SQLException:Lockwaittimeoutexceeded;tryrestartingtransaction;二、原因分析在同一批次、同一事务中操作相同主键的数据,......
  • Flink数仓项目常见问题总结
    Flink数仓项目常见问题总结 一、开发中的常见bug 1、OutputTag的对象新建问题缺少花括号   Exceptioninthread"main"org.apache.flink.api.common.functi......