首页 > 编程语言 >【pyspark速成专家】5_Spark之RDD编程3

【pyspark速成专家】5_Spark之RDD编程3

时间:2024-05-25 10:57:07浏览次数:28  
标签:glom parallelize pyspark 分区 RDD key collect sc Spark

目录

​编辑

六,共享变量

七,分区操作


六,共享变量

当spark集群在许多节点上运行一个函数时,默认情况下会把这个函数涉及到的对象在每个节点生成一个副本。

但是,有时候需要在不同节点或者节点和Driver之间共享变量。

Spark提供两种类型的共享变量,广播变量和累加器。

广播变量是不可变变量,实现在不同节点不同任务之间共享数据。

广播变量在每个机器上缓存一个只读的变量,而不是为每个task生成一个副本,可以减少数据的传输。

累加器主要是不同节点和Driver之间共享变量,只能实现计数或者累加功能。

累加器的值只有在Driver上是可读的,在节点上不可见。

#广播变量 broadcast 不可变,在所有节点可读

broads = sc.broadcast(100)

rdd = sc.parallelize(range(10))
print(rdd.map(lambda x:x+broads.value).collect())

print(broads.value)

[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
100

#累加器 只能在Driver上可读,在其它节点只能进行累加

total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)

rdd.foreach(lambda x:total.add(x))
total.value

45

# 计算数据的平均值
rdd = sc.parallelize([1.1,2.1,3.1,4.1])
total = sc.accumulator(0)
count = sc.accumulator(0)

def func(x):
    total.add(x)
    count.add(1)
    
rdd.foreach(func)

total.value/count.value

2.6

七,分区操作

分区操作包括改变分区操作,以及针对分区执行的一些转换操作。

glom:将一个分区内的数据转换为一个列表作为一行。

coalesce:shuffle可选,默认为False情况下窄依赖,不能增加分区。repartition和partitionBy调用它实现。

repartition:按随机数进行shuffle,相同key不一定在同一个分区

partitionBy:按key进行shuffle,相同key放入同一个分区

HashPartitioner:默认分区器,根据key的hash值进行分区,相同的key进入同一分区,效率较高,key不可为Array.

RangePartitioner:只在排序相关函数中使用,除相同的key进入同一分区,相邻的key也会进入同一分区,key必须可排序。

TaskContext: 获取当前分区id方法 TaskContext.get.partitionId

mapPartitions:每次处理分区内的一批数据,适合需要分批处理数据的情况,比如将数据插入某个表,每批数据只需要开启一次数据库连接,大大减少了连接开支

mapPartitionsWithIndex:类似mapPartitions,提供了分区索引,输入参数为(i,Iterator)

foreachPartition:类似foreach,但每次提供一个Partition的一批数据

glom

#glom将一个分区内的数据转换为一个列表作为一行。
a = sc.parallelize(range(10),2)
b = a.glom()
b.collect() 

[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

coalesce

#coalesce 默认shuffle为False,不能增加分区,只能减少分区
#如果要增加分区,要设置shuffle = true
#parallelize等许多操作可以指定分区数
a = sc.parallelize(range(10),3)  
print(a.getNumPartitions())
print(a.glom().collect())

3
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]

b = a.coalesce(2) 
print(b.glom().collect())

[[0, 1, 2], [3, 4, 5, 6, 7, 8, 9]]

repartition

#repartition按随机数进行shuffle,相同key不一定在一个分区,可以增加分区
#repartition实际上调用coalesce实现,设置了shuffle = True
a = sc.parallelize(range(10),3)  
c = a.repartition(4) 
print(c.glom().collect())

[[6, 7, 8, 9], [3, 4, 5], [], [0, 1, 2]]

#repartition按随机数进行shuffle,相同key不一定在一个分区
a = sc.parallelize([("a",1),("a",1),("a",2),("c",3)])  
c = a.repartition(2)
print(c.glom().collect())

[[('a', 1), ('a', 2), ('c', 3)], [('a', 1)]]

partitionBy

#partitionBy按key进行shuffle,相同key一定在一个分区
a = sc.parallelize([("a",1),("a",1),("a",2),("c",3)])  
c = a.partitionBy(2)
print(c.glom().collect())

mapPartitions

#mapPartitions可以对每个分区分别执行操作
#每次处理分区内的一批数据,适合需要按批处理数据的情况
#例如将数据写入数据库时,可以极大的减少连接次数。
#mapPartitions的输入分区内数据组成的Iterator,其输出也需要是一个Iterator
#以下例子查看每个分区内的数据,相当于用mapPartitions实现了glom的功能。
a = sc.parallelize(range(10),2)
a.mapPartitions(lambda it:iter([list(it)])).collect()

[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

mapPartitionsWithIndex

#mapPartitionsWithIndex可以获取两个参数
#即分区id和每个分区内的数据组成的Iterator
a = sc.parallelize(range(11),2)

def func(pid,it):
    s = sum(it)
    return(iter([str(pid) + "|" + str(s)]))
    [str(pid) + "|" + str]
b = a.mapPartitionsWithIndex(func)
b.collect()

#利用TaskContext可以获取当前每个元素的分区
from pyspark.taskcontext import TaskContext
a = sc.parallelize(range(5),3)
c = a.map(lambda x:(TaskContext.get().partitionId(),x))
c.collect()

[(0, 0), (1, 1), (1, 2), (2, 3), (2, 4)]

foreachPartitions

#foreachPartition对每个分区分别执行操作
#范例:求每个分区内最大值的和
total = sc.accumulator(0.0)

a = sc.parallelize(range(1,101),3)

def func(it):
    total.add(max(it))
    
a.foreachPartition(func)
total.value

199.0

aggregate

#aggregate是一个Action操作
#aggregate比较复杂,先对每个分区执行一个函数,再对每个分区结果执行一个合并函数。
#例子:求元素之和以及元素个数
#三个参数,第一个参数为初始值,第二个为分区执行函数,第三个为结果合并执行函数。
rdd = sc.parallelize(range(1,21),3)
def inner_func(t,x):
    return((t[0]+x,t[1]+1))

def outer_func(p,q):
    return((p[0]+q[0],p[1]+q[1]))

rdd.aggregate((0,0),inner_func,outer_func)

(210, 20)

aggregateByKey

#aggregateByKey的操作和aggregate类似,但是会对每个key分别进行操作
#第一个参数为初始值,第二个参数为分区内归并函数,第三个参数为分区间归并函数

a = sc.parallelize([("a",1),("b",1),("c",2),
                             ("a",2),("b",3)],3)
b = a.aggregateByKey(0,lambda x,y:max(x,y),
                            lambda x,y:max(x,y))
b.collect()

[('b', 3), ('a', 2), ('c', 2)]

标签:glom,parallelize,pyspark,分区,RDD,key,collect,sc,Spark
From: https://blog.csdn.net/qq_32146369/article/details/138395477

相关文章

  • 计算机毕业设计python+spark天气预测 天气可视化 天气大数据 空气质量检测 空气质量分
    摘  要近些年大数据人工智能等技术发展迅速,我国工业正努力从“制造”迈向“智造”实现新跨越。神经网络(NeuronNetwork)是一种计算模型,通过大量数据的学习,来发现数据之间的模式和规律,模仿人脑神经元的工作方式。随着算力的提升和算法的不断成熟图像识别技术已经完全融入到生......
  • Spark-Web页面(默认端口:4040)
    访问WebUI页面的前提:启动Spark安装目录下sbin/start-all.sh。jps可以看到Master和Worker。1、Spark的Master页面http://master:8080/2、Spark的Worker页面http://master:8081/3、Spark的Job页面(只有任务运行过程中可以查看该页面)http://master:4040/  调用Jar包时......
  • PySpark-大数据分析实用指南-全-
    PySpark大数据分析实用指南(全)原文:zh.annas-archive.org/md5/62C4D847CB664AD1379DE037B94D0AE5译者:飞龙协议:CCBY-NC-SA4.0前言ApacheSpark是一个开源的并行处理框架,已经存在了相当长的时间。ApacheSpark的许多用途之一是在集群计算机上进行数据分析应用程序。本书......
  • Spark_DLS语法:
    Spark_DLS语法:目录Spark_DLS语法:1.[Spark]-SQL2.DSL示例3.DSL解析json,csv文件1.printSchema()打印表结构2.studentDF.show(100)默认20条数据3.studentDF.show(false)某些值太长,完整打印每一列的数据4.DSL函数4.DataSource4.1csv:需要手动指定列名和类型4.2jsonparquet格式......
  • 《Spark编程基础》(Scala版)第八章简答题答案(自制)
    8SparkMLlib简答题T1与MapReduce框架相比,为何Spark更适合进行机器学习各算法的处理?答:通常情况下,机器学习算法参数学习的过程都是迭代计算。MapReduce由于延迟高、磁盘开销大、无法高效支持迭代计算,不适合高效的实现机器学习算法;Spark由于立足于内存计算,所以能很好地与......
  • 分布式数据处理-《Spark编程基础》(Scala版)第二章简答题答案(自制)
    2Scala语言基础简答题T1简述Scala语言与Java语言的联系与区别。答:①联系:(1)Scala和Java均运行在JVM之上;(2)Scala和Java均有面向对象语言特点;②区别:(1)Scala是类Java的多范式编程;Java是命令式编程。T2简述Scala语言的基本特性。......
  • 分布式数据处理-《Spark编程基础》(Scala版)第四章简答题答案(自制)
    4Spark环境搭建和使用方法简答题T1请阐述Spark的四种部署模式。......
  • 分布式数据处理-《Spark编程基础》(Scala版)第三章简答题答案(自制)
    3Spark的设计与运行原理简答题T1Spark是基于内存计算的大数据计算平台,请阐述Spark的主要特点。......
  • 分布式数据处理-《Spark编程基础》(Scala版)第七章简答题答案(自制)
    7SparkStreaming简答题T1请阐述静态数据和流数据的区别?答:静态数据是可以看作是静止不动的,适合进行批量计算;流数据是指数据以大量、快速、时变的流形式持续到达的,适合进行实时计算。同时,流计算被处理后,只有部分进入数据库成为静态数据,其余部分则被丢弃。T2请阐述批量计算和......
  • 分布式数据处理-《Spark编程基础》(Scala版)第六章简答题答案(自制)
    6SparkSQL简答题T1请阐述Hive中SQL查询转化为MapReduce作业的具体过程。❌答:HiveSQL命令或查询首先进入到驱动模块,由驱动模块中的编译器进行解析编译,并由优化器对该操作进行优化计算,然后交给执行器去执行,执行器通常的任务是启动一个或多个MapReduce任务,详细过程如......