1、写在前言
- 关于本篇:博主在团队中主要负责数据抽取、数据清洗、数据代码部分的编写,也就是任务书中的任务B模块,因此本片博客内容主要也会集中在任务B题目上
- 关于代码风格:任务B的内容基本都是和spark打交道,众所周知spark有两种编写风格sql风格、dsl风格。两种风格功能都是能实现的,博主这里无脑推荐dsl风格(比赛里idea用的是社区版没有数据库插件,sql没有补全,但是dsl的函数是有的),比赛题目都比较简单,因此速度上的越快越好。
- 关于技巧:1.比赛开始就要将常用方法如读mysql写mysql写到工具类里,这样能节省很多时间,也能防止写错 2.比赛能idea本地跑的部分就不要打包到服务器运行,除了一些必须要服务器截图的题目,方便debug 3.题干关键字划出,尤其是子任务一,子任务一错后面全错。
- 关于内容更新:博主已经是一个大三的专科仔,主要时间在写毕设和专升本,关于题目会一点点更新,另外因为当时主要练的是国赛任务书,一些地方任务书可能我也没做过就不会贴出来了。
2、常用代码封装
package object jhc {
System.setProperty("HADOOP_USER_NAME","root")
val odsPath = "hdfs://bigdata1:9000/user/hive/warehouse/ods_ds_hudi.db/"
val dwdPath = "hdfs://bigdata1:9000/user/hive/warehouse/dwd_ds_hudi.db/"
val dwsPath = "hdfs://bigdata1:9000/user/hive/warehouse/dws_ds_hudi.db/"
val jdbcUrl = "jdbc:mysql://localhost:3306/shtd_store"
//创建spark 配置
private val conf = new SparkConf()
.set("spark.driver.host","localhost")
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.setMaster("local[*]")
.setAppName("app")
//创建spark对象
val spark = SparkSession.builder()
.config(conf)
.enableHiveSupport()
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
//mysql 连接信息
private val properties = new Properties()
properties.setProperty("user","root")
properties.setProperty("password","123456")
properties.setProperty("driver","com.mysql.jdbc.Driver")
/**
* 从mysql加载数据
* @param table
* @return
*/
def loadToMysql(table:String): DataFrame = spark.read.jdbc(jdbcUrl,table,properties)
/**
* 保存到mysql
* @param table
* @param dataFrame
*/
def saveToMysql(table: String, dataFrame: DataFrame): Unit = dataFrame.write.jdbc(jdbcUrl, table, properties)
/**
* 保存到hive
* @param table
* @param dataFrame
* @param partitionCol
* @param saveMode
*/
def saveToHive(
table:String,
partition:String,
dataFrame: DataFrame,
saveMode: SaveMode = SaveMode.Append
): Unit = {
dataFrame.write
.format("Hive")
.mode(saveMode)
.partitionBy(partition.split(','):_*)
.saveAsTable(table)
}
/**
* 保存到hudi
* @param outPutPath
* @param table
* @param recordkeyField
* @param precombineField
* @param partitionFields
* @param dataFrame
*/
def saveToHudi(outPutPath:String,
table:String,
recordkeyField:String,
precombineField:String,
partitionFields:String,
dataFrame: DataFrame): Unit = {
dataFrame.write
.format("hudi")
.options(QuickstartUtils.getQuickstartWriteConfigs)
//表名
.option(TBL_NAME.key(),table)
//主键
.option(RECORDKEY_FIELD.key(),recordkeyField)
//分区字段
.option(PRECOMBINE_FIELD.key(),precombineField)
//分区字段
.option(PARTITIONPATH_FIELD.key(),partitionFields)
.mode(SaveMode.Append)
.save(s"$outPutPath/$table")
}
}
3、大数据应用开发赛题第01套
3.1子任务一
题目内容
编写Scala代码,使用Spark将MySQL的shtd_store库中表user_info、sku_info、base_province、base_region、order_info、order_detail的数据增量抽取到Hive的ods库中对应表user_info、sku_info、base_province、base_region、order_info、order_detail中。
- 抽取shtd_store库中user_info的增量数据进入Hive的ods库中表user_info。根据ods.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.user_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
- 抽取shtd_store库中sku_info的增量数据进入Hive的ods库中表sku_info。根据ods.sku_info表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.sku_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
- 抽取shtd_store库中base_province的增量数据进入Hive的ods库中表base_province。根据ods.base_province表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.base_province命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
- 抽取shtd_store库中base_region的增量数据进入Hive的ods库中表base_region。根据ods.base_region表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.base_region命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
- 抽取shtd_store库中order_info的增量数据进入Hive的ods库中表order_info,根据ods.order_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.order_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
- 抽取shtd_store库中order_detail的增量数据进入Hive的ods库中表order_detail,根据ods.order_detail表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.order_detail命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下。
代码
import org.apache.spark.sql.functions._
import spark.implicits._
object Task1 {
def main(args: Array[String]): Unit = {
byOperateAndCreate("user_info")
byCreate("sku_info")
byId("base_province")
byId("base_region")
byOperateAndCreate("order_info")
byCreate("order_detail")
}
def byOperateAndCreate(table:String): Unit = {
val odsName = s"ods3.$table"
val max_time = spark.table(odsName)
.select(greatest(max("create_time"), max("operate_time")))
.first()
.get(0)
val mysqlDF = if (max_time == null){
loadMysql(table)
}else {
loadMysql(table)
.where($"create_time" > max_time or $"operate_time" > max_time)
}
val result = mysqlDF
.withColumn("etl_date", lit(date))
saveToHive(odsName, "etl_date", result)
}
def byCreate(table:String): Unit = {
val odsName = s"ods3.$table"
val max_time = spark.table(odsName)
.select(max("create_time"))
.first()
.get(0)
val mysqlDF = if (max_time == null){
loadMysql(table)
}else {
loadMysql(table)
.where($"create_time" > max_time)
}
val result = mysqlDF
.withColumn("etl_date", lit(date))
saveToHive(odsName, "etl_date", result)
}
def byId(table:String): Unit = {
val odsName = s"ods3.$table"
val max_id = spark.table(odsName)
.select(max("id"))
.first()
.get(0)
val mysqlDF = if (max_id == null){
loadMysql(table)
}else {
loadMysql(table)
.where($"id" > max_id)
}
val result = mysqlDF
.withColumn("etl_date", lit(date))
.withColumn("create_time",current_timestamp())
saveToHive(odsName, "etl_date", result)
}
}
3.2子任务二
编写Scala代码,使用Spark将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。(若dwd库中部分表没有数据,正常抽取即可)
题目内容
编写Scala代码,使用Spark将ods库中相应表数据全量抽取到Hive的dwd库中对应表中。表中有涉及到timestamp类型的,均要求按照yyyy-MM-dd HH:mm:ss,不记录毫秒数,若原数据中只有年月日,则在时分秒的位置添加00:00:00,添加之后使其符合yyyy-MM-dd HH:mm:ss。
- 抽取ods库中user_info表中昨天的分区(子任务一生成的分区)数据,并结合dim_user_info最新分区现有的数据,根据id合并数据到dwd库中dim_user_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据operate_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条记录第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均存当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli执行show partitions dwd.dim_user_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
- 抽取ods库sku_info表中昨天的分区(子任务一生成的分区)数据,并结合dim_sku_info最新分区现有的数据,根据id合并数据到dwd库中dim_sku_info的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli查询表dim_sku_info的字段id、sku_desc、dwd_insert_user、dwd_modify_time、etl_date,条件为最新分区的数据,id大于等于15且小于等于20,并且按照id升序排序,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
- 抽取ods库base_province表中昨天的分区(子任务一生成的分区)数据,并结合dim_province最新分区现有的数据,根据id合并数据到dwd库中dim_province的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli在表dwd.dim_province最新分区中,查询该分区中数据的条数,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
- 抽取ods库base_region表中昨天的分区(子任务一生成的分区)数据,并结合dim_region最新分区现有的数据,根据id合并数据到dwd库中dim_region的分区表(合并是指对dwd层数据进行插入或修改,需修改的数据以id为合并字段,根据create_time排序取最新的一条),分区字段为etl_date且值与ods库的相对应表该值相等,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”。若该条数据第一次进入数仓dwd层则dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。若该数据在进入dwd层时发生了合并修改,则dwd_insert_time时间不变,dwd_modify_time存当前操作时间,其余列存最新的值。使用hive cli在表dwd.dim_region最新分区中,查询该分区中数据的条数,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
- 将ods库中order_info表昨天的分区(子任务一生成的分区)数据抽取到dwd库中fact_order_info的动态分区表,分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd,同时若operate_time为空,则用create_time填充,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”,dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。使用hive cli执行show partitions dwd.fact_order_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
- 将ods库中order_detail表昨天的分区(子任务一中生成的分区)数据抽取到dwd库中fact_order_detail的动态分区表,分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd,并添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time四列,其中dwd_insert_user、dwd_modify_user均填写“user1”,dwd_insert_time、dwd_modify_time均填写当前操作时间,并进行数据类型转换。使用hive cli执行show partitions dwd.fact_order_detail命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下。
代码
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
object Task2 {
def main(args: Array[String]): Unit = {
task1("ods3.user_info","dwd3.dim_user_info")
task2("ods3.sku_info","dwd3.dim_sku_info")
task2("ods3.base_province","dwd3.dim_province")
task2("ods3.base_region","dwd3.dim_region")
task3("ods3.order_info","dwd3.fact_order_info")
task4("ods3.order_detail","dwd3.fact_order_detail")
}
def task1(odsTable:String,dwdTable:String): Unit = {
//抽取dwd最新
val dwdDF = spark.sql(
s"""
|select * from $dwdTable
|where etl_date =(select max(etl_date) from $dwdTable)
|""".stripMargin)
val odsDF = spark.table(s"$odsTable")
//抽取ods昨天
.where($"etl_date" === date)
//空值填充
.withColumn("operate_time",coalesce($"operate_time",$"create_time"))
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
val window1 = Window.partitionBy("id")
//按照operate_time 取最新
val window2 = Window.partitionBy("id").orderBy($"operate_time".desc)
val result = dwdDF.unionByName(odsDF)
.withColumn("dwd_insert_time", min("dwd_insert_time").over(window1))
.withColumn("dwd_modify_time", max("dwd_modify_time").over(window1))
.withColumn("row_num", row_number().over(window2))
//按照operate_time 取最新
.where($"row_num" === 1)
.drop("row_num")
//分区字段和ods一致
.withColumn("etl_date",lit(date))
saveToHive(s"$dwdTable","etl_date",result)
}
def task2(odsTable:String,dwdTable:String): Unit = {
//抽取dwd最新
val dwdDF = spark.sql(
s"""
|select * from $dwdTable
|where etl_date =(select max(etl_date) from $dwdTable)
|""".stripMargin)
val odsDF = spark.table(s"$odsTable")
//抽取ods昨天
.where($"etl_date" === date)
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
val window1 = Window.partitionBy("id")
//按照create_time 取最新
val window2 = Window.partitionBy("id").orderBy($"create_time".desc)
val result = dwdDF.unionByName(odsDF)
.withColumn("dwd_insert_time", min("dwd_insert_time").over(window1))
.withColumn("dwd_modify_time", max("dwd_modify_time").over(window1))
.withColumn("row_num", row_number().over(window2))
//按照create_time 取最新
.where($"row_num" === 1)
.drop("row_num")
//分区字段和ods一致
.withColumn("etl_date",lit(date))
saveToHive(s"$dwdTable","etl_date",result)
}
def task3(odsTable:String,dwdTable:String): Unit = {
val odsDF = spark.table(s"$odsTable")
//抽取ods昨天
.where($"etl_date" === date)
//分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd
.withColumn("etl_date",date_format($"create_time","yyyyMMdd"))
//空值填充
.withColumn("operate_time",coalesce($"operate_time",$"create_time"))
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
saveToHive(s"$dwdTable","etl_date",odsDF)
}
def task4(odsTable:String,dwdTable:String): Unit = {
val odsDF = spark.table(s"$odsTable")
//抽取ods昨天
.where($"etl_date" === date)
//分区字段为etl_date,类型为String,取create_time值并将格式转换为yyyyMMdd
.withColumn("etl_date",date_format($"create_time","yyyyMMdd"))
.withColumn("dwd_insert_user", lit("user1"))
.withColumn("dwd_insert_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
.withColumn("dwd_modify_user", lit("user1"))
.withColumn("dwd_modify_time", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))
saveToHive(s"$dwdTable","etl_date",odsDF)
}
}
3.3子任务三
编写Scala代码,使用Spark计算相关指标。
注:在指标计算中,不考虑订单信息表中order_status字段的值,将所有订单视为有效订单。计算订单金额或订单总金额时只使用final_total_amount字段。需注意dwd所有的维表取最新的分区。
题目一内容
本任务基于以下2、3、4小题完成,使用Azkaban完成第2、3、4题任务代码的调度。工作流要求,使用shell输出“开始”作为工作流的第一个job(job1),2、3、4题任务为串行任务且它们依赖job1的完成(命名为job2、job3、job4),job2、job3、job4完成之后使用shell输出“结束”作为工作流的最后一个job(endjob),endjob依赖job2、job3、job4,并将最终任务调度完成后的工作流截图,将截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
题目一代码
按照要求创建5个.job文件即可,这里只给出end job的示例
type=command
command=echo 'end job'
dependencies=job2,job3,job4
题目二内容
根据dwd层表统计每个省份、每个地区、每个月下单的数量和下单的总金额,存入MySQL数据库shtd_result的provinceeverymonth表中(表结构如下),然后在Linux的MySQL命令行中根据订单总数、订单总金额、省份表主键均为降序排序,查询出前5条,将SQL语句复制粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下,将执行结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
字段 | 类型 | 中文含义 | 备注 |
provinceid | int | 省份表主键 | |
provincename | text | 省份名称 | |
regionid | int | 地区表主键 | |
regionname | text | 地区名称 | |
totalconsumption | double | 订单总金额 | 当月订单总金额 |
totalorder | int | 订单总数 | 当月订单总数 |
year | int | 年 | 订单产生的年 |
month | int | 月 | 订单产生的月 |
题目二代码
select * from shtd_result.provinceeverymonth order by totalorder desc ,totalconsumption desc,provinceid desc limit 5;
import spark.implicits._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, IntegerType}
import org.apache.spark.storage.StorageLevel
object ThreeTask1 {
val date = "20240324"
def main(args: Array[String]): Unit = {
//order_info 事实表所有数据
val fact_order_info = spark.table("dwd3.fact_order_info")
.select(
$"final_total_amount",
year($"operate_time").as("year"),
month($"operate_time").as("month"),
$"province_id"
)
//省份表维表最新分区数据
val dim_province = spark.table("dwd3.dim_province")
.where($"etl_date" === date)
.select(
$"id".as("province_id"),
$"name".as("province_name"),
$"region_id"
)
//地区表维表最新分区数据
val dim_region = spark.table("dwd3.dim_region")
.where($"etl_date" === date)
.select(
$"id".as("region_id"),
$"region_name"
)
//连接
val join = fact_order_info
.join(broadcast(dim_province), "province_id")
.join(broadcast(dim_region), "region_id")
.persist(StorageLevel.MEMORY_ONLY)
//每省每个月
val result = join
.groupBy("province_id", "province_name", "region_id", "region_name", "year", "month")
.agg(
//总销量
count("final_total_amount").as("totalorder"),
//总销售额
sum("final_total_amount").as("totalconsumption"),
).select(
$"province_id".as("provinceid"),
$"province_name".as("provincename"),
$"region_id".as("regionid"),
$"region_name".as("regionname"),
$"totalconsumption".cast(DoubleType),
$"totalorder".cast(IntegerType),
$"year",
$"month"
)
result
.orderBy($"totalorder".desc)
.show(false)
//保存到mysql
saveToMysql("provinceeverymonth",result)
}
}
题目三内容
请根据dwd层表计算出2020年4月每个省份的平均订单金额和所有省份平均订单金额相比较结果(“高/低/相同”),存入MySQL数据库shtd_result的provinceavgcmp表(表结构如下)中,然后在Linux的MySQL命令行中根据省份表主键、该省平均订单金额均为降序排序,查询出前5条,将SQL语句复制粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下,将执行结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
字段 | 类型 | 中文含义 | 备注 |
provinceid | int | 省份表主键 | |
provincename | text | 省份名称 | |
provinceavgconsumption | double | 该省平均订单金额 | |
allprovinceavgconsumption | double | 所有省平均订单金额 | |
comparison | text | 比较结果 | 该省平均订单金额和所有省平均订单金额比较结果,值为:高/低/相同 |
题目三代码
select * from shtd_result.provinceavgcmp order by provinceid desc,provinceavgconsumption desc limit 5;
import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{when, _}
import org.apache.spark.sql.types.{DoubleType, IntegerType}
object ThreeTask2 {
def main(args: Array[String]): Unit = {
val fact_order_info = spark.table("dwd3.fact_order_info")
.where(year($"operate_time") === 2020 and month($"operate_time") === 4)
.select(
$"final_total_amount",
$"province_id"
)
val dim_province = spark.table("dwd3.dim_province")
.where($"etl_date" === date)
.select(
$"id".as("province_id"),
$"name".as("province_name"),
$"region_id"
)
val dim_region = spark.table("dwd3.dim_region")
.where($"etl_date" === "20240324")
.select(
$"id".as("region_id"),
$"region_name"
)
val join = fact_order_info
.join(broadcast(dim_province), "province_id")
.join(broadcast(dim_region), "region_id")
//每个省平均销售额
val provinceAvg = join.groupBy("province_id", "province_name","region_id")
.agg(
avg("final_total_amount").as("provinceavgconsumption")
)
//所有省的平均销售额
val regionAvg = join
.agg(
avg("final_total_amount").as("allprovinceavgconsumption")
)
//用所有省 和 每个省的表做join
val result = provinceAvg.crossJoin(broadcast(regionAvg))
.withColumn("comparison",
when($"provinceavgconsumption" > $"allprovinceavgconsumption", "高")
.when($"provinceavgconsumption" === $"allprovinceavgconsumption", "相同")
.when($"provinceavgconsumption" < $"allprovinceavgconsumption", "低")
)
.select(
$"province_id".as("provinceid"),
$"province_name".as("provincename"),
$"provinceavgconsumption".cast(DoubleType),
$"allprovinceavgconsumption".cast(DoubleType),
$"comparison"
)
result.show()
//保存到mysql
saveToMysql("provinceavgcmp",result)
}
}
题目四内容
根据dwd层表统计在两天内连续下单并且下单金额保持增长的用户,存入MySQL数据库shtd_result的usercontinueorder表(表结构如下)中,然后在Linux的MySQL命令行中根据订单总数、订单总金额、客户主键均为降序排序,查询出前5条,将SQL语句复制粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下,将执行结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;
字段 | 类型 | 中文含义 | 备注 |
userid | int | 客户主键 | |
username | text | 客户名称 | |
day | text | 日 | 记录下单日的时间,格式为 yyyyMMdd_yyyyMMdd 例如: 20220101_20220102 |
totalconsumption | double | 订单总金额 | 连续两天的订单总金额 |
totalorder | int | 订单总数 | 连续两天的订单总数 |
题目四代码
select * from shtd_result.usercontinueorder order by totalorder desc ,totalconsumption desc,userid desc limit 5;
import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, IntegerType}
object ThreeTask3 {
def main(args: Array[String]): Unit = {
val fact_order_info = spark.table("dwd3.fact_order_info")
.select(
$"final_total_amount",
$"user_id",
to_date($"operate_time").as("order_day")
)
val dwd_user_info = spark.table("dwd3.dim_user_info")
.where($"etl_date" === date)
.select($"id".as("user_id"),$"name".as("user_name"))
val window1 = Window.partitionBy("user_id").orderBy("order_day")
val result = fact_order_info
.groupBy("user_id", "order_day")
.agg(
//每人每天下单总额
sum("final_total_amount").as("total_amount"),
//每人每天下单总量
count("final_total_amount").as("total_count")
)
//获取上条数据的日期
.withColumn("last_day", lag("order_day", 1).over(window1))
//获取上条数据的总额
.withColumn("last_day_total_amount", lag("total_amount", 1).over(window1))
//获取上条数据的总量
.withColumn("last_day_total_count", lag("total_count", 1).over(window1))
//过滤出上条数据是当前的上一天,且当前总金额比上一天大的
.where(datediff($"order_day", $"last_day") === 1 and ($"total_amount" - $"last_day_total_amount" > 0))
//计算这两天的总额
.withColumn("totalconsumption", $"total_amount" + $"last_day_total_amount")
//计算这两天的总数
.withColumn("totalorder", $"total_count" + $"last_day_total_count")
//连接日期
.withColumn("day", concat_ws("_", date_format($"last_day","yyyyMMdd"), date_format($"order_day","yyyyMMdd")))
//从user表中拿name
.join(dwd_user_info, "user_id")
.select(
$"user_id".as("userid"),
$"user_name".as("username"),
$"day",
$"totalconsumption".cast(DoubleType),
$"totalorder".cast(IntegerType)
)
result.show()
//保存到mysql
saveToMysql("usercontinueorder",result)
}
}
标签:info,赛题,详解,user,dwd,time,date,职业技能,id
From: https://blog.csdn.net/m0_73968016/article/details/142315654