Dynamic Transpose是Spark中的一个关键转换,因为它需要大量的迭代。本文将为您提供有关如何使用内存中运算符处理此复杂方案的清晰概念。
首先,让我们看看我们拥有的源数据:
idoc_number,订单ID,idoc_qualifier_org,idoc_org
7738,2364,6,0
7738,2364,7,0
7738,2364,8,mystr1
7738,2364,12,mystr2
7739,2365,12,mystr3
7739,2365,7,mystr4
我们还有idoc_qualifier_org
源数据记录中列的查找表 。由于查找表的大小会更小,我们可以预期它会在缓存中和驱动程序内存中。
预选赛,降序
6,司
7,分销渠道
8,销售组织
12,订单类型
Dynamic Transpose操作的预期输出是:
idoc_number,order_id,Division,Distribution Channel,Sales org,Order Type
7738,2364,0,0,mystr1,mystr2
7739,2365,空,mystr3,空,mystr4
以下代码实际上将根据数据中的当前列转置数据。此代码是使用Spark中的Transpose Data的另一种方法。此代码严格使用Spark的复杂数据类型,并且还负责迭代的性能。
对象 DynamicTranspose {
def dataValidator(map_val:Seq [ Map [ String,String ]],rule:String):String = {
尝试 {
val rule_array = 规则。拆分(“#!”)。toList
val src_map = map_val。toList。压扁。toMap
var output_str = “”
rule_array。foreach(f =>
output_str = output_str + “!” + src_map。getOrElse(f,“#”)
)
return output_str。掉落(1)
} catch {
案例 t:
Throwable => t。printStackTrace()。toString()
返回 “0”。toString()
}
}
def main(args:Array [ String ]):Unit = {
val spark = SparkSession。builder()。主人(“本地[*]”)。config(“spark.sql.warehouse.dir”,“<src dir>”)。getOrCreate()
val data_df = spark。读。选项(“标题”,“真”)。csv(“<data path src>”)
val lkp_df = spark。读。选项(“标题”,“真”)。csv(“查找路径源>”)
进口 火花。暗示。_
进口 组织。阿帕奇。火花。sql。功能。广播
val lkp_df_brdcast = broadcast(lkp_df)
val result_df = data_df。加入(广播(lkp_df_brdcast),$ “idoc_qualifier_org” === $ “限定符”,“内部”)
val df1 = result_df。groupBy(col(“idoc_number”),col(“orderid”))。agg(collect_list(map($ “desc”,$ “idoc_org”))as “map”)
进口 组织。阿帕奇。火花。sql。功能。UDF
进口 组织。阿帕奇。火花。sql。功能。{
点燃,
最大,
ROW_NUMBER
}
进口 火花。暗示。_
进口 组织。阿帕奇。火花。sql。行
val map_val = lkp_df。rdd。地图(行 => 行。的getString(1))。收集()。mkString(“#!”)
火花。sparkContext。广播(map_val)
VAL recdValidator = UDF(dataValidator _)
var latest_df = df1。withColumn(“explode_out”,split(recdValidator(df1(“map”),lit(map_val)),“!”))。掉落(“地图”)
val columns = map_val。拆分(“#!”)。toList
latest_df = 列。zipWithIndex。foldLeft(latest_df){
(memodDF,专栏)=> {
memodDF。withColumn(柱。_1,山口(“explode_out” )(柱。_2))
}
}
。drop(“explode_out”)
latest_df。show()
}
}
希望这可以帮助!
标签:map,val,idoc,转置,df,火花,Spark,动态数据,lkp From: https://blog.51cto.com/u_16145034/6442600