Data Extraction:
1. Extract the incremental data of customer_inf in the ds_db01 database into the table customer_inf in the Hive ods database. Use modified_time in ods.customer_inf as the incremental field and load only newly added rows, keeping column names and types unchanged. Add a static partition whose field is etl_date, of type String, with the value set to the day before the current date (partition value format yyyyMMdd; see the date sketch after this list). Run show partitions ods.customer_inf in the hive cli;
2. Extract the incremental data of product_info in the ds_db01 database into the table product_info in the Hive ods database. Use modified_time in ods.product_info as the incremental field and load only newly added rows, keeping column names and types unchanged. Add a static partition whose field is etl_date, of type String, with the value set to the day before the current date (partition value format yyyyMMdd). Run show partitions ods.product_info in the hive cli;
3. Extract the incremental data of order_master in the ds_db01 database into the table order_master in the Hive ods database. Use modified_time in ods.order_master as the incremental field and load only newly added rows, keeping column names and types unchanged. Add a static partition whose field is etl_date, of type String, with the value set to the day before the current date (partition value format yyyyMMdd). Run show partitions ods.order_master in the hive cli;
4. Extract the incremental data of order_detail in the ds_db01 database into the table order_detail in the Hive ods database. Use modified_time in ods.order_detail as the incremental field and load only newly added rows, keeping column names and types unchanged. Add a static partition whose field is etl_date, of type String, with the value set to the day before the current date (partition value format yyyyMMdd). Run show partitions ods.order_detail in the hive cli.
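All four sub-tasks use the same static partition value: the day before the current date, formatted as yyyyMMdd. A minimal sketch of that date calculation using java.time, as an alternative to the SimpleDateFormat/Calendar approach in the code below (object and method names here are illustrative):

import java.time.LocalDate
import java.time.format.DateTimeFormatter

object EtlDate {
  // Yesterday's date as a yyyyMMdd string, e.g. "20241216" when run on 2024-12-17.
  def yesterday(): String =
    LocalDate.now().minusDays(1).format(DateTimeFormatter.ofPattern("yyyyMMdd"))
}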
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Calendar
object prac1 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    Logger.getLogger("org").setLevel(Level.ERROR)
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("task1")
    val spark = SparkSession.builder().config(sparkConf)
      .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
      .config("spark.sql.shuffle.partitions", "1500")
      .config("dfs.replication", "3")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("hive.exec.max.dynamic.partitions", "3000")
      .enableHiveSupport()
      .getOrCreate()
    val tables = Array("customer_inf", "product_info", "order_master", "order_detail")
    tables.foreach(x => {
      println(s"=================== Extracting table: $x =====================")
      val mysql = new Tools().loadMysql(spark, x).cache()
      mysql.show()
      println("==================== Reading the incremental watermark =====================")
      // Static partition value: the day before the current date, formatted as yyyyMMdd.
      val simple = new SimpleDateFormat("yyyyMMdd")
      val calendar = Calendar.getInstance()
      calendar.add(Calendar.DATE, -1)
      val etl_date = simple.format(calendar.getTime)
      // Incremental watermark: the max modified_time already present in the ODS table.
      // If the ODS table is empty, fall back to the epoch so that every row is loaded.
      var incre: Any = spark.table(s"ods.$x").select(max("modified_time")).collect()(0).get(0)
      if (incre == null) {
        incre = Timestamp.valueOf("1970-01-01 00:00:00")
      }
      println(s"====================== Final incre: $incre =======================")
      // Keep only rows newer than the watermark and append them into the etl_date partition.
      mysql.where(col("modified_time") > incre)
        .withColumn("etl_date", lit(etl_date).cast("string"))
        .write.format("hive").mode("append").partitionBy("etl_date").saveAsTable(s"ods.$x")
    })
    spark.close()
  }
}
class Tools {
  // Read a full table from the ds_db01 MySQL database over JDBC.
  def loadMysql(spark: SparkSession, table: String): DataFrame = {
    val prop = Map(
      "url" -> "jdbc:mysql://node1/ds_db01?useSSL=false",
      "driver" -> "com.mysql.jdbc.Driver",
      "user" -> "root",
      "password" -> "8888",
      "dbtable" -> table
    )
    spark.read.format("jdbc")
      .options(prop)
      .load()
  }
}
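Both task2 and the metric job further down build their session with new Tools().createSpark(), a helper the post does not show. A minimal sketch of such a helper, shown as its own class here to keep the example self-contained (in the post it is presumably a second method on Tools), assuming it mirrors the configuration used in prac1 above:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

class SparkTools {
  // Sketch only: assumed to reuse the session settings from prac1; not confirmed by the post.
  def createSpark(): SparkSession = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("task")
    SparkSession.builder().config(conf)
      .config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()
  }
}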
Data Cleaning:
1. Extract yesterday's partition (the partition produced in Task 1) of the customer_inf table in the ods database, combine it with the existing data in the latest partition of dim_customer_inf, and merge the data on customer_id into the partitioned table dim_customer_inf in the dwd database (merging means inserting into or updating the dwd layer; rows that need updating are matched on customer_id and the latest row by modified_time is kept). The partition field is etl_date and its value equals the value in the corresponding ods table. Add the four columns dwd_insert_user, dwd_insert_time, dwd_modify_user and dwd_modify_time, with dwd_insert_user and dwd_modify_user both set to "user1". If a record enters the dwd layer for the first time, store the current operation time in both dwd_insert_time and dwd_modify_time, converting data types as needed. If a record is modified during the merge, keep dwd_insert_time unchanged, store the current operation time in dwd_modify_time, and take the latest values for the remaining columns. Run show partitions dwd.dim_customer_inf in the hive cli;
2. Extract yesterday's partition (the partition produced in Task 1) of the product_info table in the ods database, combine it with the existing data in the latest partition of dim_product_info, and merge the data on product_id into the partitioned table dim_product_info in the dwd database (merging means inserting into or updating the dwd layer; rows that need updating are matched on product_id and the latest row by modified_time is kept). The partition field is etl_date and its value equals the value in the corresponding ods table. Add the four columns dwd_insert_user, dwd_insert_time, dwd_modify_user and dwd_modify_time, with dwd_insert_user and dwd_modify_user both set to "user1". If a record enters the dwd layer for the first time, store the current operation time in both dwd_insert_time and dwd_modify_time, converting data types as needed. If a record is modified during the merge, keep dwd_insert_time unchanged, store the current operation time in dwd_modify_time, and take the latest values for the remaining columns. Run show partitions dwd.dim_product_info in the hive cli;
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import java.text.SimpleDateFormat
import java.util.Calendar
object task2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = new Tools().createSpark()
    // Pair each ODS table with its DWD dimension table.
    val tables = Array("customer_inf" -> "dim_customer_inf", "product_info" -> "dim_product_info")
    tables.foreach { case (odsTable, dwdTable) =>
      merge(spark, odsTable, dwdTable)
    }
    spark.close()
  }
  def merge(spark: SparkSession, table: String, dwdtable: String): Unit = {
    val calendar = Calendar.getInstance()
    val nowtime: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(calendar.getTime)
    calendar.add(Calendar.DATE, -1)
    val etl_date = new SimpleDateFormat("yyyyMMdd").format(calendar.getTime)
    // Yesterday's ODS partition, stamped with the four dwd_* audit columns.
    val odsFrame: DataFrame = spark.table(s"ods.$table")
      .where(s"etl_date='$etl_date'")
      .drop("etl_date")
      .withColumn("dwd_insert_user", lit("user1"))
      .withColumn("dwd_insert_time", lit(nowtime).cast("timestamp"))
      .withColumn("dwd_modify_user", lit("user1"))
      .withColumn("dwd_modify_time", lit(nowtime).cast("timestamp"))
      .cache()
    // Existing data in the DWD dimension table.
    val dwdFrame: DataFrame = spark.table(s"dwd.$dwdtable").drop("etl_date").cache()
    // Only the merge key differs between the two tables.
    val key = if (table == "customer_inf") "customer_id" else "product_id"
    val mergeFrame: Dataset[Row] = odsFrame.unionByName(dwdFrame)
    val result: DataFrame = mergeFrame
      // Keep only the latest record per key, ordered by modified_time ...
      .withColumn("sequence", row_number().over(Window.partitionBy(key).orderBy(desc("modified_time"))))
      // ... while preserving the earliest dwd_insert_time, i.e. when the record first entered dwd.
      .withColumn("dwd_insert_time", min("dwd_insert_time").over(Window.partitionBy(key)))
      .where("sequence = 1")
      .drop("sequence")
      .withColumn("etl_date", lit(etl_date))
    // The merged result is only displayed here; see the write-back sketch after this block.
    result.show()
  }
}
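task2 stops at result.show(), but the cleaning task also requires landing the merged data in the dwd partition tables. A minimal sketch of that write-back, mirroring the Hive write used in task 1 (the object and method names are illustrative, and appending a fresh snapshot partition is one possible design choice, not taken from the post):

import org.apache.spark.sql.DataFrame

object DwdWriter {
  // Sketch only: persist the merged snapshot built in merge() into its dwd partition
  // table; the result DataFrame already carries the etl_date column added in merge().
  def writeDwd(result: DataFrame, dwdtable: String): Unit = {
    result.write
      .format("hive")
      .mode("append")
      .partitionBy("etl_date")
      .saveAsTable(s"dwd.$dwdtable")
  }
}

Inside merge(), result.show() could be followed by DwdWriter.writeDwd(result, dwdtable), after which show partitions dwd.dim_customer_inf and show partitions dwd.dim_product_info should list the new etl_date partition.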
Metric Computation:
1. Write Scala code that, based on the dwd order table, computes the payment conversion rate per province for orders placed in 2022 and writes the result to a table in the ds_result database in ClickHouse. In the Linux clickhouse command line, query the top three provinces by conversion rate using the ranking field, and paste a screenshot of the SQL statement and its result under the corresponding task number in 【Release\模块D提交结果.docx】 on the client desktop;
Payment conversion rate = completed orders / total orders * 100%
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, lit, row_number}
object task3 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = new Tools().createSpark()
    // Task 1: per-province payment conversion rate for orders placed in 2022.
    // Total orders placed in 2022, per province.
    spark.sql(
      """
        |select
        |  province,
        |  count(order_sn) as totalCount
        |from(
        |  select
        |    order_sn,province
        |  from dwd.fact_order_master
        |  where order_status="已下单"
        |  and cast(substring(create_time,1,4) as string)="2022"
        |)
        |group by province
        |""".stripMargin).createTempView("totalOrder")
    // Completed orders placed in 2022 (placed and never refunded), per province.
    spark.sql(
      """
        |select
        |  province,
        |  count(order_sn) as finishCount
        |from(
        |  select
        |    order_sn,province
        |  from dwd.fact_order_master
        |  where order_sn not in(
        |    select order_sn
        |    from dwd.fact_order_master
        |    where order_status="已退款"
        |  )
        |  and order_status="已下单"
        |  and cast(substring(create_time,1,4) as string)="2022"
        |)
        |group by province
        |""".stripMargin)
      .createTempView("finishOrder")
    spark.table("finishOrder").show()
    println("====================================")
    spark.table("totalOrder").show()
    // Join the two views and compute the conversion rate finishCount / totalCount.
    val result = spark.sql(
      """
        |select
        |  t.province as province,
        |  t.totalCount as totalCount,
        |  f.finishCount as finalCount,
        |  round(f.finishCount/t.totalCount,3) as ranking
        |from totalOrder t join finishOrder f
        |on t.province=f.province
        |""".stripMargin)
    // Rank provinces by conversion rate: the rate column is replaced by a row_number
    // ordered by the rate descending (rank 1 = highest conversion rate).
    val frame: DataFrame = result.withColumn("grp", lit("grp"))
      .withColumn("ranking", row_number().over(Window.partitionBy("grp").orderBy(desc("ranking"))))
      .drop("grp")
    frame.show()
    // Write the result to ClickHouse over JDBC.
    val prop = Map(
      "url" -> "jdbc:clickhouse://node1:8123/test1",
      "driver" -> "ru.yandex.clickhouse.ClickHouseDriver",
      "user" -> "default",
      "password" -> "8888",
      "dbtable" -> "paytran"
    )
    frame.write
      .format("jdbc")
      .mode("append")
      .options(prop)
      .save()
    spark.close()
  }
}
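The task also calls for a top-3-by-ranking query in the ClickHouse command line. As a quick sanity check from Spark, the written table can be read back over the same JDBC connection and ordered by ranking (connection values copied from the write above; the equivalent clickhouse-client statement is shown in the comment and should be run against whichever database the result actually lands in):

import org.apache.spark.sql.SparkSession

object VerifyPaytran {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("verify").getOrCreate()
    // Equivalent check in clickhouse-client: SELECT * FROM paytran ORDER BY ranking LIMIT 3;
    spark.read.format("jdbc")
      .options(Map(
        "url" -> "jdbc:clickhouse://node1:8123/test1",
        "driver" -> "ru.yandex.clickhouse.ClickHouseDriver",
        "user" -> "default",
        "password" -> "8888",
        "dbtable" -> "paytran"
      ))
      .load()
      .orderBy("ranking") // ranking 1 = highest conversion rate
      .limit(3)
      .show()
    spark.stop()
  }
}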