首页 > 其他分享 >Spark(三)Spark Core(二)

Spark(三)Spark Core(二)

时间:2024-09-20 14:45:41浏览次数:8  
标签:Core 存储 缓存 分区 RDD 依赖 内存 Spark

RDD详解

RDD持久化/缓存

  • 某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存
val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //缓存/持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读取HDFS的文件,rdd2会真正执行持久化
rdd2.sortBy(_._2,false).collect//触发action,会去读缓存中的数据,执行速度会比之前快,因为rdd2已经持久化到内存中了
  • 创建了一个RDD,从HDFS上指定的路径"hdfs://node01:8020/words.txt"读取文本文件,每行文本都是RDD中的一个元素
  • flatMap(x=>x.split(" ")):将每行文本拆分成单词,使用空格作为分隔符
  • map((_,1)):对于每个单词,创建一个键值对,键是单词本身,值是1
  • reduceByKey(+):对每个键(单词)的值进行求和,这样每个单词的计数就计算出来了
  • sortBy(_._2,false):对RDD中的元素按照它们的值(即单词计数)进行排序,_._2指的是元组的第二个元素,false参数表示降序排序
  • collect:这是一个行动操作,它会触发实际的计算,并将结果作为数组返回给驱动程序

persist方法和cache方法

  • RDD通过persist或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用

存储级别

  • 默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的
持久化级别 说明
MORY_ONLY(默认) 将RDD以非序列化的Java对象存储在JVM中,如果没有足够的内存存储RDD,则某些分区将不会被缓存,每次需要时都会重新计算,这是默认级别
MORY_AND_DISK(开发中可以使用这个) 将RDD以非序列化的Java对象存储在JVM中,如果数据在内存中放不下,则溢写到磁盘上,需要时则会从磁盘上读取
MEMORY_ONLY_SER(Java and Scala) 将RDD以序列化的Java对象(每个分区一个字节数组)的方式存储,这通常比非序列化对象(deserialized objects)更具空间效率,特别是在使用快速序列化的情况下,但是这种方式读取数据会消耗更多的CPU
MEMORY_AND_DISK_SER(Java and Scala) 与MEMORY_ONLY_SER类似,但如果数据在内存中放不下,则溢写到磁盘上,而不是每次需要重新计算它们
DISK_ONLY 将RDD分区存储在磁盘上
MEMORY_ONLY_2, MEMORY_AND_DISK_2等 与上面的储存级别相同,只不过将持久化数据存为两份,备份每个分区存储在两个集群节点上
OFF_HEAP(实验中) 与MEMORY_ONLY_SER类似,但将数据存储在堆外内存中(即不是直接存储在JVM内存中)

RDD容错机制Checkpoint

  • Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用
SparkContext.setCheckpointDir("目录") //HDFS的目录

RDD.checkpoint
  • 开发中如何保证数据的安全性性及读取效率:可以对频繁使用且重要的数据,先做缓存/持久化,再做checkpint操作

RDD的依赖关系

  • RDD有两种依赖,分别为宽依赖(wide dependency/shuffle dependency)和窄依赖(narrow dependency)

窄依赖

  • 父RDD的一个分区只会被子RDD的一个分区依赖
  • 窄依赖的多个分区可以并行计算
  • 窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了

宽依赖

  • 父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)
  • 划分Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段

标签:Core,存储,缓存,分区,RDD,依赖,内存,Spark
From: https://www.cnblogs.com/shihongpin/p/18422389

相关文章

  • Spark(二)Spark Core(一)
    RDD详解前提:MapReduce框架采用非循环式的数据流模型,把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销,且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象,因此出现了RDD这个概念概念RDD(ResilientDistributedDataset)叫做弹性......
  • linux 下安装 RabbitMq 及 .net core 实操多种模式
    当前系统DebianGNU/Linux12安装命令1、sudoaptupdate//更新系统2、sudoapt-getinstallrabbitmq-server//安装rabbitMq服务3、sudoservicerabbitmq-serverstart//启动rabbitMq4、sudosystemctlenablerabbitmq-server//设置......
  • NetCore Channel-生产者&消费者
    usingSystem.Threading.Channels;namespaceChannelDemo{publicclassChannelMgr{//优势//允许开发者根据需要创建具有固定容量(有界)或无限容量(无界)的通道//staticChannel<string>channel=Channel.CreateBounded<strin......
  • Spark(一)概述
    基本概念Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎SparkvsHadoopSpark和Hadoop的根本差异是多个作业之间的数据通信问题:Spark多个作业之间数据通信是基于内存,而Hadoop是基于磁盘HadoopSpark类型分布式基础平台,包含计算,存储,调度分......
  • C#|.net core 基础 - 扩展数组添加删除性能最好的方法
    今天在编码的时候遇到了一个问题,需要对数组变量添加新元素和删除元素,因为数组是固定大小的,因此对新增和删除并不友好,但有时候又会用到,因此想针对数组封装两个扩展方法:新增元素与删除元素,并能到达以下三个目标:1、性能优异;2、兼容性好;3、方便使用;这三个目标最麻烦的应该就是性......
  • 如何在 ASP.NET Core Web API 方法执行前后 “偷偷“ 作一些 “坏“ 事?初识 ActionFil
    前言:什么是ActionFilterAttribute?ActionFilterAttribute是一种作用于控制器Action方法的特性(Attribute),通过它,你可以在操作执行前后、异常处理时等不同的阶段插入自定义逻辑。比如在执行操作方法之前修改请求参数、记录日志、进行权限验证等操作,在执行操作方法之后发送邮件......
  • Framebuffer core
    Framebuffercore,在Linux系统中,主要指的是与Framebuffer设备驱动相关的核心代码和功能。Framebuffer是Linux内核为显示设备提供的一套应用程序接口(API),它抽象了显示硬件的底层差异,使得开发者可以通过操作内存中的帧缓冲区来间接控制显示设备,从而实现图形的显示和渲染。Framebuffer......
  • .net core使用RabbitMQ
    目录1.基本概念2.环境搭建3.使用 RabbitMQ是一个可靠且成熟的消息传递和流代理,它很容易部署在云环境、内部部署和本地机器上。它目前被全世界数百万人使用。1.基本概念生产者(Producer)生产者是一个发送消息的程序。发送消息的程序可以是任何语言编写的,只要它能够......
  • ASP.NET Core中如何对不同类型的用户进行区别限流
    老板提出了一个新需求,从某某天起,免费用户每天只能查询100次,收费用户100W次。这是一个限流问题,聪明的你也一定想到了如何去做:记录用户每一天的查询次数,然后根据当前用户的类型使用不同的数字做比较,超过指定的数字就返回错误。嗯,原理就是这么简单。不过真正写起来还要考虑更多问题......
  • 易优eyoucms网站报错 \core\library\think\App.php Fatal error: Call to undefin
    当你遇到 Fatalerror:Calltoundefinedfunctionthink\switch_citysite() 这样的错误时,说明在代码中调用了一个未定义的函数 think\switch_citysite()。这种情况通常是因为函数没有被正确地引入或者该函数根本不存在于当前的代码库中。解决方案确认函数的存在检查 s......