RDD介绍
Resilient: RDD中的数据可以存储在内存中或者磁盘中。
Dataset:一个数据集合,用于存放数据的。
Distributed: RDD中的数据是分布式存储的,可用于分布式计算
RDD五大特性
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("RDD")
# 通过SparkConf对象构建SparkContext对象
sc = SparkContext(conf=conf)
# 特性1:分区
rdd1 = sc.parallelize([1,2,3,4,5,6],3)
print(rdd1.glom().collect())
""" [[1, 2], [3, 4], [5, 6]] """
# 特性2:RDD的方法会作用在其所有的分区上
rdd2 = sc.parallelize([1,2,3,4,5,6],3).map(lambda x:x * 10)
print(rdd2.glom().collect())
""" [[10, 20], [30, 40], [50, 60]] """
# 特性3:RDD之间是有依赖关系(RDD有血缘关系)
rdd1 = sc.textFile("./data/words.txt")
rdd2 = rdd1.flatMap(lambda x:x.split(' '))
rdd3 = rdd2.map(lambda x: (x, 1))
print(rdd3.collect())
"""[('make', 1), ('make', 1), ('make', 1), ('make', 1), ('love', 1), ('love', 1), ('love', 1), ('love', 1)]"""
# 特性4: Key-Value型的RDD可以有分区器
""" ("a",1) ("a",2) ("b",3) ... 根据K的不同进行分区 """
# 特性5: RDD的分区规划,会尽量靠近数据所在的服务器
RDD 创建方式
# coding:utf8
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setAppName("RDD").setMaster("local[*]")
# 通过SparkConf对象构建SparkContext对象
sc = SparkContext(conf=conf)
List = [1,2,3,4,5,6,7,8,9]
""" 创建RDD对象: parallelize """
# sc.parallelize(可迭代对象,分区数量)
api1 = sc.parallelize(List,numSlices=3)
print("集合分布式RDD得出的结果:",api1.collect())
""" 读取本地/HDFS文件数据: textFile"""
# sc.textFile(路径,最小分区,编码)
# sc.textFile("hdfs://192.168.88.201:8020/input/words.txt")
api2 = sc.textFile('./data/words.txt',)
print("结果:", api2.collect())
""" 读取本地/HDFS一堆文件: wholeTextFiles"""
# sc.textFile(路径,最小分区,编码)
api3 = sc.wholeTextFiles('./data/tiny_files',)
print("结果:",api3.collect())
Transformation算子
"""
map: 遍历每行数据进行操作
res: [2,3,4,5,6,7,8,9,10]
"""
def check(x):
return x + 1
res = sc.parallelize([1,2,3,4,5,6,7,8,9], 3).map(check).collect()
"""
flatMap: 对rdd执行map操作,然后解除嵌套
res:
map : ["D,W,Q","M,A,D"] -> [['D', 'W', 'Q'], ['M', 'A', 'D']]
flatMap : ["D,W,Q","M,A,D"] -> ['D', 'W', 'Q', 'M', 'A', 'D']
"""
def check(data):
return str(data).split(",")
res = sc.parallelize(["D,W,Q","M,A,D"]).flatMap(check).collect()
"""
reduceByKey: 自动按照key分组,对v进行你想要的逻辑方式处理.
理解: check函数传入的x1与x2,实际是相同的K的数量 按照你需要的方式进行处理
res :
x1 * x2 : [('a',1),('a',5),('a',2)] -> [('a', 10)]
结果叠加:第一次的结果 + 第二次 R1 + 公式 -> (1):1 * 5 = 5(R1) (2):5(R1) * 2 = 10
x1 * x2 + 5: [('a',1),('a',5),('a',2)] -> [('a', 25)]
结果叠加:第一次的结果 + 第二次 R1 + 公式 -> (1):1 * 5 + 5 = 10(R1) (2):10(R1) * 2 + 5 = 25
"""
def check(x1,x2):
return x1 * x2 + 5
res = sc.parallelize([('a',1),('a',5),('a',2)]).reduceByKey(check).collect()
"""
mapValues : 针对二元元组RDD ,对其内部的二元元组的 Value执行map操作
res : [('a',1),('a',5),('a',2)] -> [('a', 2), ('a', 6), ('a', 3)]
"""
def check(x):
return x + 1
res = sc.parallelize([('a',1),('a',5),('a',2)]).mapValues(check).collect()
"""
groupBy : 对数据进行分组
res : [('Alice', [('Alice', 25), ('Alice', 35)]), ('Bob', [('Bob', 30), ('Bob', 40)]), ('Chris', [('Chris', 20)])]
"""
def check(x):
return x[0],list(x[1])
data = [("Alice", 25), ("Bob", 30), ("Alice", 35), ("Bob", 40), ("Chris", 20)]
rdd = sc.parallelize(data)
grouped_rdd = rdd.groupBy(lambda x: x[0]) # 使用groupBy()方法按 x[0] 姓名进行分组
res = grouped_rdd.map(check).collect() # 格式化输出 : x[0],list(x[1])
"""
filter: 筛选
res : [1, 3, 5, 7, 9]
"""
def check(x):
if x % 2 == 0:
pass
else:
return x
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)
filtered_rdd = rdd.filter(check).collect()
print(filtered_rdd)
"""
distinct : 去重
res : [1, 2, 3, 4, 5]
"""
data = [1, 2, 3, 1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
distinct_rdd = rdd.distinct().collect()
"""
union : 合并
res : [1, 1, 3, 3, "a", "b", "c"]
"""
rdd1 = sc.parallelize([1, 1, 3,3])
rdd2 = sc.parallelize(["a", "b", "c"])
union_rdd = rdd1.union(rdd2).collect()
"""
按照K来关联:
join(内连接): rdd1与rdd2都有的才做计算
[(2, ('Banana', 'Yellow')), (1, ('Apple', 'Red')), (3, ('Orange', 'Orange'))]
leftOuterJoin(左外): rdd1与rdd2 以左边的为准计算,没有的为None(4在rdd2里面没有,所有为None)
[(2, ('Banana', 'Yellow')), (4, ('Grapes', None)), (1, ('Apple', 'Red')), (3, ('Orange', 'Orange'))]
rightOuterJoin(右外): rdd1与rdd2 以右边边的为准计算,没有的为None(5在rdd1里面没有,所有为None)
[(2, ('Banana', 'Yellow')), (1, ('Apple', 'Red')), (3, ('Orange', 'Orange')), (5, (None, 'Green'))]
"""
rdd1 = sc.parallelize([(1, "Apple"), (2, "Banana"), (3, "Orange"), (4, "Grapes")])
rdd2 = sc.parallelize([(1, "Red"), (2, "Yellow"), (3, "Orange"), (5, "Green")])
joined_rdd = rdd1.join(rdd2).collect()
"""
intersection : 俩交集
res : [4, 5]
"""
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
intersect_rdd = rdd1.intersection(rdd2).collect()
"""
glom : 返回分区列表
res : [[1, 2], [3, 4, 5]]
"""
rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
glom_rdd = rdd.glom().collect() # 使用glom()方法将每个分区的元素转换为列表
"""
groupByKey: 根据键对RDD中的KEY进行分组
res:
Key: 1, Values: ['apple', 'orange']
Key: 2, Values: ['banana', 'grape']
Key: 3, Values: ['kiwi']
"""
rdd = sc.parallelize([(1, "apple"), (2, "banana"), (1, "orange"), (2, "grape"), (3, "kiwi")])
# 使用groupByKey()方法根据键对元素进行分组
grouped_rdd = rdd.groupByKey()
# 打印每个键对应的值列表
for key, values in grouped_rdd.collect():
print(f"Key: {key}, Values: {list(values)}")
"""
sortBy: 排序
res:
Ture: [1, 2, 3, 5, 8]
False:[8, 5, 3, 2, 1]
"""
rdd = sc.parallelize([5, 2, 8, 1, 3])
sorted_rdd = rdd.sortBy(lambda x: x,True).collect()
"""
sortByKey: 排序
res:
Ture: [(1, 'Banana'), (2, 'Orange'), (3, 'Apple')]
False:~
"""
rdd = sc.parallelize([(3, "Apple"), (1, "Banana"), (2, "Orange")])
sorted_rdd = rdd.sortByKey(ascending=True).collect()
Action算子
# countByKey: 用于 对键值 对RDD中的 键 进行计数
data = [("apple", 1), ("banana", 2), ("orange", 3), ("apple", 4), ("kiwi", 5), ("banana", 6)]
rdd = sc.parallelize(data)
print(rdd.countByKey())
# defaultdict(<class 'int'>, {'apple': 2, 'banana': 2, 'orange': 1, 'kiwi': 1})
# collect: 将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
# 数据量不能太大,考虑到内存问题
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())
# [1, 2, 3, 4, 5]
# reduce: 数据集 按照你传入的逻辑进行 聚合
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda x, y: x + y))
# 15
# fold: 与reduce不同的是,fold操作还可以指定一个初始值,用于处理空RDD的情况
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.fold(5, lambda x, y: x + y)) # 设置初始值
# 25
# first: 第一个元素
rdd = sc.parallelize([1, 2, 3, 4, 5]).first
# 1
# takeSample : 随机抽样
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
result = rdd.takeSample(False, 3) # False:不取相同数据,Ture:可以取相同数据
# [3, 8, 5]
# takeOrdered : 排序取前N个
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
result = rdd.takeOrdered(3) # rdd.takeOrdered(3, lambda x:-x) 降序排序
# [1, 2, 3]
# foreach: 跟map一样,但是没有返回值(由分区Executor直接执行!)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(lambda x:x + 1)
# None
# saveAsTextFile: 将RDD保存为文本文件(由分区Executor直接执行!)
rdd = sc.parallelize(["Hello", "World", "PySpark", "Example"])
rdd.saveAsTextFile("output_directory")
分区算子
# mapPartitions: 对每个分区进行操作,提升效率
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
def check(iterator):
List = []
for x in iterator: # 将每个分区里面的元素都放到空列表
List.append(x + 1)
return List
result = rdd.mapPartitions(check).collect()
# [2, 3, 4, 5, 6]
# foreachPartition: 对每个分区进行操作,提升效率
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
def check(iterator):
List = []
for x in iterator: # 将每个分区里面的元素都放到空列表
List.append(x + 1)
print(List)
result = rdd.foreachPartition(check)
print(result)
# [2, 3, 4, 5, 6]
# None
# partitionBy : 自定义分区(分成几个列表)
data = [("A", 1), ("B", 2), ("C", 3), ("D", 4), ("E", 5)]
rdd = sc.parallelize(data)
result = rdd.partitionBy(3).glom().collect()
print(result)
# [[('B', 2), ('C', 3), ('D', 4)], [('A', 1)], [('E', 5)]]
# repartition: 决定新的分区数
rdd = sc.parallelize(range(100))
print("初始分区数:", rdd.getNumPartitions())
# 对 RDD 进行重分区,将分区数设置为 4
repartitioned_rdd = rdd.repartition(4)
print("重分区后的分区数:", repartitioned_rdd.getNumPartitions())
# 初始分区数: 1
# 重分区后的分区数: 4
标签:parallelize,collect,data,rdd,RDD,sc,数据
From: https://www.cnblogs.com/wanghong1994/p/17777424.html