今日完成了商单案例。
源码:
# coding:utf8
# 标签: province, count, df, 31, top3, 笔记, receivable, 进度, storeProvince
# From: https://www.cnblogs.com/yuncannotjava/p/17999843
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
if __name__ == '__main__':
    # Build a Hive-enabled SparkSession against the cluster metastore.
    spark = SparkSession.builder.appName("test").master("local[*]"). \
        config("spark.sql.warehouse.dir", "hdfs://node1:8020/user/hive/warehouse"). \
        config("hive.metastore.uris", "thrift://node1:9083"). \
        getOrCreate()
    sc = spark.sparkContext

    # Order data. Columns (from the author's glossary):
    #   storeProvince - province the store is located in
    #   storeID       - store id
    #   dateTS        - order timestamp (epoch milliseconds as a string) -- TODO confirm unit
    #   receivable    - amount received for the order
    #   payType       - payment type
    # Drop rows with a null province, rows where the province is the literal
    # string 'null', and outlier orders of 10000+.
    df = spark.read.format("json"). \
        load("../../data/mini.json"). \
        dropna(thresh=1, subset=['storeProvince']). \
        filter("storeProvince != 'null'"). \
        filter("receivable < 10000"). \
        select("storeProvince", "storeID", "receivable", "dateTS", "payType")

    # TODO: Requirement 1: total sales per province.
    province_sale_df = df.groupBy("storeProvince"). \
        agg(
            F.round(F.sum("receivable"), 2).alias("receivable")
        ). \
        orderBy("receivable", ascending=False)
    province_sale_df.show(truncate=False)
    # province_sale_df.write.mode("overwrite"). \
    #     format("jdbc"). \
    #     option("url", "jdbc:mysql://node1:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"). \
    #     option("dbtable", "province_sale"). \
    #     option("user", "root"). \
    #     option("password", "1234"). \
    #     option("encoding", "utf8"). \
    #     save()
    #
    # province_sale_df.write.mode("overwrite").saveAsTable("default.province_sale", "parquet")

    # TODO: Requirement 2: among the top-3 provinces by sales, how many stores
    # ever reached a daily turnover above 1000.
    top3_province_df = province_sale_df.limit(3). \
        select("storeProvince"). \
        withColumnRenamed("storeProvince", "top_store")
    df_top3_province_joined = df.join(
        top3_province_df,
        on=df['storeProvince'] == top3_province_df['top_store'])
    df_top3_province_joined.show()
    # Reused by requirements 2-4, so cache it once.
    df_top3_province_joined.persist(StorageLevel.MEMORY_AND_DISK)

    # substr(1, 10) keeps the epoch-seconds prefix of the millisecond
    # timestamp (Column.substr is 1-based; the original used 0, which Spark
    # silently treats as 1).
    province_hot_store_count_df = df_top3_province_joined. \
        groupBy("storeProvince", "storeID",
                F.from_unixtime(F.col("dateTS").substr(1, 10), "yyyy-MM-dd").alias("day")). \
        sum("receivable").withColumnRenamed("sum(receivable)", "money"). \
        filter("money > 1000"). \
        groupBy("storeProvince"). \
        count(). \
        orderBy("count", ascending=False)
    # NOTE: the original assigned .show()'s return value (None) to the
    # variable; build the DataFrame first, then display it.
    province_hot_store_count_df.show()

    # TODO: Requirement 3: average order value per top-3 province.
    df_top3_province_joined.groupBy("storeProvince"). \
        avg("receivable"). \
        withColumnRenamed("avg(receivable)", "avg_money"). \
        withColumn("avg_money", F.round("avg_money", 2)). \
        orderBy("avg_money", ascending=False). \
        show()

    # TODO: Requirement 4: payment-type share within each top-3 province.
    def udf_func(percent):
        """Format a 0-1 ratio as a percent string, e.g. 0.1234 -> '12.34%'."""
        return str(round(percent * 100, 2)) + "%"

    my_udf = F.udf(udf_func, StringType())

    # Per-province total order count (the denominator); columns renamed so
    # the join below has no ambiguous names.
    top3_province_payType_count = df_top3_province_joined.groupBy("storeProvince").count(). \
        withColumnRenamed("storeProvince", "storeProvince_1"). \
        withColumnRenamed("count", "count_1")

    df4 = df_top3_province_joined.groupBy("storeProvince", "payType"). \
        count(). \
        join(top3_province_payType_count,
             on=top3_province_payType_count['storeProvince_1'] == F.col("storeProvince"))
    df4.withColumn("percent", df4['count'] / df4['count_1']). \
        withColumn("percent", my_udf("percent")). \
        select("storeProvince", "payType", "percent"). \
        show()

    # Release cluster resources.
    spark.stop()