首页 > 其他分享 >如何在Spark中使用动态数据转置

如何在Spark中使用动态数据转置

时间:2023-06-08 20:08:16浏览次数:61  
标签:map val idoc 转置 df 火花 Spark 动态数据 lkp


Dynamic Transpose是Spark中的一个关键转换,因为它需要大量的迭代。本文将为您提供有关如何使用内存中运算符处理此复杂方案的清晰概念。

首先,让我们看看我们拥有的源数据: 

idoc_number,订单ID,idoc_qualifier_org,idoc_org
7738,2364,6,0
7738,2364,7,0
7738,2364,8,mystr1
7738,2364,12,mystr2
7739,2365,12,mystr3
7739,2365,7,mystr4

我们还有idoc_qualifier_org 源数据记录中列的查找表  。由于查找表的大小会更小,我们可以预期它会在缓存中和驱动程序内存中。

预选赛,降序
6,司
7,分销渠道
8,销售组织
12,订单类型

Dynamic Transpose操作的预期输出是:

idoc_number,order_id,Division,Distribution Channel,Sales org,Order Type
7738,2364,0,0,mystr1,mystr2
7739,2365,空,mystr3,空,mystr4

以下代码实际上将根据数据中的当前列转置数据。此代码是使用Spark中的Transpose Data的另一种方法。此代码严格使用Spark的复杂数据类型,并且还负责迭代的性能。 

对象 DynamicTranspose {
def  dataValidator(map_val:Seq [ Map [ String,String ]],rule:String):String  = {
尝试 {
val  rule_array  =  规则。拆分(“#!”)。toList
val  src_map  =  map_val。toList。压扁。toMap
var  output_str  =  “”
rule_array。foreach(f  =>
output_str  =  output_str  +  “!”  +  src_map。getOrElse(f,“#”)


 


return  output_str。掉落(1)
} catch {
案例 t:
Throwable  =>  t。printStackTrace()。toString()
返回 “0”。toString()
}


 


}


 


def  main(args:Array [ String ]):Unit  = {


 


val  spark  =  SparkSession。builder()。主人(“本地[*]”)。config(“spark.sql.warehouse.dir”,“<src dir>”)。getOrCreate()
val  data_df  =  spark。读。选项(“标题”,“真”)。csv(“<data path src>”)
val  lkp_df  =  spark。读。选项(“标题”,“真”)。csv(“查找路径源>”)
进口 火花。暗示。_
进口 组织。阿帕奇。火花。sql。功能。广播


 


val  lkp_df_brdcast  =  broadcast(lkp_df)
val  result_df  =  data_df。加入(广播(lkp_df_brdcast),$  “idoc_qualifier_org”  ===  $  “限定符”,“内部”)


 


val  df1  =  result_df。groupBy(col(“idoc_number”),col(“orderid”))。agg(collect_list(map($  “desc”,$  “idoc_org”))as  “map”)
进口 组织。阿帕奇。火花。sql。功能。UDF
进口 组织。阿帕奇。火花。sql。功能。{
点燃,
最大,
ROW_NUMBER
}
进口 火花。暗示。_
进口 组织。阿帕奇。火花。sql。行
val  map_val  =  lkp_df。rdd。地图(行 =>  行。的getString(1))。收集()。mkString(“#!”)
火花。sparkContext。广播(map_val)
VAL  recdValidator  =  UDF(dataValidator  _)
var  latest_df  =  df1。withColumn(“explode_out”,split(recdValidator(df1(“map”),lit(map_val)),“!”))。掉落(“地图”)
val  columns  =  map_val。拆分(“#!”)。toList
latest_df  =  列。zipWithIndex。foldLeft(latest_df){
(memodDF,专栏)=> {
memodDF。withColumn(柱。_1,山口(“explode_out” )(柱。_2))
}
}
。drop(“explode_out”)
latest_df。show()
}


 


}

希望这可以帮助!

标签:map,val,idoc,转置,df,火花,Spark,动态数据,lkp
From: https://blog.51cto.com/u_16145034/6442600

相关文章

  • LA_矩阵运算的性质@方阵取行列式@取伴随@取逆@转置
    可逆矩阵@矩阵的逆......
  • SparkUI中的Peak Pool Memory Direct / Mapped (直接缓冲池和映射缓冲池)
      PeakPoolMemoryDirect/Mapped --直接缓冲池和映射缓冲池峰值内存##什么是直接缓冲池和映射缓冲池?在Java中,有两种类型的缓冲池:直接缓冲池和映射缓冲池。直接缓冲池1)从堆外内存分配,不受JVM管理2)占用内存较多3)相比从JVM复制数据到本地,性能更高 映射缓冲池1)将文......
  • spark on yarn 读取hdfs文件报错
    前提读取的文件已经put到hdfs上了,还是报错,仔细想想,为什么两个读取文件只报后面那个读取文件不存在呢?看代码,是读取的方式不同,前面一个是通过sparkcontext读取,后面是file,所以情况应该是只有通过spark生成的对象sc读取才可以,带着这个思路,修改代码,才运行成功。JavaRDD<String>linesR......
  • Spark基础
    Spark基础目录Spark基础1Spark简介1.1Spark介绍1.2Spark组件1.3MapReduce回顾1.4SparkVSMapReduce1.5Spark部署方式2Spark环境搭建3Spark核心3.1Spark框架3.2RDD的五大特征4Spark常用算子4.1代码结构4.2算子分类4.3算子讲解4.3.1基于value类型的Transformation......
  • docker安装spark
    curl-L"https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname-s)-$(uname-m)"-o/usr/local/bin/docker-composechmod+x/usr/local/bin/docker-composedocker-compose--versiondockerpullsingularities/sparkvidocke......
  • Apache Spark源码走读之1 -- Spark论文阅读笔记
    楔子源码阅读是一件非常容易的事,也是一件非常难的事。容易的是代码就在那里,一打开就可以看到。难的是要通过代码明白作者当初为什么要这样设计,设计之初要解决的主要问题是什么。在对Spark的源码进行具体的走读之前,如果想要快速对Spark的有一个整体性的认识,阅读MateiZaharia做的Spa......
  • 【HarmonyOS】【ArkTS】如何使用HTTP网络请求获取动态数据刷新UI界面
    【关键字】HttpRequest、ArkTS、网络数据请求、@ohos.net.http【前言】在使用ArkTS开发HarmonyOS应用时,需要调用HTTP网络请求 @ohos.net.http 动态获取数据,进行UI列表刷新,这想必是应用开发最常见的功能。但是根据官网网络请求的示例代码进行功能开发时,封装方法进行HTTP请求后,返回......
  • 【HarmonyOS】【ArkTS】如何使用HTTP网络请求获取动态数据刷新UI界面
    ​【关键字】HttpRequest、ArkTS、网络数据请求、@ohos.net.http 【前言】在使用ArkTS开发HarmonyOS应用时,需要调用HTTP网络请求 @ohos.net.http 动态获取数据,进行UI列表刷新,这想必是应用开发最常见的功能。但是根据官网网络请求的示例代码进行功能开发时,封装方法进行HTTP......
  • RDS 、HDFS、 mapreduce 、spark 、hive、 hbase 、zookeeper 、kafka 、flume、mysql
    这些技术是大数据领域的常用组件,它们之间的配置文件依赖关系如下:RDS是一种关系型数据库,可以独立安装和使用,不需要依赖其他组件。HDFS是Hadoop分布式文件系统,通常与MapReduce一起使用。在Hadoop集群中,HDFS需要配置core-site.xml和hdfs-site.xml两个文件,其中core-site......
  • Spark消费Kafka
    0.前言之前先写了处理数据的spark,用文件读写测了一批数据,能跑出结果;今天调通了Kafka,拼在一起,没有半点输出,查了半天,发现是之前的处理部分出了问题,把一个不等号打成了等号,把数据全filter没了。很恐怖,我保证这段时间我没动过这段代码,但上次真的跑出东西了啊(尖叫1.配置流程主节点......