intRDD = sc.parallelize([3, 1, 2, 5, 5])
intRDD.collect()
[3, 1, 2, 5, 5]
stringRDD = sc.parallelize(["Apple", "Orange", "Banana", "Grape", "Apple"])
stringRDD.collect()
['Apple', 'Orange', 'Banana', 'Grape', 'Apple']
def addOne(x):
return (x + 1)
intRDD.map(addOne).collect()
[4, 2, 3, 6, 6]
intRDD.map(lambda x : x + 1).collect()
[4, 2, 3, 6, 6]
stringRDD.map(lambda x: "fruit" + x).collect()
['fruitApple', 'fruitOrange', 'fruitBanana', 'fruitGrape', 'fruitApple']
intRDD.filter(lambda x : x < 3).collect()
[1, 2]
intRDD.filter(lambda x : x == 3).collect()
[3]
intRDD.filter(lambda x : 1 < x and x < 5).collect()
[3, 2]
intRDD.filter(lambda x : x >= 5 or x < 3).collect()
[1, 2, 5, 5]
stringRDD.filter(lambda x : "ra" in x).collect()
['Orange', 'Grape']
intRDD.distinct().collect()
[1, 2, 3, 5]
stringRDD.distinct().collect()
['Orange', 'Grape', 'Apple', 'Banana']
sRDD = intRDD.randomSplit([0.4, 0.6])
sRDD[0].collect()
[3, 5]
sRDD[1].collect()
[1, 2, 5]
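randomSplit distributes elements at random, so the exact split differs from run to run. A small sketch (assuming the same intRDD) showing that a fixed seed makes the split reproducible:
sRDD = intRDD.randomSplit([0.4, 0.6], seed=42)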
gRDD = intRDD.groupBy(lambda x : "even" if(x % 2 == 0) else "odd").collect()
print(gRDD[0][0], sorted(gRDD[0][1]))
even [2]
print(gRDD[1][0], sorted(gRDD[1][1]))
odd [1, 3, 5, 5]
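groupBy returns each group as an iterable, which is why sorted(...) is used above to display it. A minimal sketch that materializes every group as a list in one step (ordering may vary):
intRDD.groupBy(lambda x : "even" if (x % 2 == 0) else "odd").mapValues(list).collect()
[('even', [2]), ('odd', [3, 1, 5, 5])]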
intRDD1 = sc.parallelize([3, 1, 2, 5, 5])
intRDD2 = sc.parallelize([5, 6])
intRDD3 = sc.parallelize([2, 7])
intRDD1.union(intRDD2).union(intRDD3).collect()
[3, 1, 2, 5, 5, 5, 6, 2, 7]
intRDD1.intersection(intRDD2).collect()
[5]
intRDD1.subtract(intRDD2).collect()
[2, 1, 3]
print(intRDD1.cartesian(intRDD2).collect())
[(3, 5), (3, 6), (1, 5), (1, 6), (2, 5), (2, 6), (5, 5), (5, 6), (5, 5), (5, 6)]
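cartesian pairs every element of the first RDD with every element of the second, so the result size is the product of the two counts; use it with care on large RDDs:
intRDD1.cartesian(intRDD2).count()
10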
Basic "action" operations
1. Reading elements
intRDD.first()
3
intRDD.take(2)
[3, 1]
intRDD.takeOrdered(3)
[1, 2, 3]
intRDD.takeOrdered(3, key = lambda x : -x)
[5, 5, 3]
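takeOrdered with a negated key returns the largest elements; RDD.top(n) is a shorthand for the same result:
intRDD.top(3)
[5, 5, 3]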
2. Statistics functions
intRDD.stats()
(count: 5, mean: 3.2, stdev: 1.6, max: 5.0, min: 1.0)
intRDD.min()
1
intRDD.max()
5
intRDD.stdev()
1.6000000000000001
intRDD.count()
5
intRDD.sum()
16
intRDD.mean()
3.2
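stats() gathers all of these figures in a single pass over the data and returns a StatCounter, so the individual values can be read without re-scanning the RDD. A quick sketch, assuming the same intRDD:
st = intRDD.stats()
(st.count(), st.mean(), st.stdev())
(5, 3.2, 1.6000000000000001)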
RDD Key-Value basic "transformation" operations
1. Creating a Key-Value RDD
kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
kvRDD1.collect()
[(3, 4), (3, 6), (5, 6), (1, 2)]
kvRDD1.keys().collect()
[3, 3, 5, 1]
kvRDD1.values().collect()
[4, 6, 6, 2]
2. Using filter to select by key
kvRDD1.filter(lambda keyValue : keyValue[0] < 5).collect()
[(3, 4), (3, 6), (1, 2)]
3. Using filter to select by value
kvRDD1.filter(lambda keyValue : keyValue[1] < 5).collect()
[(3, 4), (1, 2)]
4. mapValues operation
mapValues applies the function to each value and leaves the key untouched.
kvRDD1.mapValues(lambda x : x * x).collect()
[(3, 16), (3, 36), (5, 36), (1, 4)]
5. sortByKey: sort by key in ascending order
kvRDD1.sortByKey(ascending=True).collect()
[(1, 2), (3, 4), (3, 6), (5, 6)]
kvRDD1.sortByKey().collect()
[(1, 2), (3, 4), (3, 6), (5, 6)]
6. sortByKey: sort by key in descending order
kvRDD1.sortByKey(ascending=False).collect()
[(5, 6), (3, 4), (3, 6), (1, 2)]
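sortByKey orders only by key; to order by value, RDD.sortBy accepts an arbitrary key function (a quick sketch; ties may come out in either order):
kvRDD1.sortBy(lambda kv : kv[1]).collect()
[(1, 2), (3, 4), (3, 6), (5, 6)]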
7. reduceByKey
reduceByKey merges all values that share the same key using the given function.
kvRDD1.reduceByKey(lambda x, y : x + y).collect()
[(1, 2), (3, 10), (5, 6)]
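reduceByKey is the heart of the classic word count. A minimal sketch reusing stringRDD from above (output order may vary):
stringRDD.map(lambda w : (w, 1)).reduceByKey(lambda x, y : x + y).collect()
[('Orange', 1), ('Grape', 1), ('Apple', 2), ('Banana', 1)]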
Multiple RDD Key-Value "transformation" operations
1. Creating multiple Key-Value RDDs
kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
kvRDD2 = sc.parallelize([(3, 8)])
kvRDD1.collect()
[(3, 4), (3, 6), (5, 6), (1, 2)]
kvRDD2.collect()
[(3, 8)]
2. Key-Value RDD join operation
The join operation joins two RDDs on matching keys; only keys present in both appear in the result.
kvRDD1.join(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8))]
3. Key-Value leftOuterJoin operation
Unlike a plain join, leftOuterJoin keeps every key from the left RDD; when the right side has no match, the value is None.
kvRDD1.leftOuterJoin(kvRDD2).collect()
[(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))]
4. Key-Value RDD rightOuterJoin operation
kvRDD1.rightOuterJoin(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8))]
5. Key-Value subtractByKey operation
subtractByKey removes from the first RDD every entry whose key also appears in the second RDD.
kvRDD1.subtractByKey(kvRDD2).collect()
[(1, 2), (5, 6)]
Key-Value "action" operations
1. Key-Value first operation
kvRDD1.first()
(3, 4)
kvRDD1.take(2)
[(3, 4), (3, 6)]
2. Reading the elements of the first item
kvFirst = kvRDD1.first()
kvFirst
(3, 4)
kvFirst[0]
3
kvFirst[1]
4
3. Counting the number of items for each key
kvRDD1.countByKey()
defaultdict(int, {1: 1, 3: 2, 5: 1})
4. collectAsMap: building a key-value dictionary
A dict holds only one value per key, so duplicate keys collapse; here key 3 keeps 6 and loses 4.
KV = kvRDD1.collectAsMap()
KV
{1: 2, 3: 6, 5: 6}
type(KV)
dict
5. Using the lookup table to convert data
KV[3]
6
KV[1]
2
6. Key-Value lookup operation
Unlike collectAsMap, lookup returns every value stored under the given key.
kvRDD1.lookup(3)
[4, 6]
kvRDD1.lookup(5)
[6]
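A key with no entries simply yields an empty list:
kvRDD1.lookup(7)
[]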
Broadcast variables
Broadcast variables are shared variables. Shared variables save memory and running time and make parallel processing more efficient; Spark provides two kinds, broadcast variables and accumulators.
The rules for using a broadcast variable are:
(1) Create it with SparkContext.broadcast(value).
(2) Read its value through the .value attribute.
(3) A broadcast variable cannot be modified after it is created.
kvFruit = sc.parallelize([(1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")])
fruitMap = kvFruit.collectAsMap()
print("lookup table: " + str(fruitMap))
lookup table: {1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'}
bcFruitMap = sc.broadcast(fruitMap)
fruitIds = sc.parallelize([2, 4, 1, 3])
print("水果编号: " + str(fruitIds.collect()))
水果编号: [2, 4, 1, 3]
print("使用Broadcast 广播变量进行转换==>")
fruitNames = fruitIds.map(lambda x : bcFruitMap.value[x]).collect()
print("水果名称: " + str(fruitNames))
使用Broadcast 广播变量进行转换==>
水果名称: ['orange', 'grape', 'apple', 'banana']
Accumulators
Usage rules:
An accumulator is created with SparkContext.accumulator(value).
Values are accumulated with .add().
Inside a task, for example within a foreach, the accumulator's value cannot be read.
Only driver-side code outside the loop can read the value with .value.
intRDD = sc.parallelize([3, 1, 2, 5, 5])
total = sc.accumulator(0.0)
num = sc.accumulator(0)
intRDD.foreach(lambda i : [total.add(i), num.add(1)])
avg = total.value / num.value
print("total = " + str(total.value) + ", num = " + str(num.value) + ", avg = " + str(avg))
total = 16.0, num = 5, avg = 3.2
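One caveat: transformations are lazy, so an accumulator updated inside map is not incremented until an action runs. A minimal sketch of the effect (the [acc.add(x), x][1] trick just calls add and passes x through):
acc = sc.accumulator(0)
mapped = intRDD.map(lambda x : [acc.add(x), x][1])
print(acc.value)
0
mapped.count()
5
print(acc.value)
16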
RDD Persistence
intRddMemory = sc.parallelize([3, 1, 2, 5, 5])
intRddMemory.persist()
ParallelCollectionRDD[143] at parallelize at PythonRDD.scala:475
intRddMemory.is_cached
True
intRddMemory.unpersist()
ParallelCollectionRDD[143] at parallelize at PythonRDD.scala:475
intRddMemory.is_cached
False
from pyspark import StorageLevel
intRddMemoryAndDisk = sc.parallelize([3, 1, 2, 5, 5])
intRddMemoryAndDisk.persist(StorageLevel.MEMORY_AND_DISK)
ParallelCollectionRDD[144] at parallelize at PythonRDD.scala:475
intRddMemoryAndDisk.is_cached
True
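The level an RDD was persisted with can be inspected afterwards; the exact repr below is indicative and may differ by Spark version:
intRddMemoryAndDisk.getStorageLevel()
StorageLevel(True, True, False, False, 1)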