首页 > 其他分享 >Flink 实时数仓(十)【DWS 层搭建(四)交易域汇总表创建】

Flink 实时数仓(十)【DWS 层搭建(四)交易域汇总表创建】

时间:2024-08-08 19:28:38浏览次数:12  
标签:数仓 String DWS Flink public Override new TODO id

前言

        今天完成 DWS 层交易域剩下的两个指标,估计一早上就完了,这两个需求用到的知识点和昨天的需求差不多;

1、交易域省份粒度下单各窗口汇总表

1.1、思路分析

        这个需求是比较简单的,province_id 字段是订单表中的字段,在 DWD 层的下单事务事实表中我们已经将该字段保留下来了,所以我们只需要读取 DWD 层的下单事务事实表即可;考虑到下单事务事实表来源于订单与处理表,而这张表在生成时需要和 活动、优惠券进行 left join,所以依然是迟到数据造成的数据重复问题,不过依然不影响,因为那俩表的字段我们用不上,所以我们只需要过滤出第一条数据即可完成去重;

  • 消费 dwd_trade_order_detail
  • 转为 JSON 流
  • 第一次去重(取迟到数据中的第一条即可)
  • 转为 JavaBean 流
  • 提取事件时间(取 create_time)并设置水位线
  • 按照粒度(province_id)分组
  • 开窗(依旧是 10s 的滚动窗口)
  • 聚合(依旧是增量聚合 + 全量聚合)
  • 关联省份信息补全 province_name
  • 写出到 clickhouse

1.2、代码实现

1.2.1、创建 ck 表及其实体类

字段依然是 粒度 + 窗口起止时间 + 度量值

create table if not exists dws_trade_province_order_window
(
    stt           DateTime,
    edt           DateTime,
    province_id   String,
    province_name String,
    order_count   UInt64,
    order_amount  Decimal(38, 20),
    ts            UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt, province_id);
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.Set;

@Data
@AllArgsConstructor
@Builder
public class TradeProvinceOrderWindow {
    // 窗口起始时间
    String stt;

    // 窗口结束时间
    String edt;

    // 省份 ID
    String provinceId;

    // 省份名称
    @Builder.Default
    String provinceName = "";

    // 累计下单次数
    Long orderCount;

    // 订单 ID 集合,用于统计下单次数
    @TransientSink
    Set<String> orderIdSet;

    // 累计下单金额
    Double orderAmount;

    // 时间戳
    Long ts;
}

1.2.2、读取下单事务事实表并转为 JSON 流

// TODO 2. 读取 kafka dwd_trade_order_detail
        String groupId = "dws_trade_province_order_window";
        DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_detail", groupId));

        // TODO 3. 转为 JSON 流
        SingleOutputStreamOperator<JSONObject> jsonDS = orderDetailDS.flatMap(new RichFlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSONObject.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    // 可以选择输出到侧输出流
                    e.printStackTrace();
                }
            }
        });

1.2.3、第一次去重(上游 left join 迟到数据)

这里因为我们的数据和 left join 的右表无关,所以直接取第一条即可

// TODO 4. 第一次去重(根据 order_detail_id 进行分组)
        KeyedStream<JSONObject, String> keyedByIdStream = jsonDS.keyBy(json -> json.getString("id"));
        SingleOutputStreamOperator<JSONObject> filterDS = keyedByIdStream.filter(new RichFilterFunction<JSONObject>() {
            // 用来存储第一条数据,之后的数据来了全部丢弃(上游 left join 迟到的数据)
            private ValueState<String> state;

            @Override
            public void open(Configuration parameters) throws Exception {
                StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(5))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .build();

                ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("first-value", String.class);
                stateDescriptor.enableTimeToLive(ttlConfig);
                state = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public boolean filter(JSONObject value) throws Exception {
                String data = state.value();
                if (data == null) {
                    state.update("1"); //  随便存就行
                    return true;
                }
                return false;
            }
        });

1.2.4、转为 JavaBean 流并设置水位线

转 JavaBean 流时,取 create_time 作为 ts 字段,以供下面提取它为事件时间(虽然之后还会把它设为系统时间作为 ck 表的版本字段)

// TODO 5. 转为 JavaBean 流
        SingleOutputStreamOperator<TradeProvinceOrderWindow> provinceOrderDS = filterDS.map(new MapFunction<JSONObject, TradeProvinceOrderWindow>() {
            @Override
            public TradeProvinceOrderWindow map(JSONObject value) throws Exception {
                HashSet<String> orderIds = new HashSet<>();
                orderIds.add(value.getString("order_id"));

                return TradeProvinceOrderWindow.builder()
                        .orderAmount(value.getDouble("split_total_amount"))
                        .orderIdSet(orderIds)
                        .ts(DateFormatUtil.toTs(value.getString("create_time"),true))
                        .provinceId(value.getString("province_id"))
                        .build();
            }
        });

        // TODO 6. 提取事件时间并设置水位线
        SingleOutputStreamOperator<TradeProvinceOrderWindow> provinceOrderWithWmDS = provinceOrderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<TradeProvinceOrderWindow>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<TradeProvinceOrderWindow>() {
                    @Override
                    public long extractTimestamp(TradeProvinceOrderWindow element, long recordTimestamp) {
                        return element.getTs();
                    }
                }));

1.2.5、分组开窗聚合

按照粒度分组,度量字段求和,并在窗口闭合后补充窗口起始时间和结束时间,时间戳字段设置为当前时间:

//  TODO 7. 分组开窗聚合
        SingleOutputStreamOperator<TradeProvinceOrderWindow> reduceDS = provinceOrderDS.keyBy(TradeProvinceOrderWindow::getProvinceId)
                .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<TradeProvinceOrderWindow>() {
                    @Override
                    public TradeProvinceOrderWindow reduce(TradeProvinceOrderWindow value1, TradeProvinceOrderWindow value2) throws Exception {
                        value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                        value1.setOrderAmount(value1.getOrderAmount() + value2.getOrderAmount());
                        return value1;
                    }
                }, new WindowFunction<TradeProvinceOrderWindow, TradeProvinceOrderWindow, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<TradeProvinceOrderWindow> input, Collector<TradeProvinceOrderWindow> out) throws Exception {
                        TradeProvinceOrderWindow next = input.iterator().next();

                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        next.setTs(System.currentTimeMillis());
                        next.setOrderCount((long) next.getOrderIdSet().size());

                        out.collect(next);
                    }
                });

1.2.6、关联维表 dim_base_province 并写出到 ck

// TODO 8. 关联维保 province_info
        SingleOutputStreamOperator<TradeProvinceOrderWindow> resultDS = AsyncDataStream.unorderedWait(reduceDS, new DimAsyncFunction<TradeProvinceOrderWindow>("DIM_BASE_PROVINCE") {
            @Override
            public String getKey(TradeProvinceOrderWindow input) {
                return input.getProvinceId();
            }

            @Override
            public void addAttribute(TradeProvinceOrderWindow pojo, JSONObject dimInfo) {
                pojo.setProvinceName(dimInfo.getString("NAME"));
            }
        }, 100, TimeUnit.SECONDS);

        // TODO 9. 写出到 clickhouse
        reduceDS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_province_order_window values (?,?,?,?,?,?)"));

        // TODO 10. 启动任务
        env.execute("DwsTradeProvinceOrderWindow");

2、交易域品牌-品类-用户粒度退单各窗口汇总表

2.1、思路分析

这里是退单,DWD 层退单事务事实表的数据仅来自 ODS 中过滤出来的退单数据、字典表(字典表被关联两次,一次获取退单类型、一次获取退单原因)等

  • 从 dwd_trade_order_refund 读取数据
  • 转为 JavaBean 流(String -> JSON -> JavaBean)
  • 关联维表 sku_info 补充 tm_id、category3_id 字段
  • 提取事件时间生成水位线
  • 分组(按照粒度)开窗聚合
  • 关联维表
    • 关联base_trademark 获得 name
    • 关联 base_category3、base_category2、base_category1 获得 id 和 name
  • 写出到 clickhouse

2.2、代码实现

2.2.1、创建 ck 表及其 JavaBean

create table if not exists dws_trade_trademark_category_user_refund_window
(
    stt            DateTime,
    edt            DateTime,
    trademark_id   String,
    trademark_name String,
    category1_id   String,
    category1_name String,
    category2_id   String,
    category2_name String,
    category3_id   String,
    category3_name String,
    user_id        String,
    refund_count   UInt64,
    ts             UInt64
) engine = ReplacingMergeTree(ts)
      partition by toYYYYMMDD(stt)
      order by (stt, edt, trademark_id, trademark_name, category1_id,
                category1_name, category2_id, category2_name, category3_id, category3_name, user_id);
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.util.Set;

@Data
@AllArgsConstructor
@Builder
public class TradeTrademarkCategoryUserRefundBean {
    // 窗口起始时间
    String stt;
    // 窗口结束时间
    String edt;
    // 品牌 ID
    String trademarkId;
    // 品牌名称
    String trademarkName;
    // 一级品类 ID
    String category1Id;
    // 一级品类名称
    String category1Name;
    // 二级品类 ID
    String category2Id;
    // 二级品类名称
    String category2Name;
    // 三级品类 ID
    String category3Id;
    // 三级品类名称
    String category3Name;

    // 订单 ID
    @TransientSink
    Set<String> orderIdSet;

    // sku_id
    @TransientSink
    String skuId;

    // 用户 ID
    String userId;
    // 退单次数
    Long refundCount;
    // 时间戳
    Long ts;
}

2.2.2、读取退单事务事实表并转换格式

// TODO 2. 读取 kafka dwd_trade_order_detail
        String groupId = "dws_trade_trademark_category_user_refund_window";
        DataStreamSource<String> orderDetailDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("dwd_trade_order_refund", groupId));

        // TODO 3. 转为 JSON 流
        SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> tmCateUserDS = orderDetailDS.flatMap(new RichFlatMapFunction<String, TradeTrademarkCategoryUserRefundBean>() {
            @Override
            public void flatMap(String value, Collector<TradeTrademarkCategoryUserRefundBean> out) throws Exception {
                try {
                    JSONObject jsonObject = JSONObject.parseObject(value);

                    HashSet<String> orderIds = new HashSet<>();
                    orderIds.add(jsonObject.getString("order_id"));

                    out.collect(TradeTrademarkCategoryUserRefundBean.builder()
                            .userId(jsonObject.getString("user_id"))
                            .ts(jsonObject.getString(DateFormatUtil.toTs("create_time",true)))
                            .skuId(jsonObject.getString("sku_id"))
                            .orderIdSet(orderIds)
                            .build()
                    );
                } catch (Exception e) {
                    // 可以选择输出到侧输出流
                    e.printStackTrace();
                }
            }
        });

2.2.3、关联维表 sku_info

关联维表 sku_info 补充 tm_id、category3_id 字段

// TODO 4. 关联维表 sku_info 补充 tm_id、category3_id 字段
        SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> withSkuCateUserIdDS = AsyncDataStream.unorderedWait(tmCateUserDS, new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_SKU_INFO") {
            @Override
            public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                return input.getSkuId();
            }

            @Override
            public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                pojo.setCategory3Id(dimInfo.getString("CATEGORY3_ID"));
                pojo.setTrademarkId(dimInfo.getString("TM_ID"));
            }
        }, 60 * 5, TimeUnit.SECONDS);

2.2.4、设置水位线并分组开窗聚合

// TODO 5. 设置水位线
        SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> withWmDS = withSkuCateUserIdDS.assignTimestampsAndWatermarks(WatermarkStrategy.<TradeTrademarkCategoryUserRefundBean>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<TradeTrademarkCategoryUserRefundBean>() {
                    @Override
                    public long extractTimestamp(TradeTrademarkCategoryUserRefundBean element, long recordTimestamp) {
                        return element.getTs();
                    }
                }));

        // TODO 6. 分组(按照粒度)开窗聚合
        SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceDS = withWmDS.keyBy(data -> Tuple3.of(data.getCategory3Id(), data.getTrademarkId(), data.getUserId()))
                .window(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10)))
                .reduce(new ReduceFunction<TradeTrademarkCategoryUserRefundBean>() {
                    @Override
                    public TradeTrademarkCategoryUserRefundBean reduce(TradeTrademarkCategoryUserRefundBean value1, TradeTrademarkCategoryUserRefundBean value2) throws Exception {
                        value1.getOrderIdSet().addAll(value2.getOrderIdSet());
                        return value1;
                    }
                }, new WindowFunction<TradeTrademarkCategoryUserRefundBean, TradeTrademarkCategoryUserRefundBean, Tuple3<String, String, String>, TimeWindow>() {
                    @Override
                    public void apply(Tuple3<String, String, String> stringStringStringTuple3, TimeWindow window, Iterable<TradeTrademarkCategoryUserRefundBean> input, Collector<TradeTrademarkCategoryUserRefundBean> out) throws Exception {
                        TradeTrademarkCategoryUserRefundBean next = input.iterator().next();
                        next.setTs(System.currentTimeMillis());
                        next.setStt(DateFormatUtil.toYmdHms(window.getStart()));
                        next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));
                        next.setRefundCount((long) next.getOrderIdSet().size());
                        out.collect(next);
                    }
                });

2.2.5、关联其它维表并写出数据到 ck

// TODO  7. 关联其它维表
        // TODO 7.2 关联 tm
        SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithTmDS = AsyncDataStream.unorderedWait(reduceDS,
                new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_TRADEMARK") {
                    @Override
                    public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                        return input.getTrademarkId();
                    }

                    @Override
                    public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                        pojo.setTrademarkName(dimInfo.getString("TM_NAME"));
                    }
                },
                100, TimeUnit.SECONDS);
        // TODO 7.3 关联 category3
        SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate3DS = AsyncDataStream.unorderedWait(reduceWithTmDS,
                new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY3") {
                    @Override
                    public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                        return input.getCategory3Id();
                    }

                    @Override
                    public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                        pojo.setCategory3Name(dimInfo.getString("NAME"));
                        pojo.setCategory2Id("CATEGORY2_ID");
                    }
                },
                100, TimeUnit.SECONDS);
        // TODO 7.4 关联 category2
        SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate2DS = AsyncDataStream.unorderedWait(reduceWithCate3DS,
                new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY2") {
                    @Override
                    public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                        return input.getCategory2Id();
                    }

                    @Override
                    public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                        pojo.setCategory2Name(dimInfo.getString("NAME"));
                        pojo.setCategory1Id("CATEGORY1_ID");
                    }
                },
                100, TimeUnit.SECONDS);
        // TODO 7.5 关联 category1
        SingleOutputStreamOperator<TradeTrademarkCategoryUserRefundBean> reduceWithCate1DS = AsyncDataStream.unorderedWait(reduceWithCate2DS,
                new DimAsyncFunction<TradeTrademarkCategoryUserRefundBean>("DIM_BASE_CATEGORY1") {
                    @Override
                    public String getKey(TradeTrademarkCategoryUserRefundBean input) {
                        return input.getCategory1Id();
                    }

                    @Override
                    public void addAttribute(TradeTrademarkCategoryUserRefundBean pojo, JSONObject dimInfo) {
                        pojo.setCategory1Name(dimInfo.getString("NAME"));
                    }
                },
                100, TimeUnit.SECONDS);
        
        // TODO 8. 写出到 clickhouse
        reduceWithCate1DS.addSink(ClickHouseUtil.getSinkFunction("insert into dws_trade_trademark_category_user_refund_window values(?,?,?,?,?,?,?,?,?,?,?,?,?)"));

        // TODO 9. 启动任务
        env.execute("DwsTradeTrademarkCategoryUserRefundWindow");

总结

        至此,DWS 层搭建完毕,总的来说每一层的逻辑差不太多,难点就是对实时数据的抽象理解;接下来就是 ADS 层,今天搞定它!

标签:数仓,String,DWS,Flink,public,Override,new,TODO,id
From: https://blog.csdn.net/m0_64261982/article/details/141017854

相关文章

  • Flink开发:Java vs. Scala - 代码对比分析,选择你的最佳拍档
    一、引言1.1Flink简介ApacheFlink是一个开源的流处理框架,它支持高吞吐量、低延迟以及复杂的事件处理。Flink的核心是一个流式数据流执行引擎,它的针对数据流的分布式计算提供了数据分发、通信、容错机制。Flink提供了多种API,包括DataStreamAPI(用于构建流处理程序)、D......
  • Flink 1.20 最新版本 Windows本地运行
    ApacheFlink1.20是Flink的一个较新版本,它带来了许多改进和新功能,如物化表、统一的检查点文件合并机制等。然而,关于Flink1.20在Windows本地运行的具体步骤,虽然Flink本身是跨平台的,但官方文档和社区资源可能更多地关注于Linux环境下的部署和配置。不过,基于Flin......
  • 最佳实践:解读GaussDB(DWS) 统计信息自动收集方案
    摘要:现在商用优化器大多都是基于统计信息进行查询代价评估,因此统计信息是否实时且准确对查询影响很大,特别是分布式数据库场景。本文详细介绍GaussDB(DWS)如何实现了一种轻量、实时、准确的统计信息自动收集方案。本文分享自华为云社区《【最佳实践】GaussDB(DWS)统计信息自动收......
  • 4、Flink SQL 与 DataStream API 集成处理 Insert-Only 流详解
    处理Insert-Only流StreamTableEnvironment提供以下方法来从DataStream转换和转换到DataStream:fromDataStream(DataStream):将insert-only和任意类型的流转换为表,默认情况下不传播事件时间和水印。fromDataStream(DataStream,Schema):将insert-only和任意类型......
  • Flink实战(10)-checkpoint容错保证
    0前言程序在Flink集群运行,某个算子因为某些原因出现故障,如何处理在故障恢复后,如何保证数据状态,和故障发生之前的数据状态一致?1什么是checkpoint(检查点)?Checkpoint能生成快照(Snapshot)。若Flink程序崩溃,重新运行程序时可以有选择地从这些快照进行恢复。Checkpoin......
  • 数仓建模。传统三范式建模和维度建模 详细篇
    数据仓库建模,说白了就是建库建表目录一,三范式建模一,三范式建模的概叙二,三范式建模的作用三,三范式建模的定义四,三范式建模的举例二,维度建模的概念与定义一,维度建模的优势二,维度建模的分类三,维度建模的举例四,维度建模的具体示例:三,三范式建模和维度建模的......
  • Apache Flink开发时选择Java还是Scala作为编程语言
    在ApacheFlink的开发过程中,选择Java还是Scala作为编程语言是一个重要的决策点。这两种语言各有其独特的优势和特点,适合不同的开发场景和需求。以下是对这一选择的详细探讨,旨在帮助开发者更好地理解并做出合理的选择。一、ApacheFlink简介ApacheFlink是一个开源的分布式......
  • rancher2.5.9部署flink1.13.1集群问题求教
    下面是我在rancher页面配置flink的yaml:apiVersion:batch/v1kind:Jobmetadata:name:flink-jobmanagernamespace:flink-resourcespec:template:metadata:labels:app:flinkcomponent:jobmanagerspec:restartPolicy:OnFailurecontainers:-name:jobmanagerima......
  • ETL数据集成丨将GreenPlum数据同步至Doris数仓
    在当今数据驱动的时代,高效、可靠的数据集成成为企业数字化转型的关键一环。ETLCloud作为一款创新的数据集成平台,通过其强大的零代码配置能力,为企业提供了从数据抽取、转换到加载(ETL)的全链条解决方案,尤其在跨系统数据迁移方面展现出显著优势。本次实践通过将GreenPlum数据库的数据......
  • 深入剖析Apache Flink的状态后端
    ApacheFlink的状态后端是其状态管理的核心组件,负责存储和管理Flink程序的状态信息。状态后端的选择直接影响到Flink程序的容错能力、性能以及与外部系统的集成能力。本文将详细介绍Flink中的不同状态后端,包括它们的工作原理、特点、适用场景以及如何配置和使用。一、Flink......