首页 > 其他分享 >全国职业技能大赛大数据应用开发赛题任务B详解

全国职业技能大赛大数据应用开发赛题任务B详解

时间:2024-09-17 17:21:21浏览次数:12  
标签:info 赛题 详解 user dwd time date 职业技能 id

1、写在前言

  • 关于本篇:博主在团队中主要负责数据抽取、数据清洗、数据代码部分的编写,也就是任务书中的任务B模块,因此本片博客内容主要也会集中在任务B题目上
  • 关于代码风格:任务B的内容基本都是和spark打交道,众所周知spark有两种编写风格sql风格、dsl风格。两种风格功能都是能实现的,博主这里无脑推荐dsl风格(比赛里idea用的是社区版没有数据库插件,sql没有补全,但是dsl的函数是有的),比赛题目都比较简单,因此速度上的越快越好。
  • 关于技巧:1.比赛开始就要将常用方法如读mysql写mysql写到工具类里,这样能节省很多时间,也能防止写错 2.比赛能idea本地跑的部分就不要打包到服务器运行,除了一些必须要服务器截图的题目,方便debug 3.题干关键字划出,尤其是子任务一,子任务一错后面全错。
  • 关于内容更新:博主已经是一个大三的专科仔,主要时间在写毕设和专升本,关于题目会一点点更新,另外因为当时主要练的是国赛任务书,一些地方任务书可能我也没做过就不会贴出来了。

2、常用代码封装

package object jhc {
    System.setProperty("HADOOP_USER_NAME","root")
    val odsPath = "hdfs://bigdata1:9000/user/hive/warehouse/ods_ds_hudi.db/"
    val dwdPath = "hdfs://bigdata1:9000/user/hive/warehouse/dwd_ds_hudi.db/"
    val dwsPath = "hdfs://bigdata1:9000/user/hive/warehouse/dws_ds_hudi.db/"
    val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store"

    //创建spark 配置
    private val conf = new SparkConf()
      .set("spark.driver.host","localhost")
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .setMaster("local[*]")
      .setAppName("app")

    //创建spark对象
    val spark = SparkSession.builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    //mysql 连接信息
    private val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123456")
    properties.setProperty("driver","com.mysql.jdbc.Driver")

    /**
     * 从mysql加载数据
     * @param table
     * @return
     */
    def loadToMysql(table:String): DataFrame = spark.read.jdbc(jdbcUrl,table,properties)



    /**
     * 保存到mysql
     * @param table
     * @param dataFrame
     */
    def saveToMysql(table: String, dataFrame: DataFrame): Unit = dataFrame.write.jdbc(jdbcUrl, table, properties)


    /**
     * 保存到hive
     * @param table
     * @param dataFrame
     * @param partitionCol
     * @param saveMode
     */
    def saveToHive(
                  table:String,
                  partition:String,
                  dataFrame: DataFrame,
                  saveMode: SaveMode = SaveMode.Append
                  ): Unit = {
        dataFrame.write
          .format("Hive")
          .mode(saveMode)
          .partitionBy(partition.split(','):_*)
          .saveAsTable(table)
    }

    /**
     * 保存到hudi
     * @param outPutPath
     * @param table
     * @param recordkeyField
     * @param precombineField
     * @param partitionFields
     * @param dataFrame
     */
    def saveToHudi(outPutPath:String,
                   table:String,
                   recordkeyField:String,
                   precombineField:String,
                   partitionFields:String,
                   dataFrame: DataFrame): Unit = {
        dataFrame.write
          .format("hudi")
          .options(QuickstartUtils.getQuickstartWriteConfigs)
          //表名
          .option(TBL_NAME.key(),table)
          //主键
          .option(RECORDKEY_FIELD.key(),recordkeyField)
          //分区字段
          .option(PRECOMBINE_FIELD.key(),precombineField)
          //分区字段
          .option(PARTITIONPATH_FIELD.key(),partitionFields)
          .mode(SaveMode.Append)
          .save(s"$outPutPath/$table")
    }
}

3、大数据应用开发赛题第01套

3.1子任务一

题目内容

      编写Scala代码,使用Spark将MySQL的shtd_store库中表user_info、sku_info、base_province、base_region、order_info、order_detail的数据增量抽取到Hive的ods库中对应表user_info、sku_info、base_province、base_region、order_info、order_detail中。

  1. 抽取shtd_store库中user_info的增量数据进入Hive的ods库中表user_info。根据ods.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.user_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
  2. 抽取shtd_store库中sku_info的增量数据进入Hive的ods库中表sku_info。根据ods.sku_info表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.sku_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
  3. 抽取shtd_store库中base_province的增量数据进入Hive的ods库中表base_province。根据ods.base_province表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.base_province命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
  4. 抽取shtd_store库中base_region的增量数据进入Hive的ods库中表base_region。根据ods.base_region表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.base_region命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
  5. 抽取shtd_store库中order_info的增量数据进入Hive的ods库中表order_info,根据ods.order_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.order_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
  6. 抽取shtd_store库中order_detail的增量数据进入Hive的ods库中表order_detail,根据ods.order_detail表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.order_detail命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下。
代码
import org.apache.spark.sql.functions._
import spark.implicits._
object Task1 {

    def main(args: Array[String]): Unit = {
        byOperateAndCreate("user_info")
        byCreate("sku_info")
        byId("base_province")
        byId("base_region")
        byOperateAndCreate("order_info")
        byCreate("order_detail")
    }

    def byOperateAndCreate(table:String): Unit = {
        val odsName = s"ods3.$table"

        val max_time = spark.table(odsName)
          .select(greatest(max("create_time"), max("operate_time")))
          .first()
          .get(0)

        val mysqlDF = if (max_time == null){
            loadMysql(table)
        }else {
            loadMysql(table)
              .where($"create_time" > max_time or $"operate_time" > max_time)
        }


        val result = mysqlDF
          .withColumn("etl_date", lit(date))

        saveToHive(odsName, "etl_date", result)
    }


    def byCreate(table:String): Unit = {
        val odsName = s"ods3.$table"

        val max_time = spark.table(odsName)
          .select(max("create_time"))
          .first()
          .get(0)

        val mysqlDF = if (max_time == null){
            loadMysql(table)
        }else {
            loadMysql(table)
              .where($"create_time" > max_time)
        }


        val result = mysqlDF
          .withColumn("etl_date", lit(date))

        saveToHive(odsName, "etl_date", result)
    }

    def byId(table:String): Unit = {
        val odsName = s"ods3.$table"

        val max_id = spark.table(odsName)
          .select(max("id"))
          .first()
          .get(0)

        val mysqlDF = if (max_id == null){
            loadMysql(table)
        }else {
            loadMysql(table)
              .where($"id" > max_id)
        }


        val result = mysqlDF
          .withColumn("etl_date", lit(date))
          .withColumn("create_time",current_timestamp())

        saveToHive(odsName, "etl_date", result)
    }
}

3.2子任务二

        编写Scala代码,使用Spark将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。(若dwd库中部分表没有数据,正常抽取即可)

题目内容

        编写Scala代码,使用Spark将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。

  1. 抽取ods库中user_info表中昨天的分区(子任务一生成的分区)数据,并结合dim_user_info最新分区现有的数据,根据id合并数据到dwd库中dim_user_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据operate_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli执行show partitions dwd.dim_user_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
  2. 抽取ods库sku_info表中昨天的分区(子任务一生成的分区)数据,并结合dim_sku_info最新分区现有的数据,根据id合并数据到dwd库中dim_sku_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli查询表dim_sku_info的字段id、sku_desc、dwd_insert_user、dwd_modify_time、etl_date,条件为最新分区的数据,id大于等于15且小于等于20,并且按照id升序排序,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
  3. 抽取ods库base_province表中昨天的分区(子任务一生成的分区)数据,并结合dim_province最新分区现有的数据,根据id合并数据到dwd库中dim_province的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli在表dwd.dim_province最新分区中,查询该分区中数据的条数,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
  4. 抽取ods库base_region表中昨天的分区(子任务一生成的分区)数据,并结合dim_region最新分区现有的数据,根据id合并数据到dwd库中dim_region的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli在表dwd.dim_region最新分区中,查询该分区中数据的条数,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
  5. 将ods库中order_info表昨天的分区(子任务一生成的分区)数据抽取到dwd库中fact_order_info的动态分区表,分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”,dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。使用hive cli执行show partitions dwd.fact_order_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
  6. 将ods库中order_detail表昨天的分区(子任务一中生成的分区)数据抽取到dwd库中fact_order_detail的动态分区表,分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”,dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。使用hive cli执行show partitions dwd.fact_order_detail命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下。
代码
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
object Task2 {

    def main(args: Array[String]): Unit = {
        task1("ods3.user_info","dwd3.dim_user_info")
        task2("ods3.sku_info","dwd3.dim_sku_info")
        task2("ods3.base_province","dwd3.dim_province")
        task2("ods3.base_region","dwd3.dim_region")
        task3("ods3.order_info","dwd3.fact_order_info")
        task4("ods3.order_detail","dwd3.fact_order_detail")
    }

    def task1(odsTable:String,dwdTable:String): Unit = {
        //抽取dwd最新
        val dwdDF = spark.sql(
            s"""
               |select * from $dwdTable
               |where etl_date =(select max(etl_date) from $dwdTable)
               |""".stripMargin)

        val odsDF = spark.table(s"$odsTable")
          //抽取ods昨天
          .where($"etl_date" === date)
          //空值填充
          .withColumn("operate_time",coalesce($"operate_time",$"create_time"))
          .withColumn("dwd_insert_user", lit("user1"))
          .withColumn("dwd_insert_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
          .withColumn("dwd_modify_user", lit("user1"))
          .withColumn("dwd_modify_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))

        val window1 = Window.partitionBy("id")
        //按照operate_time 取最新
        val window2 = Window.partitionBy("id").orderBy($"operate_time".desc)

        val result = dwdDF.unionByName(odsDF)
          .withColumn("dwd_insert_time", min("dwd_insert_time").over(window1))
          .withColumn("dwd_modify_time", max("dwd_modify_time").over(window1))
          .withColumn("row_num", row_number().over(window2))
          //按照operate_time 取最新
          .where($"row_num" === 1)
          .drop("row_num")
          //分区字段和ods一致
          .withColumn("etl_date",lit(date))

        saveToHive(s"$dwdTable","etl_date",result)
    }

    def task2(odsTable:String,dwdTable:String): Unit = {
        //抽取dwd最新
        val dwdDF = spark.sql(
            s"""
               |select * from $dwdTable
               |where etl_date =(select max(etl_date) from $dwdTable)
               |""".stripMargin)

        val odsDF = spark.table(s"$odsTable")
          //抽取ods昨天
          .where($"etl_date" === date)
          .withColumn("dwd_insert_user", lit("user1"))
          .withColumn("dwd_insert_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
          .withColumn("dwd_modify_user", lit("user1"))
          .withColumn("dwd_modify_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))

        val window1 = Window.partitionBy("id")
        //按照create_time 取最新
        val window2 = Window.partitionBy("id").orderBy($"create_time".desc)

        val result = dwdDF.unionByName(odsDF)
          .withColumn("dwd_insert_time", min("dwd_insert_time").over(window1))
          .withColumn("dwd_modify_time", max("dwd_modify_time").over(window1))
          .withColumn("row_num", row_number().over(window2))
          //按照create_time 取最新
          .where($"row_num" === 1)
          .drop("row_num")
          //分区字段和ods一致
          .withColumn("etl_date",lit(date))

        saveToHive(s"$dwdTable","etl_date",result)
    }



    def task3(odsTable:String,dwdTable:String): Unit = {
        val odsDF = spark.table(s"$odsTable")
          //抽取ods昨天
          .where($"etl_date" === date)
          //分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd
          .withColumn("etl_date",date_format($"create_time","yyyyMMdd"))
          //空值填充
          .withColumn("operate_time",coalesce($"operate_time",$"create_time"))
          .withColumn("dwd_insert_user", lit("user1"))
          .withColumn("dwd_insert_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
          .withColumn("dwd_modify_user", lit("user1"))
          .withColumn("dwd_modify_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))


        saveToHive(s"$dwdTable","etl_date",odsDF)
    }

    def task4(odsTable:String,dwdTable:String): Unit = {
        val odsDF = spark.table(s"$odsTable")
          //抽取ods昨天
          .where($"etl_date" === date)
          //分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd
          .withColumn("etl_date",date_format($"create_time","yyyyMMdd"))
          .withColumn("dwd_insert_user", lit("user1"))
          .withColumn("dwd_insert_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
          .withColumn("dwd_modify_user", lit("user1"))
          .withColumn("dwd_modify_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))


        saveToHive(s"$dwdTable","etl_date",odsDF)
    }

}

3.3子任务三

编写Scala代码,使用Spark计算相关指标。

注:在指标计算中,不考虑订单信息表中order_status字段的值,将所有订单视为有效订单。计算订单金额或订单总金额时只使用final_total_amount字段。需注意dwd所有的维表取最新的分区。

题目一内容

       本任务基于以下2、3、4小题完成,使用Azkaban完成第2、3、4题任务代码的调度。工作流要求,使用shell输出“开始”作为工作流的第一个job(job1),2、3、4题任务为串行任务且它们依赖job1的完成(命名为job2、job3、job4),job2、job3、job4完成之后使用shell输出“结束”作为工作流的最后一个job(endjob),endjob依赖job2、job3、job4,并将最终任务调度完成后的工作流截图,将截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;

题目一代码

按照要求创建5个.job文件即可,这里只给出end job的示例

type=command
command=echo 'end job'
dependencies=job2,job3,job4
题目二内容

        根据dwd层表统计每个省份、每个地区、每个月下单的数量和下单的总金额,存入MySQL数据库shtd_result的provinceeverymonth表中(表结构如下),然后在Linux的MySQL命令行中根据订单总数、订单总金额、省份表主键均为降序排序,查询出前5条,将SQL语句复制粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下,将执行结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;

字段

类型

中文含义

备注

provinceid

int

省份表主键

provincename

text

省份名称

regionid

int

地区表主键

regionname

text

地区名称

totalconsumption

double

订单总金额

当月订单总金额

totalorder

int

订单总数

当月订单总数

year

int

订单产生的年

month

int

订单产生的月

题目二代码
select * from shtd_result.provinceeverymonth order by totalorder desc ,totalconsumption desc,provinceid desc limit 5;
import spark.implicits._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, IntegerType}
import org.apache.spark.storage.StorageLevel


object ThreeTask1 {
    val date = "20240324"

    def main(args: Array[String]): Unit = {
        //order_info 事实表所有数据
        val fact_order_info = spark.table("dwd3.fact_order_info")
          .select(
              $"final_total_amount",
              year($"operate_time").as("year"),
              month($"operate_time").as("month"),
              $"province_id"
          )

        //省份表维表最新分区数据
        val dim_province = spark.table("dwd3.dim_province")
          .where($"etl_date" === date)
          .select(
              $"id".as("province_id"),
              $"name".as("province_name"),
              $"region_id"
          )

        //地区表维表最新分区数据
        val dim_region = spark.table("dwd3.dim_region")
          .where($"etl_date" === date)
          .select(
              $"id".as("region_id"),
              $"region_name"
          )

        //连接
        val join = fact_order_info
          .join(broadcast(dim_province), "province_id")
          .join(broadcast(dim_region), "region_id")
          .persist(StorageLevel.MEMORY_ONLY)

        //每省每个月
        val result = join
          .groupBy("province_id", "province_name", "region_id", "region_name", "year", "month")
          .agg(
              //总销量
              count("final_total_amount").as("totalorder"),
              //总销售额
              sum("final_total_amount").as("totalconsumption"),
          ).select(
              $"province_id".as("provinceid"),
              $"province_name".as("provincename"),
              $"region_id".as("regionid"),
              $"region_name".as("regionname"),
              $"totalconsumption".cast(DoubleType),
              $"totalorder".cast(IntegerType),
              $"year",
              $"month"
          )

        result
          .orderBy($"totalorder".desc)
          .show(false)

        //保存到mysql
        saveToMysql("provinceeverymonth",result)
    }
}
题目三内容

       请根据dwd层表计算出2020年4月每个省份的平均订单金额和所有省份平均订单金额相比较结果(“高/低/相同”),存入MySQL数据库shtd_result的provinceavgcmp表(表结构如下)中,然后在Linux的MySQL命令行中根据省份表主键、该省平均订单金额均为降序排序,查询出前5条,将SQL语句复制粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下,将执行结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;

字段

类型

中文含义

备注

provinceid

int

省份表主键

provincename

text

省份名称

provinceavgconsumption

double

该省平均订单金额

allprovinceavgconsumption

double

所有省平均订单金额

comparison

text

比较结果

该省平均订单金额和所有省平均订单金额比较结果,值为:高/低/相同

题目三代码
select * from shtd_result.provinceavgcmp order by provinceid desc,provinceavgconsumption desc limit 5;
import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{when, _}
import org.apache.spark.sql.types.{DoubleType, IntegerType}


object ThreeTask2 {
    def main(args: Array[String]): Unit = {
        val fact_order_info = spark.table("dwd3.fact_order_info")
          .where(year($"operate_time") === 2020 and month($"operate_time") === 4)
          .select(
              $"final_total_amount",
              $"province_id"
          )

        val dim_province = spark.table("dwd3.dim_province")
          .where($"etl_date" === date)
          .select(
              $"id".as("province_id"),
              $"name".as("province_name"),
              $"region_id"
          )

        val dim_region = spark.table("dwd3.dim_region")
          .where($"etl_date" === "20240324")
          .select(
              $"id".as("region_id"),
              $"region_name"
          )

        val join = fact_order_info
          .join(broadcast(dim_province), "province_id")
          .join(broadcast(dim_region), "region_id")

        //每个省平均销售额
        val provinceAvg = join.groupBy("province_id", "province_name","region_id")
          .agg(
              avg("final_total_amount").as("provinceavgconsumption")
          )

        //所有省的平均销售额
        val regionAvg = join
          .agg(
              avg("final_total_amount").as("allprovinceavgconsumption")
          )

        //用所有省 和 每个省的表做join
        val result = provinceAvg.crossJoin(broadcast(regionAvg))
          .withColumn("comparison",
              when($"provinceavgconsumption" > $"allprovinceavgconsumption", "高")
                .when($"provinceavgconsumption" === $"allprovinceavgconsumption", "相同")
                .when($"provinceavgconsumption" < $"allprovinceavgconsumption", "低")
          )
          .select(
              $"province_id".as("provinceid"),
              $"province_name".as("provincename"),
              $"provinceavgconsumption".cast(DoubleType),
              $"allprovinceavgconsumption".cast(DoubleType),
              $"comparison"
          )

        result.show()
        //保存到mysql
        saveToMysql("provinceavgcmp",result)
    }
}
题目四内容

       根据dwd层表统计在两天内连续下单并且下单金额保持增长的用户,存入MySQL数据库shtd_result的usercontinueorder表(表结构如下)中,然后在Linux的MySQL命令行中根据订单总数、订单总金额、客户主键均为降序排序,查询出前5条,将SQL语句复制粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下,将执行结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;

字段

类型

中文含义

备注

userid

int

客户主键

username

text

客户名称

day

text

记录下单日的时间,格式为

yyyyMMdd_yyyyMMdd

例如: 20220101_20220102

totalconsumption

double

订单总金额

连续两天的订单总金额

totalorder

int

订单总数

连续两天的订单总数

题目四代码
select * from shtd_result.usercontinueorder order by totalorder desc ,totalconsumption desc,userid desc limit 5;
import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, IntegerType}


object ThreeTask3 {
    def main(args: Array[String]): Unit = {
        val fact_order_info = spark.table("dwd3.fact_order_info")
          .select(
              $"final_total_amount",
              $"user_id",
              to_date($"operate_time").as("order_day")
          )

        val dwd_user_info = spark.table("dwd3.dim_user_info")
          .where($"etl_date" === date)
          .select($"id".as("user_id"),$"name".as("user_name"))

        val window1 = Window.partitionBy("user_id").orderBy("order_day")

        val result = fact_order_info
          .groupBy("user_id", "order_day")
          .agg(
              //每人每天下单总额
              sum("final_total_amount").as("total_amount"),
              //每人每天下单总量
              count("final_total_amount").as("total_count")
          )
          //获取上条数据的日期
          .withColumn("last_day", lag("order_day", 1).over(window1))
          //获取上条数据的总额
          .withColumn("last_day_total_amount", lag("total_amount", 1).over(window1))
          //获取上条数据的总量
          .withColumn("last_day_total_count", lag("total_count", 1).over(window1))
          //过滤出上条数据是当前的上一天,且当前总金额比上一天大的
          .where(datediff($"order_day", $"last_day") === 1 and ($"total_amount" - $"last_day_total_amount" > 0))
          //计算这两天的总额
          .withColumn("totalconsumption", $"total_amount" + $"last_day_total_amount")
          //计算这两天的总数
          .withColumn("totalorder", $"total_count" + $"last_day_total_count")
          //连接日期
          .withColumn("day", concat_ws("_", date_format($"last_day","yyyyMMdd"), date_format($"order_day","yyyyMMdd")))
          //从user表中拿name
          .join(dwd_user_info, "user_id")
          .select(
              $"user_id".as("userid"),
              $"user_name".as("username"),
              $"day",
              $"totalconsumption".cast(DoubleType),
              $"totalorder".cast(IntegerType)
          )


        result.show()
        //保存到mysql

        saveToMysql("usercontinueorder",result)
    }
}

标签:info,赛题,详解,user,dwd,time,date,职业技能,id
From: https://blog.csdn.net/m0_73968016/article/details/142315654

相关文章

  • python标准库模块 pickle 详解
    什么是pickle模块?pickle是Python的一个标准库,用于序列化和反序列化Python对象。所谓序列化,就是将一个Python对象转换成字节流,以便存储到磁盘或通过网络传输。反序列化则是将字节流恢复为原来的Python对象。为什么使用pickle?保存Python对象:可以将Python中的各......
  • Transformer详解
    1Transformer结构https://jalammar.github.io/illustrated-transformer/Transformer一个巨大的优点是:模型在处理序列输入时,可以对整个序列输入进行并行计算,不需要按照时间步循环递归处理输入序列。1.1Transformer宏观结构Transformer可以看作是seq2seq模型的一种,对比之前的RNN,......
  • C++内存管理详解:各类变量的存储区域
      在C++中,变量的存储位置取决于它们的类型和生命周期。那么不同的各个变量究竟存储在哪个区域呢?1.不同类型的变量我们首先从变量类型的不同来说明:1.全局变量和静态变量 -存储区:全局/静态区(静态区)-说明:全局变量(包括文件级和函数级的)和使用`static`关键字声明的变......
  • 二分详解——学习笔记
    首先,使用二分有几个前提:具有单调性要求“最小的最大”或“最大的最小”其次,还要分清楚二分查找与二分答案的区别:二分查找:在某区间使用二分的思想进行查找二分答案:在答案的区间中使用二分的思想并判断从而找到最优解同时还要处理好二分的边界。接下来来理解一下......
  • 跟着问题学10——RNN详解及代码实战
    1循环神经网络RecurrentNeuralNetwork什么是序列信息呢?通俗理解就是一段连续的信息,前后信息之间是有关系地,必须将不同时刻的信息放在一起理解。比如一句话,虽然可以拆分成多个词语,但是需要将这些词语连起来理解才能得到一句话的意思。RNN就是用来处理这些序列信息的任务......
  • 【MySQL】MySQL中JDBC编程——MySQL驱动包安装——(超详解)
    前言:......
  • 指针详解(中秋版)
       久违的键盘声,熟悉的思绪,仿佛时间在这一刻凝固。距离我上一次敲击键盘写下文字,已不知过了多少个日夜。但文字的魅力就在于,它总能跨越时间的长河,将我们的心灵再次相连。今天,我带着满心的感慨与新的故事,重新坐到了屏幕前。让我们一起,再次启程,探索文字的奥秘。(一)理解......
  • MyBatis 详解
    目录目录一、MyBatis是什么二、为什么使用MyBatis(一)灵活性高(二)性能优化(三)易于维护三、怎么用MyBatis(一)添加依赖(二)配置MyBatis(三)创建实体类和接口(四)使用MyBatis一、MyBatis是什么MyBatis是一个优秀的持久层框架,它支持自定义SQL、存储过程以及高级映射。......
  • Go 语言中的空白标识符(_)用法详解
    在Go语言中,空白标识符 _ 有着独特的用途,特别是在循环以及其他一些场景中。本文将详细介绍空白标识符在Go语言中的用法,重点围绕其在循环中的应用以及其他常见场景。 一、空白标识符在循环中的应用 (一)忽略索引值 在遍历切片或数组时,如果我们只关心元素本身而不关......
  • DFT理论知识 scan insertion详解
    ###DFT理论知识:SCANInsertion详解####一、SCANInsertion概述**1.定义**SCANInsertion是设计可测试性(DesignForTestability,DFT)中的一种技术,通过在芯片设计中插入扫描链(ScanChain),使得原本难以测试的组合逻辑电路变得可测试。扫描链通过将触发器(Flip-Flop)转换为......