首页 > 其他分享 >Spark(二)Spark Core(一)

Spark(二)Spark Core(一)

时间:2024-09-20 14:02:51浏览次数:8  
标签:返回 Core 元素 RDD func Spark 数据

RDD详解

  • 前提:MapReduce框架采用非循环式的数据流模型,把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销,且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象,因此出现了RDD这个概念

概念

  • RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合
  • Resilient:它是弹性的,RDD中的数据可以保存在内存或者磁盘里面
  • Distributed:它里面的元素是分布式存储的,可以用于分布式计算
  • Dataset: 它是一个集合,可以存放很多元素

并行计算能力:指Spark框架将大型数据集自动分割成多个小块,并将这些小块分散到多个计算节点上,每个节点可以独立地对其分配到的数据块进行处理和计算,从而实现计算任务的并行执行,类似于多个人同时完成不同的部分工作,最后将各自的结果汇总起来,以提高数据处理的速度和效率

属性

  • 分区列表:数据集的基本组成单位,对于RDD来说,每个分片都会被一个计算任务处理,分片数决定并行度,用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值
  • 计算函数:一个函数会被作用在每一个分区,Spark中RDD的计算是以分片为单位的,compute函数会被作用到每个分区上
  • 依赖关系:一个RDD会依赖于其他多个RDD,RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系,在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算,这也是Spark的容错机制
  • 分区函数:可选项,对于kv类型的RDD会有一个Partitioner,即RDD的分区函数,默认为HashPartitioner
  • 最佳位置:可选项,一个列表,存储存取每个分区的优先位置,对于一个HDFS文件来说,这个列表保存的就是每个分区所在的块的位置,按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能选择那些存有数据的worker节点来进行任务计算

RDD是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来,如何计算

RDD API

RDD的创建方式

(1)由外部存储系统的数据集创建,包括本地的文件系统,还有Hadoop支持的数据集,比例HDFS、Cassandra、HBase等

val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
  • val关键字用于声明一个不可变的变量rdd1,
  • sc是一个SparkContext对象的实例,它是与Spark集群交互的主要入口,通过SparkContext,可以创建RDD、执行作业、获取集群的信息等
  • .textFile()是SparkContext对象的一个方法,用于读取存储在给定URI(统一资源标识符)的文本文件,这里的URI是一个HDFS的路径,指向了文件words.txt的位置
  • 这段代码执行结束后,rdd1变量就会指向一个包含文件words.txt中所有行的RDD,每个元素都是文件中的一行文本,可以被用于后续的转换(map、filter、reduceByKey等)和行动(count、collect、saveAsTextFile等)操作

(2)通过已有的RDD经过算子转换生成新的RDD

val rdd2=rdd1.flatMap(_.split(" "))
  • .flatMap():RDD的转换操作,它将函数应用于RDD的每个元素,然后将结果扁平化为一个新的RDD,这里传递给flatMap的是一个匿名函数 _.split(" ")
  • ** _.split(" ")**:这个函数对RDD中的每个元素应用split方法,以空格为分隔符将行分割成单词数组,flatMap将这些数组扁平化成一个单一的RDD,_在scala语言中是一个通配符,表示匿名函数的参数,这里指rdd1中的每一行文本

(3)由一个已经存在的Scala集合创建

val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
  • parallelize方法通常用于将本地数据集快速转换为RDD,以便于在Spark中进行并行计算

RDD算子

注意

  • RDD不实际存储真正要计算的数据,而是记录了数据的位置在哪里,数据的转换关系(调用了什么方法,传入什么函数)
  • RDD中的所有转换都是惰性求值/延迟执行的,也就是说并不会直接计算,只有当发生一个要求返回结果给Driver的Action动作时,这些转换才会真正运行
  • 之所以使用惰性求值/延迟执行,是因为这样可以在Action时对RDD操作形成DAG有向无环图进行Stage的划分和并行优化,这种设计让Spark更加有效率地运行

(1)Transformation转换操作:返回一个新的RDD

转换算子 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素,所以func应该返回一个序列,而不是单一元素
mapPartitions(func) 类似于map,但独立地在RDD的每一个分区上运行,因此在类型为T的RDD上运行时,返回一个类型为U的RDD
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD,不会去除重复元素
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
coalesce(numPartitions) 减少RDD的分区数到指定值,在过滤大量数据之后,可以执行此操作
repartition(numPartitions) 重新给RDD分区

(2)Action动作操作:返回值不是RDD(无返回值或返回其他的)

动作算子 含义
reduce(func) 通过fun 函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前n个元素
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是HDFS或者其他Hadoop支持的文件系统
saveAsObjectFile(path) 将数据集的元素,以Java序列化的方式保存到指定的目录下
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数
foreach(func) 在数据集的每一个元素上,运行函数 func 进行更新
foreachPartition(func) 在数据集的每一个分区上,运行函数func

统计操作

算子 含义
count 个数
mean 均值
sum 求和
max 最大值
min 最小值
variance 方差
sampleVariance 从采样中计算方差
stdev 标准差:衡量数据的离散程度
sampleStdev 采样的标准差
stats 查看统计结果

标签:返回,Core,元素,RDD,func,Spark,数据
From: https://www.cnblogs.com/shihongpin/p/18422132

相关文章

  • linux 下安装 RabbitMq 及 .net core 实操多种模式
    当前系统DebianGNU/Linux12安装命令1、sudoaptupdate//更新系统2、sudoapt-getinstallrabbitmq-server//安装rabbitMq服务3、sudoservicerabbitmq-serverstart//启动rabbitMq4、sudosystemctlenablerabbitmq-server//设置......
  • NetCore Channel-生产者&消费者
    usingSystem.Threading.Channels;namespaceChannelDemo{publicclassChannelMgr{//优势//允许开发者根据需要创建具有固定容量(有界)或无限容量(无界)的通道//staticChannel<string>channel=Channel.CreateBounded<strin......
  • Spark(一)概述
    基本概念Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎SparkvsHadoopSpark和Hadoop的根本差异是多个作业之间的数据通信问题:Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘HadoopSpark类型分布式基础平台,包含计算,存储,调度分......
  • C#|.net core 基础 - 扩展数组添加删除性能最好的方法
    今天在编码的时候遇到了一个问题,需要对数组变量添加新元素和删除元素,因为数组是固定大小的,因此对新增和删除并不友好,但有时候又会用到,因此想针对数组封装两个扩展方法:新增元素与删除元素,并能到达以下三个目标:1、性能优异;2、兼容性好;3、方便使用;这三个目标最麻烦的应该就是性......
  • 如何在 ASP.NET Core Web API 方法执行前后 “偷偷“ 作一些 “坏“ 事?初识 ActionFil
    前言:什么是ActionFilterAttribute?ActionFilterAttribute是一种作用于控制器Action方法的特性(Attribute),通过它,你可以在操作执行前后、异常处理时等不同的阶段插入自定义逻辑。比如在执行操作方法之前修改请求参数、记录日志、进行权限验证等操作,在执行操作方法之后发送邮件......
  • Framebuffer core
    Framebuffercore,在Linux系统中,主要指的是与Framebuffer设备驱动相关的核心代码和功能。Framebuffer是Linux内核为显示设备提供的一套应用程序接口(API),它抽象了显示硬件的底层差异,使得开发者可以通过操作内存中的帧缓冲区来间接控制显示设备,从而实现图形的显示和渲染。Framebuffer......
  • .net core使用RabbitMQ
    目录1.基本概念2.环境搭建3.使用 RabbitMQ是一个可靠且成熟的消息传递和流代理,它很容易部署在云环境、内部部署和本地机器上。它目前被全世界数百万人使用。1.基本概念生产者(Producer)生产者是一个发送消息的程序。发送消息的程序可以是任何语言编写的,只要它能够......
  • ASP.NET Core中如何对不同类型的用户进行区别限流
    老板提出了一个新需求,从某某天起,免费用户每天只能查询100次,收费用户100W次。这是一个限流问题,聪明的你也一定想到了如何去做:记录用户每一天的查询次数,然后根据当前用户的类型使用不同的数字做比较,超过指定的数字就返回错误。嗯,原理就是这么简单。不过真正写起来还要考虑更多问题......
  • 易优eyoucms网站报错 \core\library\think\App.php Fatal error: Call to undefin
    当你遇到 Fatalerror:Calltoundefinedfunctionthink\switch_citysite() 这样的错误时,说明在代码中调用了一个未定义的函数 think\switch_citysite()。这种情况通常是因为函数没有被正确地引入或者该函数根本不存在于当前的代码库中。解决方案确认函数的存在检查 s......
  • 易优eyoucms网站详情页报错报错 \core\library\think\Loader.php 类不存在:app\co
    类不存在:app\common\model\Pic,这个错误表明PHP无法找到类 app\common\model\Pic。这通常是因为类文件未被正确加载或命名空间配置不正确导致的。以下是一些可能的解决步骤:1.确认类文件路径确保类文件 Pic 的路径正确并且文件存在。检查文件路径确认 app\common\model......