首页 > 编程语言 >Spark - [04] RDD编程

Spark - [04] RDD编程

时间:2024-05-12 22:41:58浏览次数:22  
标签:parallelize 04 scala Int rdd RDD Spark Array

题记部分

 

一、RDD编程模型

  在 Spark 中,RDD 被表示为对象,通过对象上的方法调用来对 RDD 进行转换。经过一系列的 transformations 定义 RDD 之后,就可以调用 actions 触发 RDD的计算,action 可以是向应用程序返回结果(count,collect等),或者是向存储系统保存数据(saveAsTextFile等)。在 Spark 中,只有遇到 action,才会执行 RDD 的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
  要使用 Spark,开发者需要编写一个 Driver 程序,它被提交到集群以调度运行 Worker。

 

 

二、RDD的创建

在 Spark 中创建 RDD 的创建方式可以分为三种:

  • 从集合中创建 RDD
  • 从外部存储创建 RDD
  • 从其他 RDD 创建

(1)从集合中创建RDD

# 使用 parallelize( ) 从集合创建
scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

# 使用 makeRDD( ) 从集合创建
scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

 

(2)从外部存储创建RDD

包括本地的文件系统,还有所有 Hadoop 支持的数据集,比如 HDFS、Cassandra、HBase 等。

scala> var rdd2 = sc.textFile("hdfs://host:9000/RELEASE")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs://host:9000/RELEASE
MapPartitionsRDD[4] at textFile at <console>:24

 

(3)从其他RDD创建

通过 transformation 算子返回 value 类型的RDD 或者 KV 类型的RDD

 

 

三、RDD的转换

3.1、Value类型

 

(1)map( )

作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成。

理解:可以对接收到的每一条数据进行处理

// 创建
scala> val source = sc.parallelize(1 to 10)
source:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[8] at parallelize at <console>:24
// 打印
scala> source.collect()
res7:Array[Int]=Array(1,2,3,4,5,6,7,8,9,10)
// 将所有元素 *2 
scala> val mapadd = source.map(_*2)
mapadd:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[9] at map at <console>: 26
// 打印最终结果
scala> mapadd.collect()
res8:Array[Int]=Array(2,4,6,8,10,12,14,16,18,20)

 

(2)mapPartitions( )

作用:类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U]。

假设有N个元素,有M个分区,那么map 的函数将被调用N次,而mapPartitions 被调用M次,一个函数一次处理所有分区。

// 创建RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[4] at parallelize at <console>:24
// 每个元素*2组成新的RDD
scala> rdd.mapPartitions(x=>x.map(_*2))
res3:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD [6] at mapPartitions at <console>:27
// 打印新的RDD
scala> res3.collect
res4: Array[Int]=Array(2,4,6,8)

 

map( ) 和 mapPartitions( ) 区别

map():每次处理一条数据

mapPartitions():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致 OOM

开发指导:当内存空间较大的时候建议使用 mapPartition( ),以提高处理效率。

 

(3)mapPartitionsWithIndex( )

作用:类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD上运行时,func的函数类型必须是(Int,Iterator[T]) => Iterator[U]

// 创建RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[4] at parallelize at <console>:24
// 每个元素跟所在分区形成一个元组组成一个新的RDD
scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map(index,_)))
indexRdd:org.apache.spark.rdd.RDD[(Int,Int)]=MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:26

scala>indexRdd.collect
res2:Array[(Int,Int)]=Array((0,1),(0,2),(1,3),(1,4))

 

(4)flatMap( )

作用:类似于 map,但是每一个输入元素可以被映射为0或者多个输出元素(所以func 应该返回一个序列,而不是单一元素)

需求:创建一个元素为1-5的RDD,运用 flatMap 创建一个新的RDD,新的RDD为原 RDD 的每个元素的2倍(2,4,6,8,10)

// 创建RDD
scala> val sourceFlat = sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24

scala>sourceFlat.collect()
res11: Array[Int] = Array(1,2,3,4,5)

scala> val flatMap = sourceFlat.flatMap(1 to _)
flatMap:org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>: 26

scala> flatMap.collect()
res12: Array[Int]=Array(1,1,2,1,2,3,1,2,3,4,1,2,3,4,5)

 

 

 

 

 

3.2、双Value类型交互

 

3.3、key-value类型

 

 

 

 

 

 

 

 

— 要养成终身学习的习惯 —

标签:parallelize,04,scala,Int,rdd,RDD,Spark,Array
From: https://www.cnblogs.com/houhuilinblogs/p/18188343

相关文章

  • Spark - [03] RDD概述
    RDD,分布式数据集,是Spark中最基本的数据抽象。 一、什么是RDDRDD(ResilientDistributedDataset)叫做分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。  二、RDD的属性①一组分区(Partition),即数据......
  • hdu2049递归问题
    解法:从N中选出M个C[n][m],然后乘上错排公式;f[n]=(n-1)*(f[n-1]+f[n-2]);f[0]=0;f[1]=1;importjava.util.Scanner;publicclasshdu2049{publicstaticintC(inta,intb){if(a==b){return1;}elseif(b==1){returna......
  • hdu2048递归问题
    题目思路:其实我还真没怎么看出来这个是递推(嘤嘤自己好菜哇)……不过很清楚的是我们需要求出每个人拿到的都不是自己的牌子的情况有几种,按照日常经验,如果前n个人已经做到了错排(也就是拿的都不是自己的牌子),当第n+1个人来的时候,他跟任意一个人交换后就能做到这n+1个人都实现错排,!!但是......
  • 在 Kubernetes 上运行 Apache Spark 进行大规模数据处理的实践
    在刚刚结束的KubernetesCommunityDay上海站,亚马逊云科技在云原生分论坛分享的“在Kunernets上运行ApacheSpark进行大规模数据处理实践”引起了现场参与者的关注。开发者告诉我们,为了充分利用Kubernetes的高可用设计、弹性,在越来越多的应用场景中,他们选择从Yarn迁移到......
  • hdu2045 递归水题
    这道题的解法就是站在涂色后的最后一块,思考前一块是怎么涂色的就可以了,比如如果最后一块的前一块是与第一块颜色不同的情况,则最后一块只有一种颜色可以涂,其方法的数目等于f(n-1),而当最后一块前面一块的颜色与第一块相同时,则倒数第三块一定与第一块的颜色不同,则涂到倒数第三块有f(n-......
  • 软件测评笔记04--操作系统
    编译原理高级语言源程序中的错误分为两类:语法错误和语义错误,其中语义错误可分为静态语义和动态语义错误语法错误:语言结构上的错误静态语义错误:编译时能发现的程序含义上的错误动态语义错误:只有程序运行时才能表现出来 程序编译过程过程:词法分析、语法分析、语义分析词法......
  • m2_day04 [线程]
    课程内容:线程的概念引用多线程的原因?如何实现线程?如何控制线程?线程类其它常用方法线程的概念线程所在包:java.lang.Thread理解程序进程线程之间的区别:程序:保存在物理介质中的代码片段​进程:一旦程序运行起来就变成了操作系统当中的一个进程......
  • hive on spark配置
     hive-site.xml <?xmlversion="1.0"?><?xml-stylesheettype="text/xsl"href="configuration.xsl"?><configuration>  <!--配置Hive保存元数据信息所需的MySQLURL地址-->  <property>    <name>javax......
  • 升级Artalk评论至V2-8-3-解决Error-请求响应404
    记录升级Artalk评论至V2.8.3,并解决ArtalkErrorError:请求响应404,无法获取评论列表数据朗读全文Yourbrowserdoesnotsupporttheaudioelement.有什么用升级Artalk评论至V2.8.3解决ArtalkErrorError:请求响应404,无法获取评论列表数据心路历程Artalk配置的SMT......
  • docker-compose spark集群搭建
    需求满足产品数据团队计算相关需求,搭建spark集群,本集群为一主两从(两台设备)部署环境服务器资源docker-compose安装curl-Lhttps://github.com/docker/compose/releases/download/1.24.1/docker-compose-`uname-s`-`uname-m`-o/usr/local/bin/docker-compose......