首页 > 其他分享 >Spark的算子

Spark的算子

时间:2024-10-31 19:15:35浏览次数:6  
标签:元素 flow RDD 算子 sc Spark data

目录

一、算子

二、转换算子(Transformations)

2.1、map算子

2.2、flatMap算子 

2.3、filter算子

2.4、union算子

2.5、distinct算子

2.6、分组聚合算子

2.6.1groupByKey算子

2.6.2reduceByKey算子

2.7、排序算子

2.7.1sortBy算子

2.7.2sortByKey

2.8、重分区算子

 2.8.1repartition算子

2.8.2coalesce算子 

三、动作算子(Action)

3.1count

3.2、foreach算子

3.3、saveAsTextFile算子

3.4、first 算子

3.5、take 算子

3.6、collect 算子

3.7、reduce算子

3.8、top算子


一、算子

在 Apache Spark 中,算子(Operators)是指用于处理和转换数据的各种操作。Spark 的核心概念之一是 RDD(弹性分布式数据集),而算子则是对 RDD 进行操作的方法。算子可以分为两大类:转换算子(Transformation)和动作算子(Action)。

这篇文章要说的是转换算子

二、转换算子(Transformations)

转换算子是对 RDD 进行转换,生成新的 RDD。转换算子是惰性的(lazy),即它们不会立即执行,而是在遇到动作算子时才会触发执行。

2.1、map算子

功能:对RDD中每个元素调用一次参数中的函数,并将每次调用的返回值直接放入一个新的RDD中
分类:转换算子
场景:一对一的转换,需要返回值
语法格式:
def map(self , f: T -> U ) -> RDD[U]
f:代表参数是一个函数
T:代表RDD中的每个元素
U:代表RDD中每个元素转换的结果

list01 = [1,2,3,4,5,6]
listRdd = sc.parallelize(list01)
mapRdd = listRdd.map(lambda x: math.pow(x,3))
mapRdd.foreach(lambda x: print(x))

2.2、flatMap算子 

功能:将两层嵌套集合中的每个元素取出,扁平化处理,放入一层
集合中返回,类似于SQL中explode函数
分类:转换算子
场景:多层集合元素展开,一个集合对应多个元素【一对多】
语法:
def flatMap(self , f : T -> Iterable[U]) -> RDD[U]

夜曲/发如雪/东风破/七里香
十年/爱情转移/你的背包
日不落/舞娘/倒带
鼓楼/成都/吉姆餐厅/无法长大
月亮之上/荷塘月色

编写代码:
fileRdd = sc.textFile("../datas/a.txt",2)
flatRdd = fileRdd.flatMap(lambda line: line.split("/"))
flatRdd.foreach(lambda x: print(x))

2.3、filter算子

功能:对RDD集合中的每个元素调用一次参数中的表达式对数据进行过滤,符合条件就保留,不符合就过滤
场景:行的过滤,类似于SQL中where或者having
 def filter(self, f: T -> bool ) -> RDD[T]

1 周杰伦 0 夜曲/发如雪/东风破/七里香
2 陈奕迅 0 十年/爱情转移/你的背包
3 1 日不落/舞娘/倒带
4 赵雷 0 鼓楼/成都/吉姆餐厅/无法长大
5 凤凰传奇 -1 月亮之上/荷塘月色

代码演示:
fileRdd = sc.textFile("../datas/b.txt",2)
filterRdd = fileRdd.filter(lambda line: re.split(r"\s",line)[2] != '-1' and len(re.split("\\s",line)) == 4)  #每一行第三列不等于‘-1’的和不足4列的会被剔除
 filterRdd.foreach(lambda x: print(x))

2.4、union算子

union算子
功能:实现两个RDD中数据的合并
分类:转换算子
语法:
def union(self,other:RDD[U]) -> RDD[T/U]

list1 = [1, 2, 3, 4, 5, 6, 7, 8]
list2 = [5, 6, 7, 8, 9, 10]
rdd1 = sc.parallelize(list1,2)
rdd2 = sc.parallelize(list2,2)
rdd3 = rdd1.union(rdd2)

rdd3.foreach(print)

2.5、distinct算子

功能:实现对RDD元素的去重
分类:转换算子
语法:
def distinct(self) -> RDD[T]

list1 = [1, 2, 3, 4, 5, 6, 7, 8]
list2 = [5, 6, 7, 8, 9, 10]
rdd1 = sc.parallelize(list1,2)
rdd2 = sc.parallelize(list2,2)
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.distinct()
rdd4.foreach(print)

2.6、分组聚合算子

2.6.1groupByKey算子

xxxByKey算子,只有KV类型的RDD才能调用

功能:对KV类型的RDD按照Key进行分组,相同K的Value放入一 个集合列表中,返回一个新的RDD

语法:RDD【K,V】.groupByKey => RDD【K, List[V]】

分类:转换算子

场景:需要对数据进行分组的场景,或者说分组以后的聚合逻辑 比较复杂,不适合用reduce

特点:必须经过Shuffle,可以指定新的RDD分区个数,可以指定分区规则

rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd2 = rdd1.groupByKey()  # ("word",List[10,5])
rdd2.foreach(lambda x: print(x[0], *x[1]))

2.6.2reduceByKey算子

功能:对KV类型的RDD按照Key进行分组,并对相同Key的所有
Value使用参数中的reduce函数进行聚合
要求:只有KV类型的RDD才能调用
分类:转换算子
特点:必须经过shuffle,可以指定新的RDD分区个数,可以指定分区规则
语法:
def reduceByKey(self,f: (T,T) ->T,numPartitions,partitionFunction) ->RDD[Tuple[K,V]]

rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)

rdd2= rdd1.reduceByKey(lambda total,num: total * num)
 rdd2.foreach(print)

注意:能用reduceByKey就不要用groupByKey+map

reduceByKey代码更简洁,而且性能会更好

2.7、排序算子

2.7.1sortBy算子

功能:对RDD中的所有元素进行整体排序,可以指定排序规则
【按照谁排序,升序或者降序】
分类:转换算子
场景:适用于所有对大数据排序的场景,一般用于对大数据量非KV类型的RDD的数据排序
特点:经过Shuffle,可以指定排序后新RDD的分区个数,底层只能使用RangePartitioner来实现
def sortBy(self, keyFunc:(T) -> 0, asc: bool,numPartitions) -> RDD
keyFunc:(T) -> 0:用于指定按照数据中的哪个值进行排序
asc: bool:用于指定升序还是降序,默认是升序

rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd1.sortByKey(ascending=False).foreach(print)

2.7.2sortByKey

功能:对RDD中的所有元素按照Key进行整体排序,可以指定排序规则
要求:只有KV类型的RDD才能调用
分类:转换算子【sortByKey会触发job的运行】
场景:适用于大数据量的KV类型的RDD按照Key排序的场景
特点:经过Shuffle,可以指定排序后新RDD的分区个数
语法:def sortByKey(self, asc, numPartitions) -> RR[Tuple[K,V]]

rdd1 = sc.parallelize([("word", 10), ("word", 5), ("hello", 100), ("hello", 20), ("laoyan", 1)], numSlices=3)
rdd1.sortBy(lambda tuple:tuple[1],ascending=False).foreach(print) #这样会根据value排序

2.8、重分区算子

 2.8.1repartition算子

功能:调整RDD的分区个数
分类:转换算子
场景:一般用于调大分区个数,必须经过shuffle才能实现
语法:
def repartition(self,numPartitions) -> RDD[T]

list01 = [1, 5, 2, 6, 9, 10, 4, 3, 8, 7]
    # 没有指定分区,走默认,默认分区个数,因为是local 模式,所以跟核数有关,所以 分区数为2
rdd = sc.parallelize(list01)
print(rdd.getNumPartitions()) # 2
# repartition 是一个转换算子,必然经历shuffle过程
bigrdd = rdd.repartition(4)
print(bigrdd.getNumPartitions()) # 4

2.8.2coalesce算子 

功能:调整RDD的分区个数
分类:转换算子

特点:可以选择是否经过Shuffle,默认情况下不经过shuffle
def coalesce(self, numPartitions, shuffle:bool) -> RDD[T]

bigbigrdd = bigrdd.coalesce(8,shuffle=True) # 8
 print(bigbigrdd.getNumPartitions())

三、动作算子(Action)

动作算子用于触发 RDD 的计算,并返回结果或将其保存到外部存储系统。动作算子会立即执行所有之前定义的转换算子。

3.1count

count算子
功能:统计RDD集合中元素的个数,返回一个int值
分类:动作算子
场景:统计RDD的数据量,计算行数
语法:
def count(self) -> int

data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)
count = flow.count()
print(f"RDD 中的元素数量: {count}")

3.2、foreach算子

功能:对RDD中每个元素调用一次参数中的函数,没有返回值【与map场景上区别】
分类:触发算子
场景:对RDD中的每个元素进行输出或者保存,一般用于测试打印或者保存数据到第三方系统【数据库等】

data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 过滤数据
# 1. 过滤掉手机号码不正确(长度不为11位)的数据
# 2. 过滤掉数据长度不等于11的数据
flow_filtered = flow.filter(lambda line: len(re.split(r"\s+", line)) == 11 and len(re.split(r"\s+", line)[1]) == 11)

3.3、saveAsTextFile算子

功能:用于将RDD的数据保存到外部文件系统中
分类:触发算子
场景:保存RDD的计算的结果,一般用于将结果保存到HDFS
文件个数 = Task个数 = 分区个数
def saveAsTextFile(self , path ) -> None

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 过滤数据
# 1. 过滤掉手机号码不正确(长度不为11位)的数据
# 2. 过滤掉数据长度不一致的数据
flow_filtered = flow.filter(lambda line: len(re.split(r"\s+", line)) == 11 and len(re.split(r"\s+", line)[1]) == 11)

# 解析数据,提取手机号码、上行流量和下行流量
flow_parsed = flow_filtered.map(lambda line: (re.split(r"\s+", line)[1], (int(re.split(r"\s+", line)[2]), int(re.split(r"\s+", line)[3]))))

# 计算每个手机号码的总流量
total_traffic = flow_parsed.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

# 提取总流量
total_traffic_result = total_traffic.mapValues(lambda v: v[0] + v[1])

# 将结果保存为文本文件
output_path = "D:\\pythonCode\\PySpark\\data\\output\\total_traffic"
total_traffic_result.saveAsTextFile(output_path)

3.4、first 算子

功能:返回RDD集合中的第一个元素【RDD有多个分区,返回的是第一个分区的第一个元素】
分类:触发算子
语法:def first(self) -> T

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 获取 RDD 中的第一个元素
first_element = flow.first()

# 打印结果
print(f"RDD 中的第一个元素: {first_element}")

3.5、take 算子

功能:返回RDD集合中的前N个元素【先从第一个分区取,如果不够再从第二个分区取】
分类:触发算子
注意:take返回的结果放入Driver内存中的,take数据量不能过大

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 获取 RDD 中的前 5 个元素
first_five_elements = flow.take(5)

# 打印结果
print(f"RDD 中的前 5 个元素: {first_five_elements}")

3.6、collect 算子

collect算子
功能:将RDD转化成一个列表返回
分类:触发算子
这个RDD的数据一定不能过大,如果RDD数据量很大,导致Driver内存溢出

理解:假如现在有三个分区,三个分区中都有数据,假如你现在想打印数据,此时打印哪个分区呢?先收集,将数据汇总在一起,再打印。

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 获取 RDD 中的所有元素
all_elements = flow.collect()

# 打印结果
print(f"RDD 中的所有元素: {all_elements}")

3.7、reduce算子

功能:将RDD中的每个元素按照给定的聚合函数进行聚合,返回聚合的结果
分类:触发算子
# tmp用于存储每次计算临时结果,item就是RDD中的每个元素
def reduce(self,f : (T,T) -> U) -> U

# 创建一个包含整数的 RDD
numbers = sc.parallelize([1, 2, 3, 4, 5])

# 使用 reduce 算子计算总和
total_sum = numbers.reduce(lambda a, b: a + b)

# 打印结果
print(f"整数的总和: {total_sum}")

3.8、top算子

功能:对RDD中的所有元素降序排序,并返回前N个元素,即返回RDD中最大的前N个元数据
分类:触发算子
场景:取RDD数据中的最大的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量
语法:def top(self,num) -> List[0]

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 获取 RDD 中的前 5 个元素,按字符串长度降序排序
top_five_elements = flow.top(5, key=lambda x: len(x))

# 打印结果
print(f"RDD 中的前 5 个元素(按字符串长度降序): {top_five_elements}")

3.9、takeOrdered算子

功能:对RDD中的所有元素升序排序,并返回前N个元素,即返回RDD中最小的前N个元数据
分类:触发算子
场景:取RDD数据中的最小的TopN个元素
特点:不经过Shuffle,将所有元素放入Driver内存中排序,只能
适合处理小数据量
语法:def takeOrdered(self,num) -> List[0]

# 读取数据
data_path = "D:\\pythonCode\\PySpark\\data\\wordcount\\HTTP_20130313143750.dat"
flow = sc.textFile(data_path)

# 获取 RDD 中的前 5 个元素,按字符串长度升序排序
ordered_five_elements = flow.takeOrdered(5, key=lambda x: len(x))

# 打印结果
print(f"RDD 中的前 5 个元素(按字符串长度升序): {ordered_five_elements}")

标签:元素,flow,RDD,算子,sc,Spark,data
From: https://blog.csdn.net/xieyichun_/article/details/143413776

相关文章

  • 【时间序列分析】平稳时间序列分析——Wold分解定理和延迟算子
    Wold分解定理(这个定理是平稳时间序列分析的理论基石。)对于任意一个离散平稳时间序列,它都可以分解为两个不相关的平稳序列之和,其中一个为确定性的(deterministic),另一个为随机性的(stochastic) xₜ=Vₜ+ξₜ,{V₁}为确定性平稳序列,ξ₁为随机性平稳序列式中:确定性......
  • 【SQL】Hive/Spark SQL笔记之时间函数、环比/同比/时间比较计算
    获取当天:'${zdt.format("yyyy-MM-dd")}'//获取上月月末select'${zdt.lastMonth().format("yyyy-MM-dd")}'T-1上月末select'${zdt.addDay(-1).lastMonth().format("yyyyMMdd")}'1个小时前select'${zdt.addHour(-1)......
  • 异常处理汇总-Java&Mendix&Spark&SQL&etc...
    1.MyBatis映射问题执行计算过程中出现错误,错误消息:MappedStatementscollectiondoesnotcontainvaluefor... 问题原因:configure.xml文件中未配置对应路径。2. java:Annotationprocessingisnotsupportedformodulecycles.Pleaseensurethatallmodulesfrom......
  • 大数据项目-python基于Spark实现的微博数据可视化分析
    《[含文档+PPT+源码等]精品python基于Spark实现的微博数据可视化分析》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、包运行成功以及课程答疑与微信售后交流群、送查重系统不限次数免费查重等福利!数据库管理工具:phpstudy/Navicat或者phpstudy/sqlyog后台管理系统......
  • 图像处理领域的加速算子收集
    1、Simd库——CPU指令集加速算子 SimdLibraryDocumentation.部分算子截图: 2、VPI库——CPU、GPU(CUDA)加速算子 VPI-VisionProgrammingInterface:Algorithms部分算子截图: 3、CV-CUDA库算子 CV-CUDA—CV-CUDABetadocumentation部分算子截图: ......
  • Python实现图像(边缘)锐化:梯度锐化、Roberts 算子、Laplace算子、Sobel算子的详细方法
    目录Python实现图像(边缘)锐化:梯度锐化、Roberts算子、Laplace算子、Sobel算子的详细方法引言一、图像锐化的基本原理1.1什么是图像锐化?1.2边缘检测的基本概念二、常用的图像锐化算法2.1梯度锐化2.1.1实现步骤2.2Roberts算子2.2.1实现步骤2.3Laplace算子2.3.1实......
  • 球坐标下的 Laplace 算子推导
    球坐标下的Laplace算子推导Ciallo~(∠・ω<)⌒★我是赤川鹤鸣!在学习球谐函数的时候,第一次听说球坐标下的Laplace算子这一概念.在查阅了一些资料后,现在整理出球坐标下拉普拉斯算子的推导公式.1.球坐标我们非常熟悉的坐标系是初中时学习过的具有\(2\)个维度\(x\)和......
  • 计算机毕业设计Python+大模型租房推荐系统 租房大屏可视化 租房爬虫 hadoop spark 58
    温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!温馨提示:文末有CSDN平台官方提供的学长联系方式的名片!用到的技术:  1.python  2.django后端框架  3.django-simpleui,Django后台  4.......
  • 我和chatgpt问答-微分方程与算子、复杂的结构
    问题:微分方程中是否含有充分复杂的结构,才使得方程尤其是微分方程难解是的,微分方程,尤其是非线性微分方程,通常包含非常复杂的结构,这些结构使得它们在解析求解上极其困难。以下是一些导致微分方程难解的复杂结构因素:1.非线性结构非线性项:微分方程中的非线性项(如(y^2)、(e^y......
  • JSON日志处理 | 基于SparkSql实现
    目录0 主要JSON处理函数1JSON处理函数使用2案例分析3小结0 主要JSON处理函数get_json_object:提取单个JSON字段json_tuple:同时提取多个JSON字段from_json:JSON字符串转结构化数据to_json:结构化数据转JSON字符串schema_of_json:推断JSONs......