目录
六,共享变量
当spark集群在许多节点上运行一个函数时,默认情况下会把这个函数涉及到的对象在每个节点生成一个副本。
但是,有时候需要在不同节点或者节点和Driver之间共享变量。
Spark提供两种类型的共享变量,广播变量和累加器。
广播变量是不可变变量,实现在不同节点不同任务之间共享数据。
广播变量在每个机器上缓存一个只读的变量,而不是为每个task生成一个副本,可以减少数据的传输。
累加器主要是不同节点和Driver之间共享变量,只能实现计数或者累加功能。
累加器的值只有在Driver上是可读的,在节点上不可见。
#广播变量 broadcast 不可变,在所有节点可读
broads = sc.broadcast(100)
rdd = sc.parallelize(range(10))
print(rdd.map(lambda x:x+broads.value).collect())
print(broads.value)
[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
100
#累加器 只能在Driver上可读,在其它节点只能进行累加
total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)
rdd.foreach(lambda x:total.add(x))
total.value
45
# 计算数据的平均值
rdd = sc.parallelize([1.1,2.1,3.1,4.1])
total = sc.accumulator(0)
count = sc.accumulator(0)
def func(x):
total.add(x)
count.add(1)
rdd.foreach(func)
total.value/count.value
2.6
七,分区操作
分区操作包括改变分区操作,以及针对分区执行的一些转换操作。
glom:将一个分区内的数据转换为一个列表作为一行。
coalesce:shuffle可选,默认为False情况下窄依赖,不能增加分区。repartition和partitionBy调用它实现。
repartition:按随机数进行shuffle,相同key不一定在同一个分区
partitionBy:按key进行shuffle,相同key放入同一个分区
HashPartitioner:默认分区器,根据key的hash值进行分区,相同的key进入同一分区,效率较高,key不可为Array.
RangePartitioner:只在排序相关函数中使用,除相同的key进入同一分区,相邻的key也会进入同一分区,key必须可排序。
TaskContext: 获取当前分区id方法 TaskContext.get.partitionId
mapPartitions:每次处理分区内的一批数据,适合需要分批处理数据的情况,比如将数据插入某个表,每批数据只需要开启一次数据库连接,大大减少了连接开支
mapPartitionsWithIndex:类似mapPartitions,提供了分区索引,输入参数为(i,Iterator)
foreachPartition:类似foreach,但每次提供一个Partition的一批数据
glom
#glom将一个分区内的数据转换为一个列表作为一行。
a = sc.parallelize(range(10),2)
b = a.glom()
b.collect()
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
coalesce
#coalesce 默认shuffle为False,不能增加分区,只能减少分区
#如果要增加分区,要设置shuffle = true
#parallelize等许多操作可以指定分区数
a = sc.parallelize(range(10),3)
print(a.getNumPartitions())
print(a.glom().collect())
3
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
b = a.coalesce(2)
print(b.glom().collect())
[[0, 1, 2], [3, 4, 5, 6, 7, 8, 9]]
repartition
#repartition按随机数进行shuffle,相同key不一定在一个分区,可以增加分区
#repartition实际上调用coalesce实现,设置了shuffle = True
a = sc.parallelize(range(10),3)
c = a.repartition(4)
print(c.glom().collect())
[[6, 7, 8, 9], [3, 4, 5], [], [0, 1, 2]]
#repartition按随机数进行shuffle,相同key不一定在一个分区
a = sc.parallelize([("a",1),("a",1),("a",2),("c",3)])
c = a.repartition(2)
print(c.glom().collect())
[[('a', 1), ('a', 2), ('c', 3)], [('a', 1)]]
partitionBy
#partitionBy按key进行shuffle,相同key一定在一个分区
a = sc.parallelize([("a",1),("a",1),("a",2),("c",3)])
c = a.partitionBy(2)
print(c.glom().collect())
mapPartitions
#mapPartitions可以对每个分区分别执行操作
#每次处理分区内的一批数据,适合需要按批处理数据的情况
#例如将数据写入数据库时,可以极大的减少连接次数。
#mapPartitions的输入分区内数据组成的Iterator,其输出也需要是一个Iterator
#以下例子查看每个分区内的数据,相当于用mapPartitions实现了glom的功能。
a = sc.parallelize(range(10),2)
a.mapPartitions(lambda it:iter([list(it)])).collect()
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
mapPartitionsWithIndex
#mapPartitionsWithIndex可以获取两个参数
#即分区id和每个分区内的数据组成的Iterator
a = sc.parallelize(range(11),2)
def func(pid,it):
s = sum(it)
return(iter([str(pid) + "|" + str(s)]))
[str(pid) + "|" + str]
b = a.mapPartitionsWithIndex(func)
b.collect()
#利用TaskContext可以获取当前每个元素的分区
from pyspark.taskcontext import TaskContext
a = sc.parallelize(range(5),3)
c = a.map(lambda x:(TaskContext.get().partitionId(),x))
c.collect()
[(0, 0), (1, 1), (1, 2), (2, 3), (2, 4)]
foreachPartitions
#foreachPartition对每个分区分别执行操作
#范例:求每个分区内最大值的和
total = sc.accumulator(0.0)
a = sc.parallelize(range(1,101),3)
def func(it):
total.add(max(it))
a.foreachPartition(func)
total.value
199.0
aggregate
#aggregate是一个Action操作
#aggregate比较复杂,先对每个分区执行一个函数,再对每个分区结果执行一个合并函数。
#例子:求元素之和以及元素个数
#三个参数,第一个参数为初始值,第二个为分区执行函数,第三个为结果合并执行函数。
rdd = sc.parallelize(range(1,21),3)
def inner_func(t,x):
return((t[0]+x,t[1]+1))
def outer_func(p,q):
return((p[0]+q[0],p[1]+q[1]))
rdd.aggregate((0,0),inner_func,outer_func)
(210, 20)
aggregateByKey
#aggregateByKey的操作和aggregate类似,但是会对每个key分别进行操作
#第一个参数为初始值,第二个参数为分区内归并函数,第三个参数为分区间归并函数
a = sc.parallelize([("a",1),("b",1),("c",2),
("a",2),("b",3)],3)
b = a.aggregateByKey(0,lambda x,y:max(x,y),
lambda x,y:max(x,y))
b.collect()
[('b', 3), ('a', 2), ('c', 2)]
标签:glom,parallelize,pyspark,分区,RDD,key,collect,sc,Spark
From: https://blog.csdn.net/qq_32146369/article/details/138395477