首页 > 其他分享 >Flink:多流转换

Flink:多流转换

时间:2022-12-31 00:44:19浏览次数:33  
标签:转换 Flink 多流 long event Tuple2 Override new public

分流

将一条数据流拆分成完全独立的两条、甚至多条流。基于一个 DataStream,得到完全平等的多个子 DataStream。

SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
        .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long l) {
                        return event.timestamp;
                    }
                })
        );

// 定义辅助标签
OutputTag<Tuple3<String, String, Long>> marryTag = new OutputTag<Tuple3<String, String, Long>>("Marry") {};
OutputTag<Tuple3<String, String, Long>> bobTag = new OutputTag<Tuple3<String, String, Long>>("Bob") {};

SingleOutputStreamOperator<Event> processStream = stream.process(new ProcessFunction<Event, Event>() {
    @Override
    public void processElement(Event event, ProcessFunction<Event, Event>.Context context, Collector<Event> collector) throws Exception {
        if (event.user.equals("Marry")) {
            context.output(marryTag, Tuple3.of(event.user, event.url, event.timestamp));
        } else if (event.user.equals("Bob")) {
            context.output(bobTag, Tuple3.of(event.user, event.url, event.timestamp));
        } else {
            collector.collect(event);
        }
    }
});

processStream.print("else");
processStream.getSideOutput(marryTag).print("Marry");
processStream.getSideOutput(bobTag).print("Bob");

基本合流操作

联合 Union

联合操作要求流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,且数据类型不变。

// union() 参数可以是多个,也就是可以实现多流合并
stream1.union(stream2, stream3, ...)

连接 Connect

连接流

为了处理更加灵活,连接操作允许流的数据类型不同。连接流可以看成是两条流形式上的“统一”,放在了同一个流中,事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的 DataStream, 还需要进一步定义一个“同处理”转换操作,用来说明对于不同来源、不同类型的数据,怎样进行处理转换,得到统一的输出类型。

DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);
DataStreamSource<Long> stream2 = env.fromElements(4L, 5L, 6L);

stream1.connect(stream2).map(new CoMapFunction<Integer, Long, String>() {
    @Override
    public String map1(Integer integer) throws Exception {
        return "Integer:" + integer.toString();
    }

    @Override
    public String map2(Long aLong) throws Exception {
        return "Long:" + aLong.toString();
    }
}).print();

对账案例

// 来自app的支付日志
SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(
        Tuple3.of("order-1", "app", 1000L),
        Tuple3.of("order-2", "app", 2000L),
        Tuple3.of("order-3", "app", 3500L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
            @Override
            public long extractTimestamp(Tuple3<String, String, Long> stringStringLongTuple3, long l) {
                return stringStringLongTuple3.f2;
            }
        }));

// 来自第三方支付平台的支付日志
SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdStream = env.fromElements(
        Tuple4.of("order-1", "third-part", "success", 3000L),
        Tuple4.of("order-3", "third-part", "success", 4000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
            @Override
            public long extractTimestamp(Tuple4<String, String, String, Long> stringStringLongTuple4, long l) {
                return stringStringLongTuple4.f3;
            }
        }));

// 检测同一支付单在两条流中是否匹配
appStream.keyBy(data -> data.f0).connect(thirdStream.keyBy(data -> data.f0))
        .process(new OrderMatchResult())
        .print();
// 自定义实现CoProcessFunction
public static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String> {
    // 定义状态变量,用来保存已经到达的事件
    private ValueState<Tuple3<String, String, Long>> appEventState;
    private ValueState<Tuple4<String, String, String, Long>> thirdEventState;

    @Override
    public void open(Configuration parameters) throws Exception {
        appEventState = getRuntimeContext().getState(
                new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
        );

        thirdEventState = getRuntimeContext().getState(
                new ValueStateDescriptor<Tuple4<String, String, String, Long>>("third-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG))
        );
    }

    @Override
    public void processElement1(Tuple3<String, String, Long> stringStringLongTuple3, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context context, Collector<String> collector) throws Exception {
        // 来的是app事件,看另一条流中是否来过
        if (thirdEventState.value() != null) {
            collector.collect("对账成功:" + stringStringLongTuple3 + " " + thirdEventState.value());
            // 清空状态
            thirdEventState.clear();
        } else {
            // 更新状态
            appEventState.update(stringStringLongTuple3);
            // 定义注册一个5s后的定时器,开始等待另一条流的事件
            context.timerService().registerEventTimeTimer(stringStringLongTuple3.f2 + 5000L);
        }
    }

    @Override
    public void processElement2(Tuple4<String, String, String, Long> stringStringStringLongTuple4, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context context, Collector<String> collector) throws Exception {
        // 来的是third事件,看另一条流中是否来过
        if (appEventState.value() != null) {
            collector.collect("对账成功:" + appEventState.value() +  " " + stringStringStringLongTuple4);
            // 清空状态
            appEventState.clear();
        } else {
            // 更新状态
            thirdEventState.update(stringStringStringLongTuple4);
            // 定义注册一个定时器,开始等待另一条流的事件
            context.timerService().registerEventTimeTimer(stringStringStringLongTuple4.f3);
        }
    }

    @Override
    public void onTimer(long timestamp, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        // 定时器触发,判断状态,如果某个状态不为空,说明另一条流中的事件未来
        if (appEventState.value() != null) {
            out.collect("对账失败:" + appEventState.value() + " " + "第三方支付平台信息未到");
        }
        if (thirdEventState.value() != null) {
            out.collect("对账失败:" + thirdEventState.value() + " " + "app信息未到");
        }
        appEventState.clear();
        thirdEventState.clear();
    }
}

双流联结 Join

窗口联结

Flink 为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键的数据放在窗口中进行配对处理。

首先需要调用 DataStream 的 join() 方法来合并两条流,得到一个 JoinedStreams。然后通过 where()equalTo() 方法指定两条流中联结的 key,再然后通过 window() 开窗并调用 apply() 传入联结窗口函数进行处理计算。

SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env.fromElements(
        Tuple2.of("a", 1000L),
        Tuple2.of("b", 1000L),
        Tuple2.of("a", 2000L),
        Tuple2.of("b", 2000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                return stringLongTuple2.f1;
            }
        })
);

SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = env.fromElements(
        Tuple2.of("a", 3000),
        Tuple2.of("b", 4000),
        Tuple2.of("a", 4500),
        Tuple2.of("b", 5500)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Integer> stringLongTuple2, long l) {
                return stringLongTuple2.f1;
            }
        })
);

stream1.join(stream2)
        .where(data -> data.f0)
        .equalTo(data -> data.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Integer>, Object>() {
            @Override
            public Object join(Tuple2<String, Long> stringLongTuple2, Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringLongTuple2 + "->" + stringIntegerTuple2;
            }
        }).print();

间隔联结

在有些场景下,我们要处理的时间间隔可能并不是固定的。间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。给定两个时间点,分别叫作间隔的“上界”和“下界”。

SingleOutputStreamOperator<Tuple2<String, Long>> orderStream = env.fromElements(
        Tuple2.of("Marry", 1000L),
        Tuple2.of("Alice", 5000L),
        Tuple2.of("Bob", 10000L),
        Tuple2.of("Alice", 20000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                return stringLongTuple2.f1;
            }
        })
);

SingleOutputStreamOperator<Event> clickStream = env.fromElements(
        new Event("Mary", "./home", 1000L),
        new Event("Bob", "./cart", 4000L),
        new Event("Alice", "./cart", 9000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event event, long l) {
                return event.timestamp;
            }
        })
);

orderStream.keyBy(data -> data.f0)
        .intervalJoin(clickStream.keyBy(data -> data.user))
        .between(Time.seconds(-5), Time.seconds(5))
        .process(new ProcessJoinFunction<Tuple2<String, Long>, Event, String>() {
            @Override
            public void processElement(Tuple2<String, Long> stringLongTuple2, Event event, ProcessJoinFunction<Tuple2<String, Long>, Event, String>.Context context, Collector<String> collector) throws Exception {
                collector.collect(event + "->" + stringLongTuple2);
            }
        }).print();

窗口同组联结

与窗口联结不同的是,调用 apply() 方法定义具体操作时,传入的是一个 CoGroupFunction 函数类接口。

SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env.fromElements(
        Tuple2.of("a", 1000L),
        Tuple2.of("b", 1000L),
        Tuple2.of("a", 2000L),
        Tuple2.of("b", 2000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                return stringLongTuple2.f1;
            }
        })
);

SingleOutputStreamOperator<Tuple2<String, Integer>> stream2 = env.fromElements(
        Tuple2.of("a", 3000),
        Tuple2.of("b", 4000),
        Tuple2.of("a", 4500),
        Tuple2.of("b", 5500)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Integer> stringLongTuple2, long l) {
                return stringLongTuple2.f1;
            }
        })
);

stream1.coGroup(stream2)
        .where(data -> data.f0)
        .equalTo(data -> data.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String>() {
            @Override
            public void coGroup(Iterable<Tuple2<String, Long>> iterable, Iterable<Tuple2<String, Integer>> iterable1, Collector<String> collector) throws Exception {
                collector.collect(iterable + "->" + iterable1);
            }
        }).print();

标签:转换,Flink,多流,long,event,Tuple2,Override,new,public
From: https://www.cnblogs.com/fireonfire/p/17016135.html

相关文章

  • Flink:处理函数
    基本处理函数函数功能处理函数主要是定义数据流的转换工作。处理函数提供了一个“定时服务”,可以通过它访问流中的事件、时间戳、水位线,甚至可以注册“定时事件”。继承......
  • flink 实现 postgre-CDC
    一、前置工作1.修改postgresql配置文件  /data/pgsql/13/data/postgresql.conf相关配置:#更改wal日志方式为logicalwal_level=logical#minimal,replica,or......
  • 使用Spire.Doc来转换文本
    使用Spire.Doc来转换文本前段时间,我为不熟悉这个产品的读者们写了一篇关于我对 Spire.Doc的初识印象。 Spire.Doc是一个专业的Word.NET库,它是专门为开发人员设计的用来......
  • .NET Word 文件格式转换
    不同格式的文件有不同的应用领域和优势,因此在日常开发中我们需要对文件格式进行转换。在微软Word中,可以在“文件—另存为—保存类型”中实现Word文件格式的转换。这篇文章,旨......
  • java 转换指定文件夹文件编码工具
    importjava.io.*;publicclasstest{publicstaticvoidmain(String[]args){printFiles(newFile("./src"),1);}publicstaticvoidpr......
  • 模拟量直流信号隔离放大转换器0-75mV/0-±10V/0-5V转0-±100mV/0-20mA/4-20mAPCB模块
    ​概述:导轨安装DIN11IPOOC系列模拟信号隔离放大器是一种将输入信号隔离放大、转换成按比例输出的直流信号混合集成厚模电路。产品广泛应用在电力、远程监控、仪器仪表、医......
  • yoloV1 bbox标签转换为yolo
    importnumpyasnpdefconvert_bbox2labels(bboxes):""":parambboxes:(N,5)的bbox信息列表:return:(30,7,7)的yolov1格式的label,需要将(cls_index,dx......
  • VBA 42 数据类型与转换
    注意:使用VBA.TypeName()判断单元格内容的数据类型时,单元格一定要使用value属性。(切记切记,否则返回的类型是Range)判断是否为空VBA.IsEmpty()VBA.TypeName() 判断是......
  • 如何在 Pandas 中将对象转换为浮点数(附示例)
    您可以使用以下方法之一将pandasDataFrame中的列从对象转换为浮点数: Method1:Useastype()df['column_name']=df['column_name'].astype(float)Method2:Use......
  • #yyds干货盘点# LeetCode程序员面试金典:整数转换
    题目:整数转换。编写一个函数,确定需要改变几个位才能将整数A转成整数B。示例1:输入:A=29(或者0b11101),B=15(或者0b01111)输出:2示例2:输入:A=1,B=2输出:2代码实现:classS......