首页 > 其他分享 >RDD缓存 检查点 共享变量 累加器

RDD缓存 检查点 共享变量 累加器

时间:2024-03-26 22:00:49浏览次数:26  
标签:map 缓存 rdd 累加器 RDD 检查点 line

1.缓存的基本介绍
    缓存介绍: 
        1. 当一个RDD的产生过程(计算过程), 是比较昂贵的(生成RDD整个计算流程比较复杂), 并且这个RDD可能会被多方(RDD会被重复使用)进行使用,
        2. 此时为了提升计算效率, 可以将RDD的结果设置为缓存, 这样后续在使用这个RDD的时候, 无需在重新计算了, 直接获取缓存中数据即可.
        3. 缓存可以提升Spark的容错的能力, 正常情况, 当Spark中某一个RDD计算失败的时候, 需要对整个RDD链条进行整体的回溯计算.
        4. 有了缓存后, 可以将某些阶段的RDD进行缓存操作, 这样当后续的RDD计算失败的时候, 可以从最近的一个缓存中恢复数据 重新计算即可, 无需在回溯所有链条.

    应用场景: 
        1. 当一个RDD被重复使用的时候, 可以使用缓存来解决
        2. 当一个RDD产生非常昂贵的时候, 可以将RDD设置为缓存
        3. 当需要提升容错能力的时候, 可以在局部设置一些缓存来提升容错能力

    注意事项:
        1. 缓存仅仅是一种临时存储, 可以将RDD的结果数据存储到内存(executor) 或者 磁盘, 甚至可以存储到堆外内存(executor以外系统内存)中.
        2. 由于缓存的存储是一种临时存储, 所以缓存的数据有可能丢失的, 所以缓存操作并不会将RDD之间的依赖关系给截断掉(清除掉), 
           以防止当缓存数据丢失的时候, 可以让程序进行重新计算操作
        3. 缓存的API都是lazy的, 设置缓存后, 并不会立即触发, 如果需要立即触发, 后续必须跟一个action算子, 建议使用 count
    
    如何使用缓存呢?
        设置缓存的相关API: 
            rdd.cache()                    //执行设置缓存的操作, cache在设置缓存的时候, 仅能将缓存数据放置到内存中
            rdd.persist(设置缓存级别)    //执行设置缓存的操作, 默认情况下, 将缓存数据放置到内存中, 同时支持设置其他缓存方案

        清理缓存的相关API:
            1. 默认情况下, 当程序执行完成后, 缓存会被自动清理
            2. 如需手动清理缓存, 则写法为:     rdd.unpersist()                //清理缓存
        
        常用的缓存级别有那些呢? 
            NONE                //表示不缓存
            MEMORY_ONLY         //仅缓存到内存中,直接将整个对象保存到内存中
            MEMORY_ONLY_SER        //仅缓存到内存中, 同时在缓存数据的时候, 会对数据进行序列化(从对象 --> 二进制数据)操作, 可以在一定程序上减少内存的使用量
            
            MEMORY_AND_DISK:
            MEMORY_AND_DISK_2    //优先将数据保存到内存中, 当内存不足的时候, 可以将数据保存到磁盘中, 带2的表示保存二份
            
            MEMORY_AND_DISK_SER: 
            MEMORY_AND_DISK_SER_2    //优先将数据保存到内存中, 当内存不足的时候, 可以将数据保存到磁盘中, 带2的表示保存二份, 
                                    //对于保存到内存的数据, 会进行序列化的操作, 从而减少内存占用量 提升内存保存数据体量,对磁盘必须要进行序列化
            
            //上述的缓存级别, 带2表示的保存多个副本, 从而提升数据可靠性
            
    序列化解释:
        将数据 从 对象 转换为 二进制的数据, 对于RDD的数据来说, 内部数据都是一个个对象, 
        如果没有序列化是直接将对象存储到内存中, 如果有序列化会将对象转换为二进制然后存储到内存中.
        
    好处: 
        减少内存的占用量, 从而让有限内存可以存储更多的数据
    弊端: 
        会增大对CPU的占用量, 因为转换的操作, 需要使用CPU来工作

	#cache sample. 
	from pyspark import SparkContext, SparkConf,StorageLevel
	import os
	import jieba
	import time
	
	if __name__ == '__main__':

		conf = SparkConf().setMaster('local[*]').setAppName('cache sample')
		sc = SparkContext(conf=conf)

		rdd_init = sc.textFile('file:///export/data/workspace/cache.sample')

		rdd_filter = rdd_init.filter(lambda line: line.strip() != '' and len(line.split()) == 6)

		rdd_map = rdd_filter.map(lambda line: (
			line.split()[0],
			line.split()[1],
			line.split()[2][1:-1],
			line.split()[3],
			line.split()[4],
			line.split()[5]
		))

		# -----------------设置缓存的代码--------------------
		# StorageLevel 这个类需要在前面的from pyspark中加入此对象的导入
		# 一般建议, 设置完缓存后, 让其立即触发
		rdd_map.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()


		# 快速抽取函数: ctrl + alt + m
		xuqiu_1()

		# ----------手动清理缓存------------
		rdd_map.unpersist().count()		//如果这里清理缓存了, 则后续的 xuqiu_2()这个函数, 就用不了缓存了, 可以在DAG图中查看"小绿球"

		xuqiu_2()

		time.sleep(1000)

    node1:4040页面的 导航条部分, 选择 Storage 即可查看缓存.
        

3. 检查点的基本介绍
    概述:
        1. checkPoint跟缓存类似, 也可以将某一个RDD结果进行存储操作, 一般都是将数据保存到HDFS中, 提供一种更加可靠的存储方案.
        2. 所以说采用checkpoint方案, 会将RDD之间的依赖关系给截断掉(因为 数据存储非常的可靠)
        3. checkpoint出现,  从某种角度上也可以提升执行效率(没有缓存高), 更多是为了容错能力
        4. 对于checkpoint来说, 大家可以将其理解为对整个RDD链条进行设置阶段快照的操作
        5. 由于checkpoint这种可靠性, 所以Spark本身只管设置, 不管删除, 所以checkpoint即使程序停止了, checkpoint数据依然存储着, 不会被删除, 需要手动删除
    
    如何设置checkpoint呢? 
        1. 通过sc对象, 设置checkpoint保存数据的位置: sc.setCheckpointDir('hdfs路径')
        2. 通过rdd.checkpoint() 设置开启检查点 (lazy)
        3. 通过rdd.count() 触发检查点的执行
    

# checkpoint sample code
	from pyspark import SparkContext, SparkConf
	import os
	import time

	os.environ['SPARK_HOME'] = '/export/server/spark'
	os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
	os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

	if __name__ == '__main__':

		conf = SparkConf().setMaster('local[*]').setAppName('sougou')
		sc = SparkContext(conf=conf)

		//设置检查点位置
		sc.setCheckpointDir('/spark/checkpoint/')		//默认路径是HDFS的, 所以可以省略 hdfs://node1:8020  如果存Linux本地, 需要写: file:///

		# 读取数据
		rdd_init = sc.textFile('file:///export/data/workspace/checkpoint.sample')

		# 演示代码, 无任何价值
		rdd_map1 = rdd_init.map(lambda line:line)
		rdd_map2 = rdd_map1.map(lambda line: line)
		rdd_3 = rdd_map2.repartition(3)
		rdd_map3 = rdd_3.map(lambda line: line)
		rdd_map4 = rdd_map3.map(lambda line: line)
		rdd_4 = rdd_map4.repartition(2)
		rdd_map5 = rdd_4.map(lambda line: line)

		# 开启检查点
		rdd_map5.checkpoint()		//这后边没法直接 接着 .count(),  因为 checkpoint()方法不返回任何的内容. 
		rdd_map5.count()

		print(rdd_map5.count())

		time.sleep(1000)
	//步骤: 先注释检查点的代码, 然后去node1:4040中把该程序的(runjob)部分的DAG图截下来, 然后在开启检查点的代码, 同样截取DAG图, 然后对比下即可.
	

 

5. 缓存和检查点的区别
    面试题:
        在Spark中 RDD的缓存和检查点有什么区别呢?
    答案:
        1. 存储位置
            缓存: 会将RDD的结果数据缓存到内存或者磁盘, 或者堆外内存
            检查点: 会将RDD的结果数据存储到HDFS(默认),当然也支持本地存储(仅在local模式,但如果是local模式, 检查点无所谓)
                    //因为本地模式主要是测试来用的, 数据量也不会特别的大, 所以, 设置检查点意义不是特别大.

        2. 依赖关系
            缓存: 由于缓存存储是一种临时存储, 所以缓存不会截断掉依赖关系, 以防止缓存丢失后, 进行回溯计算
            检查点: 会截断掉依赖关系, 因为检查点方案认为存储数据是可靠的, 不会丢失

        3. 生命周期
            缓存: 当整个程序执行完成后(一个程序中是包含多个JOB任务的), 会自动清理掉缓存数据,或者也可以在程序运行中手动清理
            检查点: 会将数据保存到HDFS中, 不会自动删除, 即使程序停止了, 检查点数据依然存在, 只能手动删除数据(会永久保存)
            

6. 缓存和检查点共用操作
    问:
        在实际使用中, 在Spark程序中, 是使用缓存呢 还是检查点呢?
    答:
        会将两种方案都作用于程序中, 一般是先设置检查点, 然后设置缓存

 

 

# cache and checkpoint sample code
		from pyspark import SparkContext, SparkConf,StorageLevel
		import os
		import jieba
		import time

		os.environ['SPARK_HOME'] = '/export/server/spark'
		os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
		os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'


		def xuqiu_1():
			# 获取搜索词
			rdd_search = rdd_map.map(lambda line_tup: line_tup[2])
			# 对搜索词进行分词操作
			rdd_keywords = rdd_search.flatMap(lambda search: jieba.cut(search))
			# 将每个关键词转换为  (关键词,1) 进行分组统计
			rdd_res = rdd_keywords.map(lambda keyword: (keyword, 1)).reduceByKey(lambda agg, curr: agg + curr)
			#  对结果数据进行排序(倒序)
			rdd_sort = rdd_res.sortBy(lambda res: res[1], ascending=False)
			# 获取结果(前50)
			print(rdd_sort.take(50))


		def xuqiu_2():
			# SQL: select  user,搜索词 ,count(1) from  表 group by user,搜索词;
			# 提取 用户和搜索词数据
			rdd_user_search = rdd_map.map(lambda line_tup: (line_tup[1], line_tup[2]))
			# 基于用户和搜索词进行分组统计即可
			rdd_res = rdd_user_search.map(lambda user_search: (user_search, 1)).reduceByKey(lambda agg, curr: agg + curr)
			rdd_sort = rdd_res.sortBy(lambda res: res[1], ascending=False)
			print(rdd_sort.take(30))


		if __name__ == '__main__':
			conf = SparkConf().setMaster('local[*]').setAppName('sougou')
			sc = SparkContext(conf=conf)

			//------------设置检查点保存位置------------
			sc.setCheckpointDir('/spark/checkpoint')

			rdd_init = sc.textFile('file:///export/data/workspace/cache_checkpoint.sample')

			# 过滤数据: 保证数据不能为空 并且数据字段数量必须为 6个
			rdd_filter = rdd_init.filter(lambda line: line.strip() != '' and len(line.split()) == 6)

			# 对数据进行切割, 将数据放置到一个元组中: 一行放置一个元组
			rdd_map = rdd_filter.map(lambda line: (
				line.split()[0],
				line.split()[1],
				line.split()[2][1:-1],
				line.split()[3],
				line.split()[4],
				line.split()[5]
			))

			# ---- 开启检查点 和 缓存 -----
			# 设置开启检查点
			rdd_map.checkpoint()			
			rdd_map.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()	//先开启检查点, 后开启缓存(推荐)
			
			#或者
			
			rdd_map.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
			rdd_map.checkpoint()			
			rdd_map.count()			//先开启缓存, 后开启检查点, 可能会导致检查点失效, 这种方式理解即可, 一般不推荐. 
			
		
			# 快速抽取函数: ctrl + alt + m
			xuqiu_1()


			xuqiu_2()

			time.sleep(1000)

        
7. RDD共享变量的引入操作
    


8. RDD的共享变量_广播变量(Broadcast Variables)
      作用:   
       减少Driver和executor之间网络数据传输数据量, 以及减少内存的使用 从而提升效率
    
    适用于: 
        多个Task线程需要使用到同一个变量的值的时候
    
    默认做法:     //即: 没有广播变量的情况.
        各个线程会将这个变量形成一个副本, 然后拷贝到自己的线程中, 进行使用即可, 由于一个executor中有多个线程, 那么意味需要拷贝多次, 
        导致executor和 Driver之间的传输量增加, 对带宽有一定影响, 同时拷贝了多次, 对内存占用量提升
    
    解决方案:     //引入一个广播变量
        让executor从Driver中拉取过来一个副本即可, 一个executor只需要拉取一次副本, 让executor中各个线程读取executor中变量即可, 
        这样减少网络传输量, 同时减少内存使用量
    
    注意: 
        广播变量是只读的, 各个线程只能读取数据, 不能修改数据.
    
    如何使用广播变量: 
        通过sc创建一个广播变量:      //在Driver设置
            广播变量对象 = sc.broadcast(值)
        
        获取变量: 在Task获取
            广播变量对象.value

        from pyspark import SparkContext, SparkConf
		import os
		import time

		os.environ['SPARK_HOME'] = '/export/server/spark'
		os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
		os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

		if __name__ == '__main__':
			print("演示广播变量的使用操作")
			conf = SparkConf().setMaster('local[*]').setAppName('sougou')
			sc = SparkContext(conf=conf)

			# 设置广播变量
			bc = sc.broadcast(1000)

			# 2 读取数据
			rdd_init = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

			# 3- 将每个数据都加上指定值 ,此值由广播变量给出:
			# 获取广播:  bc.value
			rdd_res = rdd_init.map(lambda num: num + bc.value)

			# 4- 打印结果
			rdd_res.foreach(lambda num: print(num))

			time.sleep(10000)

 

 9. RDD的共享变量_累加器(Accumulators)的使用操作
    解释:
        累加器主要提供在多个线程中对同一个变量进行累加的操作, 对于多个线程来说只能对数据进行累加, 不能读取数据, 读取数据的操作只能有Driver来处理
    
    应用场景: 
        全局累加操作
    
    如何使用呢?  
        1. 由于Driver设置一个累加器的初始值
            累加器对象 = sc.accumulator(初始值)
        2. 由rdd(线程)来进行累加操作
            累加器对象.add(累加内容)
        3. 在Driver中获取值:
            累加器.value

 累加器错误示范

        from pyspark import SparkContext, SparkConf
		import os

		# 锁定远端python版本:
		os.environ['SPARK_HOME'] = '/export/server/spark'
		os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
		os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

		if __name__ == '__main__':
			print("演示累加器")
			# 1- 创建SparkContext对象
			conf = SparkConf().setMaster('local[*]').setAppName('sougou')
			sc = SparkContext(conf=conf)

			# 定义一个变量
			a = 10

			# 2 读取数据
			rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

			# 3- 处理数据: 为a将列表中变量的值累加上去
			def fn1(num):
				global a
				a += num
				//a.value		//这里如果直接写  会报错, 因为: 对于多个线程来说只能对数据进行累加, 不能读取数据, 读取数据的操作只能有Driver来处理.
				return num

			rdd_map = rdd_init.map(fn1)

			print(rdd_map.collect())
	
			print(a)		//a的值, 还是 10

 累加器正确使用

		from pyspark import SparkContext, SparkConf
		import os

		# 锁定远端python版本:
		os.environ['SPARK_HOME'] = '/export/server/spark'
		os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
		os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

		if __name__ == '__main__':
			print("演示累加器")
			# 1- 创建SparkContext对象
			conf = SparkConf().setMaster('local[*]').setAppName('sougou')
			sc = SparkContext(conf=conf)

			# 定义一个变量, 引入累加器
			a = sc.accumulator(10)

			# 2 读取数据
			rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

			# 3- 处理数据: 为a将列表中变量的值累加上去
			def fn1(num):
				# 对累加器进行进行增加
				a.add(num)
				return num


			rdd_map = rdd_init.map(fn1)

			print(rdd_map.collect())
			
			
			print(a.value)		//获取累加器的结果, 65
			

10. RDD的累加器的小问题说明(重复累加)
    问题:
        当我们对设置过累加器的RDD, 后续在进行一些其他的操作, 调度多次action算子后, 发现累加器被累加了多次, 本应该只累加一次, 这种情况是如何产生的呢? 
    
    原因: 
        当调度用多次action的时候, 会产生多个JOB(计算任务), 由于RDD值存储计算的规则, 不存储数据, 当第一个action计算完成后, 得到一个结果,
        整个任务完成了,  接下来再运行下一个job的任务, 这个任务依然需要重头开始进行计算得到最终结果.
        这样就会 累加的操作就会被触发多次,从而被累加了多次
    
    解决方案: 
        对累加器执行完的RDD 设置为缓存或者检查点, 或者两个都设置, 即可解决. 

 

		from pyspark import SparkContext, SparkConf
		import os

		# 锁定远端python版本:
		os.environ['SPARK_HOME'] = '/export/server/spark'
		os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
		os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

		if __name__ == '__main__':
			print("演示累加器")
			# 1- 创建SparkContext对象
			conf = SparkConf().setMaster('local[*]').setAppName('sougou')
			sc = SparkContext(conf=conf)

			# 定义一个变量, 引入累加器
			a = sc.accumulator(10)

			# 2 读取数据
			rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

			# 3- 处理数据: 为a将列表中变量的值累加上去
			def fn1(num):
				# 对累加器进行进行增加
				a.add(num)
				return num

			rdd_map = rdd_init.map(fn1)
			
			//rdd_map.cache().count()		//设置缓存, 如果不设置, 最终 a.value的值是 120, 如果设置了, 是65(说明读缓存了)

			print(rdd_map.collect())
			
			rdd_2 = rdd_map.map(lambda num: num + 1)
			print(rdd_2.top(10))		//我们发现, 下述再打印累加器的结果时, 它的值就变成了: 120, 说明累加器又执行了一次.
			
			print(a.value)		//获取累加器的结果

11. RDD内核调度_RDD的依赖关系
    RDD之间是存在依赖关系, 这也是RDD中非常重要特性, 一般将RDD之间的依赖关系划分为两种依赖关系: 窄依赖  和 宽依赖
    窄依赖:
        目的: 
            让各个分区的数据可以并行的计算操作.

        指的是:
            上一个RDD的某一个分区的数据 被下一个RDD的某一个分区全部都继承处理下来, 我们将这种关系称为窄依赖关系.
            //简单理解: 一对一的关系.
            
    宽依赖:
        目的: 
            划分stage(阶段)

        指的是:
            上一个RDD的分区数据被下一个RDD的多个分区所接收并处理(shuffle), 我们将这种关系称为宽依赖.

    结论:
        1. 判断两个RDD之间是否存在宽依赖, 主要看两个RDD之间是否存在shuffle, 一旦产生了shuffle, 必须是前面的先计算完成后, 然后才能进行后续的计算操作
        2. 在Spark中, 每一个算子是否存在shuffle操作, 在Spark设计的时候就已经确定了, 比如说 map一定不会有shuffle, 比如说reduceByKey一定是存在shuffle.
        3. 如何判断这个算子是否会走shuffle呢?  
            //可以从查看DAG执行流程图, 如果发现一执行到这个算子, 阶段被分为多个, 那么一定是存在shuffle, 
            //以及可以通过查看每个算子的文档的说明信息, 里面也会有一定的说明
        4. 但是: 在实际操作中, 我们一般不会纠结这个事情, 我们要以实现需求为导向, 需要用什么算子的时候, 我们就采用什么算子来计算即可, 
           虽然说过多的shuffle操作, 会影响我们的执行的效率, 但是依然该用的还是要用的
        5. 判断宽窄依赖的关系最重要就是: 看两个RDD之间是否存在shuffle.
    

12. DAG以及DAG流程图形成说明
    DAG解释:    //有向无环图.
        整个的流程, 有方向, 不能往回走, 不断的往下继续的过程

    问: 
        如何形成一个DAG执行流程图呢?
    答:
        1. 第一步: 当Driver遇到一个action算子后, 就会将这个算子所对应所有依赖的RDD全部都加载进来形成一个stage阶段
        2. 第二步: 对整个阶段进行回溯操作, 从后往前, 判断每一个RDD之间依赖关系, 如果是宽依赖形成一个新的阶段, 如果窄依赖, 放置到一起
        3. 当整个回溯全部完成后, 形成了DAG的执行流程图
        //详见图解.


    
    
13. DAG的阶段划分以及线程的划分操作
    //详见图解.


    分区数量相关的文档链接: https://spark.apache.org/docs/3.1.2/configuration.html  搜 spark.default.parallelism 这个配置信息.
  

 

标签:map,缓存,rdd,累加器,RDD,检查点,line
From: https://blog.csdn.net/qq_43428465/article/details/137058040

相关文章

  • RDD基本操作(残)
    intRDD=sc.parallelize([3,1,2,5,5])intRDD.collect()[3,1,2,5,5]stringRDD=sc.parallelize(["Apple","Orange","Banana","Grape","Apple"])stringRDD.collect()['Apple','Oran......
  • SparkSQL与RDD的选择?
        对当下的企业级数据应用来说,SparkSQL的应用空间肯定要比单纯的写RDD处理大很多,因为SparkSQL比RDD好写的多,也更贴近业务需求和更友好的能处理数据,而且技术门槛也更低。        但RDD是Spark中所有的数据抽象的基础,最大的特点是对开发者而言暴露的是不带sch......
  • spark-rdd
    分布式集合对象上的API称之为算子算子分为两类:transformation算子:指返回值仍然是rdd,类似于stream里的中间流这类算子与中间流相同,是懒加载的action算子:返回值不是rdd,类似于stream里的终结流常见算子:transformation算子1.map(func):将rdd的数据一条一条的处理,返回新......
  • 寒假学习 11 编程实现将 RDD 转换为 DataFrame
    请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。scala>importorg.apache.spark.sql.types._importorg.apache.spark.sql.types._ scala>importorg.......
  • RDD 任务划分
    v>RDD任务切分中间分为:Application、Job、Stage和TaskApplication:初始化一个SparkContext即生成一个Application; Job:一个Action算子就会生成一个Job; Stage:Stage等于宽依赖(ShuffleDependency)的个数加1; Task:一个Stage阶段中,最后一个RDD的......
  • Spark中RDD阶段划分
    分析源码步骤:第一步程序入口: 第二步一直查看runjob方法,可以看出collect()是RDD行动算子,与Job运行提交相关rdd.scala sparkcontext.scala  sparkcontext.scala  sparkcontext.scala 第三步runJob()与DAG调度有关sparkcontext.scala第四步runJob()核心代码-......
  • spark实验四RDD 编程初级实践
    1.spark-shell交互式编程请到本教程官网的“下载专区”的“数据集”中下载chapter5-data1.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示:Tom,DataBase,80Tom,Algorithm,50Tom,DataStructure,60Jim,DataBase,90Jim,Algorithm,60Jim,DataStructure,80……请根......
  • RDD算子
    分布式集合对象上的API称之为算子算子分为两类:transformation算子:指返回值仍然是rdd,类似于stream里的中间流这类算子与中间流相同,是懒加载的action算子:返回值不是rdd,类似于stream里的终结流 常见算子:1.map(func):将rdd的数据一条一条的处理,返回新的rdd,和stream流的......
  • rdd常用的Action算子和分区操作算子
    frompysparkimportSparkConf,SparkContext#创建Spark配置和上下文对象conf=SparkConf().setAppName("SparkActionsAndPartitions")sc=SparkContext(conf=conf)#示例数据data=[("apple",1),("banana",2),("apple",3),(&quo......
  • 常用的rddTransformation算子
    根据文章:暑假生活每周总结10  frompysparkimportSparkContext#创建SparkContext对象sc=SparkContext("local","RDDTransformationsExample")#假设我们有一个RDD数据源data=sc.parallelize([1,2,3,4,5])#1.map算子mapped_data=data.map(lambda......