首页 > 其他分享 >全国职业院校技能大赛-大数据应用赛项-离线数据处理-备赛笔记04-2024省赛离线数据处理专项训练

全国职业院校技能大赛-大数据应用赛项-离线数据处理-备赛笔记04-2024省赛离线数据处理专项训练

时间:2025-01-04 15:58:16浏览次数:3  
标签:val 离线 职业院校 dwd time 数据处理 import spark order

数据抽取:

1、 抽取ds_db01库中customer_inf的增量数据进入Hive的ods库中表customer_inf。根据ods.user_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.customer_inf命令;

2、抽取ds_db01库中product_info的增量数据进入Hive的ods库中表product_info。根据ods.product_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.product_info命令;

3、抽取ds_db01库中order_master的增量数据进入Hive的ods库中表order_master,根据ods.order_master表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.order_master命令;

4、抽取ds_db01库中order_detail的增量数据进入Hive的ods库中表order_detail,根据ods.order_detail表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.order_detail命令。

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Calendar

object prac1 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME","root")
    Logger.getLogger("org").setLevel(Level.ERROR)
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("task1")
    val spark = new SparkSession.Builder().config(sparkConf)
    .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
    .config("spark.sql.shuffle.partitions", "1500")
    .config("dfs.replications", "3")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .config("hive.exec.max.dynamic.partitions", "3000")
    .enableHiveSupport()
    .getOrCreate()
    val table=Array("customer_inf","product_info","order_master","order_detail")
    table.foreach(x=>{
      println(s"===================抽取数据:$x=====================")
      val mysql = new Tools().loadMysql(spark, x).cache()
      mysql.show()
      println("====================获取增量字段=====================")
      var incre: Any = null
      val simple = new SimpleDateFormat("yyyyMMdd")
      val calendar=Calendar.getInstance()
      calendar.add(Calendar.DATE,-1)
      val etl_date = simple.format(calendar.getTime)
      val simple1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      if (x=="customer_inf"){
        incre = spark.table(s"ods.customer_inf").select(max("modified_time")).collect()(0).get(0)
        if (incre==null){
          incre=Timestamp.valueOf(simple1.parse("0").getTime.toString)
        }
      }
      else {
        incre = spark.table(s"ods.$x").select(max("modified_time")).collect()(0).get(0)
        if (incre==null){
          incre=Timestamp.valueOf(simple1.parse("0").getTime.toString)
        }
      }
      println(s"======================Final incre:$incre=======================")
      mysql.where(col("modified_time")>incre)
      .withColumn("etl_date",lit(etl_date).cast("string"))
      //        .show()
      .write.format("hive").mode("append").partitionBy("etl_date").saveAsTable(s"ods.$x")
    })
    spark.close()
  }
}
class Tools {
  def loadMysql(spark:SparkSession,table:String): DataFrame ={
    val prop=Map(
      "url"->"jdbc:mysql://node1/ds_db01?useSSL=false",
      "driver"->"com.mysql.jdbc.Driver",
      "user"->"root",
      "password"->"8888",
      "dbtable"->table
    )
    spark.read.format("jdbc")
    .options(prop)
    .load()
  }
}

数据清洗:

1、 抽取ods库中customer_inf表中昨天的分区(任务一生成的分区)数据,并结合dim_customer_inf最新分区现有的数据,根据customer_id合并数据到dwd库中dim_customer_inf的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以customer_id为合并字段,根据modified_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli执行show partitions dwd.dim_customer_inf命令;

2、 抽取ods库中product_info表中昨天的分区(任务一生成的分区)数据,并结合dim_product_info最新分区现有的数据,根据product_id合并数据到dwd库中dim_product_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以product_id为合并字段,根据modified_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli执行show partitions dwd.dim_product_info命令;

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.text.SimpleDateFormat
import java.util.Calendar

object task2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = new Tools().createSpark()
    val tables=Array("customer_inf","product_info")
    val dwdtables=Array("dim_customer_inf","dim_product_info","fact_order_master","fact_order_detail")

    tables.foreach(tbl=>{
      merge(spark,tbl,dwdtables(0))
    })
    spark.close()
  }
  def merge(spark: SparkSession, table: String,dwdtable:String): Unit ={
    val calendar=Calendar.getInstance()
    val nowtime: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(calendar.getTime)
    calendar.add(Calendar.DATE,-1)
    val etl_date=new SimpleDateFormat("yyyyMMdd").format(calendar.getTime)
    val odsFrame: DataFrame = spark.table(table)
    .where("etl_date=20241217")
    .drop("etl_date")
    .withColumn("dwd_insert_user",lit("user1"))
    .withColumn("dwd_insert_time",lit(nowtime).cast("timestamp"))
    .withColumn("dwd_modify_user",lit("user1"))
    .withColumn("dwd_modify_time",lit(nowtime).cast("timestamp"))
    .cache()
    val dwdFrame: DataFrame = spark.table(dwdtable).drop("etl_date").cache()
    if (table=="customer_inf"){
      val mergeFrame: Dataset[Row] = odsFrame.unionByName(dwdFrame)
      val result: DataFrame = mergeFrame.withColumn("sequence", row_number().over(Window.partitionBy("customer_id").orderBy(desc("modified_time"))))
      .withColumn("dwd_insert_time", min("dwd_insert_time").over(Window.partitionBy("customer_id")))
      .where("sequence = 1")
      .drop("sequence")
      .withColumn("etl_date",lit(etl_date))
      result.show()
    }
    else {
      val mergeFrame: Dataset[Row] = odsFrame.unionByName(dwdFrame)
      val result: DataFrame = mergeFrame.withColumn("sequence", row_number().over(Window.partitionBy("product_id").orderBy(desc("modified_time"))))
      .withColumn("dwd_insert_time", min("dwd_insert_time").over(Window.partitionBy("customer_id")))
      .where("sequence = 1")
      .drop("sequence")
      .withColumn("etl_date",lit(etl_date))
      result.show()
    }
  }
}

指标计算:

1、 编写Scala工程代码,根据dwd的订单表,求各省份下单时间为2022年的支付转化率,并将计算结果写入clickhouse的ds_result库的表。在Linux的clickhouse命令行中根据ranking字段查询出转化率前三的省份,将SQL语句与执行结果截图粘贴至客户端桌面【Release\模块D提交结果.docx】中对应的任务序号下;

支付转化率=完成订单数/总订单数*100%

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, lit, row_number}

object selct {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = new Tools().createSpark()
    //Task1
    // 省份 2022 订单总数 已支付订单
    // 订单总数
    spark.sql(
      """
        |select
        |  province,
        |  count(order_sn) as totalCount
        |from(
        |  select
        |    order_sn,province
        |  from dwd.fact_order_master
        |  where order_status="已下单"
        |  and cast(substring(create_time,1,4) as string)="2022"
        |)
        |group by province
        |""".stripMargin).createTempView("totalOrder")
    // 已支付订单
    spark.sql(
      """
        |select
        |  province,
        |  count(order_sn) as finishCount
        |from(
        |  select
        |    order_sn,province
        |  from dwd.fact_order_master
        |  where order_sn not in(
        |    select order_sn
        |    from dwd.fact_order_master
        |    where order_status="已退款"
        |  )
        |  and order_status="已下单"
        |  and cast(substring(create_time,1,4) as string)="2022"
        |)
        |group by province
        |""".stripMargin)
    .createTempView("finishOrder")

    spark.table("finishOrder").show()
    println("====================================")
    spark.table("totalOrder").show()

    val result = spark.sql(
      """
        |select
        |  t.province as province,
        |  t.totalCount as totalCount,
        |  f.finishCount as finalCount,
        |  round(f.finishCount/t.totalCount,3) as ranking
        |from totalOrder t join finishOrder f
        |on t.province=f.province
        |""".stripMargin)
    val frame: DataFrame = result.withColumn("grp",lit("grp"))
    .withColumn("ranking", row_number().over(Window.partitionBy("grp").orderBy(desc("ranking"))))
    .drop("grp")
    frame.show()
    val prop=Map(
      "url"->"jdbc:clickhouse://node1:8123/test1",
      "driver"->"ru.yandex.clickhouse.ClickHouseDriver",
      "user"->"default",
      "password"->"8888",
      "dbtable"->"paytran"
    )
    frame.show()
    frame.write
    .format("jdbc")
    .mode("append")
    .options(prop)
    .save()
    spark.close()
  }
}

标签:val,离线,职业院校,dwd,time,数据处理,import,spark,order
From: https://blog.csdn.net/2302_80311632/article/details/144931037

相关文章

  • 2024年河北省职业院校技能大赛云计算应用赛项赛题第3套(容器云)
    #需要资源(软件包及镜像)或有问题的,可私聊博主!!!#需要资源(软件包及镜像)或有问题的,可私聊博主!!!#需要资源(软件包及镜像)或有问题的,可私聊博主!!!模块二容器云(50分)        企业构建Kubernetes容器云集群,引入KubeVirt实现OpenStack到Kubernetes的全面转型,用Kubernetes来管一切......
  • 离线环境一步部署OCR文字识别程序
    前言百度、阿里等的OCR接口需要联网环境,并且超过免费/试用次数后需要付费。一般政务项目因为信息安全要求都部署在独立内网,有没有离线免费的OCR实现方案?下文基于EasyOCR实现一步部署,可下载直接使用。EasyOCREasyOCR支持离线部署,可免费使用(支持Apache-2.0license协议),并且在配置......
  • 绿色免费离线版JS加密混淆工具 - 支持全景VR加密, 小程序js加密,
    自从我们推出在线版的免费JS加密混淆工具以来,受到了广大用户的热烈欢迎。特别是全景开发人员,他们使用该工具加密VR插件的JS代码,添加域名锁等,都非常有效地保护了插件的代码资源。最近,我们收到了许多用户的反馈,大家希望能够提供一款桌面版的JS加密混淆工具,以便在离线状态下使用。......
  • 离线下载1.23.17版本k8s镜像、插件
    1.离线部署说明由于项目运行在内网环境,无法直接在线安装Kubernetes,因此需要提前离线下载所需的镜像、工具和配置文件,并在内网环境中进行部署2.纯净相同系统的服务器,下载k8s相关组件、镜像等2.1配置阿里云yum源curl-o/etc/yum.repos.d/CentOS-Base.repohttps://mirrors.al......
  • 离线服务器配置nfs目录
    1、安装软件yuminstall nfs-utilsyuminstall rpcbind 2、设置共享nfs目录mkdir  nfs_path  #创建共享目录chmod-R777 nfs_path  #修改共享目录权限vim/etc/exports nfs_path*(rw,sync,no_root_squash)客户机地址可以是主机名、IP地址、网......
  • JavaSpring AI与阿里云通义大模型的集成使用Java Data Science Library(JDSL)进行数据处
    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站学习总结1、掌握JAVA入门到进阶知识(持续写作中……)2、学会Oracle数据库入门到入土用法(创作中……)3、手把手教你开发炫酷的vbs脚本制作(完善中……)4、牛逼哄哄的IDEA......
  • 不使用docker push ,使用docker save 打包成tar文件,scp到服务器上,应对离线环境
    如果你不想使用dockerpush,而是希望将本地Docker镜像打包成tar文件,并通过scp将其传输到目标服务器上,你可以按照以下步骤操作:1.在本地打包Docker镜像为tar文件首先,在本地构建并打包你的Docker镜像。假设你的镜像名称为fastapi-app。构建Docker镜像:如果......
  • 离线安装Kubesphere
    1.环境要求【centos7.X】1.1依赖项要求master、node1节点安装yuminstall-ysocatconntrackebtablesipset1.2获取镜像列表访问https://get-images.kubesphere.io/选择需要部署的扩展组件。填入邮箱地址。点击获取镜像列表。查看填写的邮箱,获取KubeSphere最新的......
  • 两个定时任务的并发问题,导致数据处理的顺序和状态变得混乱
    1.背景:有两个定时任务在特定时间触发,同时对数据进行操作,且任务之间存在并发执行的场景。主要涉及的表为lingyejun_task,涉及到的操作有:数据插入、推送、状态更新和错误处理。定时任务A负责生成数据,定时任务B负责将生成好的数据处理并推送到第三方系统,由于出问题的时候定时任务A......
  • Java 大视界 -- Java 大数据测试框架与实践:确保数据处理质量(十二)
           ......