首页 > 其他分享 >【大数据】RDD

【大数据】RDD

时间:2023-10-20 16:33:45浏览次数:21  
标签:parallelize collect data rdd RDD sc 数据

RDD介绍


Resilient: RDD中的数据可以存储在内存中或者磁盘中。

Dataset:一个数据集合,用于存放数据的。 

Distributed: RDD中的数据是分布式存储的,可用于分布式计算

RDD五大特性

# coding:utf8
from pyspark import SparkConf, SparkContext


if __name__ == '__main__':
    conf = SparkConf().setAppName("RDD")
    # 通过SparkConf对象构建SparkContext对象
    sc = SparkContext(conf=conf)

    # 特性1:分区
    rdd1 = sc.parallelize([1,2,3,4,5,6],3)
    print(rdd1.glom().collect())
    """  [[1, 2], [3, 4], [5, 6]]    """


    # 特性2:RDD的方法会作用在其所有的分区上
    rdd2 = sc.parallelize([1,2,3,4,5,6],3).map(lambda x:x * 10)
    print(rdd2.glom().collect())
    """  [[10, 20], [30, 40], [50, 60]]    """


    # 特性3:RDD之间是有依赖关系(RDD有血缘关系)
    rdd1 = sc.textFile("./data/words.txt")
    rdd2 = rdd1.flatMap(lambda x:x.split(' '))
    rdd3 = rdd2.map(lambda x: (x, 1))
    print(rdd3.collect())
    """[('make', 1), ('make', 1), ('make', 1), ('make', 1), ('love', 1), ('love', 1), ('love', 1), ('love', 1)]"""


    # 特性4: Key-Value型的RDD可以有分区器
    """ ("a",1) ("a",2) ("b",3) ...  根据K的不同进行分区  """

    
    # 特性5: RDD的分区规划,会尽量靠近数据所在的服务器

RDD 创建方式

# coding:utf8
from pyspark import SparkConf, SparkContext


if __name__ == '__main__':

    conf = SparkConf().setAppName("RDD").setMaster("local[*]")
    # 通过SparkConf对象构建SparkContext对象
    sc = SparkContext(conf=conf)

    List = [1,2,3,4,5,6,7,8,9]

    """  创建RDD对象:   parallelize """
    # sc.parallelize(可迭代对象,分区数量)
    api1 = sc.parallelize(List,numSlices=3)
    print("集合分布式RDD得出的结果:",api1.collect())


    """  读取本地/HDFS文件数据:   textFile"""
    # sc.textFile(路径,最小分区,编码)
    # sc.textFile("hdfs://192.168.88.201:8020/input/words.txt")
    api2 = sc.textFile('./data/words.txt',)
    print("结果:", api2.collect())


    """  读取本地/HDFS一堆文件:   wholeTextFiles"""
    # sc.textFile(路径,最小分区,编码)
    api3 = sc.wholeTextFiles('./data/tiny_files',)
    print("结果:",api3.collect())

Transformation算子

"""  
     map:       遍历每行数据进行操作 
     res:       [2,3,4,5,6,7,8,9,10]
"""
	def check(x):
    	return x + 1
	res = sc.parallelize([1,2,3,4,5,6,7,8,9], 3).map(check).collect()


    

"""  
    flatMap:   对rdd执行map操作,然后解除嵌套
    res:
          map : ["D,W,Q","M,A,D"] -> [['D', 'W', 'Q'], ['M', 'A', 'D']]
      flatMap : ["D,W,Q","M,A,D"] -> ['D', 'W', 'Q', 'M', 'A', 'D']
"""
	def check(data):
    	return str(data).split(",")
	res = sc.parallelize(["D,W,Q","M,A,D"]).flatMap(check).collect()




"""
    reduceByKey:  自动按照key分组,对v进行你想要的逻辑方式处理.
    理解:         check函数传入的x1与x2,实际是相同的K的数量 按照你需要的方式进行处理

    res :         
                x1 * x2 :    [('a',1),('a',5),('a',2)] ->  [('a', 10)]
                结果叠加:第一次的结果 + 第二次 R1 + 公式  ->  (1):1 * 5 = 5(R1) (2):5(R1) * 2 = 10

                 x1 * x2 + 5:  [('a',1),('a',5),('a',2)] -> [('a', 25)]
                结果叠加:第一次的结果 + 第二次 R1 + 公式     ->  (1):1 * 5 + 5 = 10(R1) (2):10(R1) * 2 + 5 = 25
"""
	def check(x1,x2):
		 return x1 * x2 + 5
	res = sc.parallelize([('a',1),('a',5),('a',2)]).reduceByKey(check).collect()



"""
    mapValues : 针对二元元组RDD ,对其内部的二元元组的 Value执行map操作
    res :       [('a',1),('a',5),('a',2)] ->  [('a', 2), ('a', 6), ('a', 3)]
"""
	def check(x):
   		return x + 1
	res = sc.parallelize([('a',1),('a',5),('a',2)]).mapValues(check).collect()


"""
   groupBy : 对数据进行分组
   res :     [('Alice', [('Alice', 25), ('Alice', 35)]), ('Bob', [('Bob', 30), ('Bob', 40)]), ('Chris', [('Chris', 20)])]
"""
	def check(x):
    	return x[0],list(x[1])
	data = [("Alice", 25), ("Bob", 30), ("Alice", 35), ("Bob", 40), ("Chris", 20)]
	rdd = sc.parallelize(data)
	grouped_rdd = rdd.groupBy(lambda x: x[0])  # 使用groupBy()方法按 x[0] 姓名进行分组
	res = grouped_rdd.map(check).collect()     # 格式化输出 : x[0],list(x[1])


"""
    filter: 筛选
    res   :   [1, 3, 5, 7, 9]
"""
	def check(x):
		if x % 2 == 0:
      		pass
 		else:
       		return x
	data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
	rdd = sc.parallelize(data)
	filtered_rdd = rdd.filter(check).collect()
	print(filtered_rdd)


"""
   distinct : 去重
   res      : [1, 2, 3, 4, 5]
"""
	data = [1, 2, 3, 1, 2, 3, 4, 5]
	rdd = sc.parallelize(data)
	distinct_rdd = rdd.distinct().collect()


    
"""
    union : 合并
      res : [1, 1, 3, 3, "a", "b", "c"]
"""
	rdd1 = sc.parallelize([1, 1, 3,3])
	rdd2 = sc.parallelize(["a", "b", "c"])
	union_rdd = rdd1.union(rdd2).collect()



"""
    按照K来关联:
    join(内连接):    rdd1与rdd2都有的才做计算
                      [(2, ('Banana', 'Yellow')), (1, ('Apple', 'Red')), (3, ('Orange', 'Orange'))]

    leftOuterJoin(左外): rdd1与rdd2  以左边的为准计算,没有的为None(4在rdd2里面没有,所有为None)
                         [(2, ('Banana', 'Yellow')), (4, ('Grapes', None)), (1, ('Apple', 'Red')), (3, ('Orange', 'Orange'))]

    rightOuterJoin(右外):  rdd1与rdd2  以右边边的为准计算,没有的为None(5在rdd1里面没有,所有为None)
                            [(2, ('Banana', 'Yellow')), (1, ('Apple', 'Red')), (3, ('Orange', 'Orange')), (5, (None, 'Green'))]
"""
	rdd1 = sc.parallelize([(1, "Apple"), (2, "Banana"), (3, "Orange"), (4, "Grapes")])
	rdd2 = sc.parallelize([(1, "Red"), (2, "Yellow"), (3, "Orange"), (5, "Green")])
	joined_rdd = rdd1.join(rdd2).collect()



"""
    intersection :  俩交集
    res          :  [4, 5]
"""
	rdd1 = sc.parallelize([1, 2, 3, 4, 5])
	rdd2 = sc.parallelize([4, 5, 6, 7, 8])
	intersect_rdd = rdd1.intersection(rdd2).collect()


    
    
"""
    glom :  返回分区列表
    res  :  [[1, 2], [3, 4, 5]]
"""
	rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
	glom_rdd = rdd.glom().collect()         # 使用glom()方法将每个分区的元素转换为列表


    
"""
   groupByKey: 根据键对RDD中的KEY进行分组
   res: 
       Key: 1, Values: ['apple', 'orange']
       Key: 2, Values: ['banana', 'grape']
       Key: 3, Values: ['kiwi']
"""
	rdd = sc.parallelize([(1, "apple"), (2, "banana"), (1, "orange"), (2, "grape"), (3, "kiwi")])
	# 使用groupByKey()方法根据键对元素进行分组
	grouped_rdd = rdd.groupByKey()
	# 打印每个键对应的值列表
	for key, values in grouped_rdd.collect():
    	print(f"Key: {key}, Values: {list(values)}")


"""
   sortBy: 排序
   res:  
       Ture: [1, 2, 3, 5, 8]
       False:[8, 5, 3, 2, 1]
"""
	rdd = sc.parallelize([5, 2, 8, 1, 3])
	sorted_rdd = rdd.sortBy(lambda x: x,True).collect()


    
"""
   sortByKey: 排序
   res:  
        Ture: [(1, 'Banana'), (2, 'Orange'), (3, 'Apple')]
        False:~
"""
	rdd = sc.parallelize([(3, "Apple"), (1, "Banana"), (2, "Orange")])
	sorted_rdd = rdd.sortByKey(ascending=True).collect()

Action算子

# countByKey: 用于 对键值 对RDD中的 键 进行计数
data = [("apple", 1), ("banana", 2), ("orange", 3), ("apple", 4), ("kiwi", 5), ("banana", 6)]
rdd = sc.parallelize(data)
print(rdd.countByKey())
# defaultdict(<class 'int'>, {'apple': 2, 'banana': 2, 'orange': 1, 'kiwi': 1})



# collect: 将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象
# 数据量不能太大,考虑到内存问题
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.collect())
# [1, 2, 3, 4, 5]




# reduce:  数据集 按照你传入的逻辑进行 聚合
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.reduce(lambda x, y: x + y))
# 15



# fold: 与reduce不同的是,fold操作还可以指定一个初始值,用于处理空RDD的情况
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.fold(5, lambda x, y: x + y)) # 设置初始值
# 25



# first: 第一个元素
rdd = sc.parallelize([1, 2, 3, 4, 5]).first
# 1



# takeSample : 随机抽样
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
result = rdd.takeSample(False, 3) # False:不取相同数据,Ture:可以取相同数据
# [3, 8, 5]




# takeOrdered : 排序取前N个
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
result = rdd.takeOrdered(3)   # rdd.takeOrdered(3, lambda x:-x) 降序排序
# [1, 2, 3]



# foreach: 跟map一样,但是没有返回值(由分区Executor直接执行!)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(lambda x:x + 1)
# None


# saveAsTextFile: 将RDD保存为文本文件(由分区Executor直接执行!)
rdd = sc.parallelize(["Hello", "World", "PySpark", "Example"])
rdd.saveAsTextFile("output_directory")

分区算子

# mapPartitions: 对每个分区进行操作,提升效率
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
def check(iterator):
    List = []
    for x in iterator: # 将每个分区里面的元素都放到空列表
        List.append(x + 1)
        return List
    result = rdd.mapPartitions(check).collect()
# [2, 3, 4, 5, 6]
    


# foreachPartition: 对每个分区进行操作,提升效率
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
def check(iterator):
    List = []
    for x in iterator: # 将每个分区里面的元素都放到空列表
        List.append(x + 1)
        print(List)
result = rdd.foreachPartition(check)
print(result)
# [2, 3, 4, 5, 6]
# None
 
    
# partitionBy : 自定义分区(分成几个列表)
data = [("A", 1), ("B", 2), ("C", 3), ("D", 4), ("E", 5)]
rdd = sc.parallelize(data)
result = rdd.partitionBy(3).glom().collect()
print(result)
# [[('B', 2), ('C', 3), ('D', 4)], [('A', 1)], [('E', 5)]]




# repartition:  决定新的分区数
rdd = sc.parallelize(range(100))
print("初始分区数:", rdd.getNumPartitions())
# 对 RDD 进行重分区,将分区数设置为 4
repartitioned_rdd = rdd.repartition(4)
print("重分区后的分区数:", repartitioned_rdd.getNumPartitions())
# 初始分区数: 1
# 重分区后的分区数: 4

标签:parallelize,collect,data,rdd,RDD,sc,数据
From: https://www.cnblogs.com/wanghong1994/p/17777424.html

相关文章

  • 智能工厂解决方案:设备生产运营数据化管理平台
    随着工业化进程的不断加快,设备管理成为了工业生产中不可或缺的一环。传统的设备管理方式往往存在效率低下、管理难度大等问题,基于物联网技术打造设备生产运营数据化管理平台可以有效解决这些问题,提供更加高效、智能化的设备管理手段。 数之能提供的工业互联网平台可以实现PLC、仪......
  • STM32 EEPROM_Emulation 保存数据使用注意事项
    1目的:stm32官方提供flash模拟eeprom的代码例子,为了能给产品添加数据保存功能,可以改造该例子迅速完成数据保存的功能。示例代码路径:C:\Users\rd-yhzhang\STM32Cube\Repository\STM32Cube_FW_F1_V1.8.5\Projects\STM32F103RB-Nucleo\Applications\EEPROM\EEPROM_Emulation。2......
  • 【Python&GIS】基于Python批量合并矢量数据
    ​老样子最近有项目需要将N个矢量文件合并成一个,总不能用ArcGIS一个个导入吧。所以我就想着用Python编个程序实现批量合并矢量。我之前也发了一些关于Python操作矢量数据的文章:【Python&GIS】Python处理矢量数据的基本操作(查询、修改、删除、新建),如果大家感兴趣可以去我的主......
  • 数据类型分类(看到建议看一下)
    数据类型分类按存值个数存一个值:整型/浮点型/字符串存多个值:列表/字典/元组/集合按有序无序有序:字符串/列表/元组无序:字典/集合按可变or不可变可变:列表/字典/集合不可变:整型/浮点型/字符串/元组......
  • 元组数据类型内置方法
    元组数据类型内置方法元组和列表的内置方法一模一样,但是元祖无法修改元组咋i定义的那一刻他的元素个数以及元素的值就全部固定了毫无用处,早期永远一般用于减小内存占用,以后只要定义列表就行了定义方式列表的中括号改成小括号tup=(1,2,3,4,5)内置方法查看索引位置......
  • 字典数据类型内置方法
    字典数据类型内置方法1.作用对于值添加描述信息使用他2.定义方式用{}以逗号隔开加入键值对:key:valueinfo_dict={'name':'wangdapao','age':18,'height':120,'gender':'female','hobby_list':['dapao','basketball'......
  • 列表数据类型的内置方法
    列表数据类型的内置方法1.作用列表的作用就是可以描述多个值,就比如一个人可以有很多的爱好2.定义方式hobby_list=['play','swimming','dancing']print(hobby_list)lt=list('randysun')print(lt)3.内置方法优先掌握索引取值hobby_list=['play','swimming......
  • mysql常用报表处理及数据迁移写法SQL
    熟悉一些常用的sql写法便于工作中快速导出数据,本文不涉及到业务,所以对表库做了名字的修改,仅提供一些用法的说明。以下直接举例子并讲解1单表批量数据迁移场景:日志迁移具体实例:将test_log2日志表2的数据全部迁移到test_log1日志表1sql:......
  • 【从零学习python 】03. Python编程基础:变量、数据类型与标识符
    变量以及数据类型一、变量的定义对于重复使用,并且经常需要修改的数据,可以定义为变量,来提高编程效率。定义变量的语法为:变量名=变量值。(这里的=作用是赋值。)定义变量后可以使用变量名来访问变量值。如下示例:#不使用变量打印三次"今天天气真好",如果需要变成打印"......
  • 服务器数据恢复-服务器多块硬盘掉线导致银行业务模块崩溃的数据恢复案例
    服务器故障&分析:某银行的业务模块崩溃,无法正常使用。排查服务器故障,发现运行该业务模块的服务器中多块硬盘离线,导致上层应用崩溃。故障服务器内多块硬盘掉线,硬盘掉线数量超过服务器raid阵列冗余级别所允许的硬盘掉线数量,导致服务器瘫痪。可以通过修复硬盘物理故障,提取故障盘数据......