首页 > 其他分享 >Spark常用RDD算子:transformation转换算子以及action触发算子

Spark常用RDD算子:transformation转换算子以及action触发算子

时间:2024-10-13 20:17:35浏览次数:9  
标签:rdd2 rdd print RDD collect 算子 action lambda

文章目录

1. 算子(方法)介绍

rdd中封装了各种算子方便进行计算,主要分为两类:

  • transformation 转换算子
    • 对RDD数据进行转化得到新的RDD,定义了一个线程任务。
    • 常见:map、filter、flatMap、reduceByKey、groupByKey、sortByKey
  • action 触发算子
    • 触发计算任务,让计算任务进行执行,得到结果。
    • 触发线程执行的。
    • 常见:foreach、first、count、reduce、saveAsTextFile、collect、take

RDD的转换算子大部分都是从RDD中读取元素数据(RDD中每条数据),具体计算需要开发人员编写函数传递到RDD算子中。
RDD的执行算子则大部分是用来获取数据,collect方法就是触发算子。

注意

  • 转换算子是lazy模式,一般不会触发job和task的运行,返回值一定是RDD。
  • 执行算子,会触发job和task的运行,返回值一定不是RDD。

2. 常用transformation算子

2.1 map

  • RDD.map(lambda 参数:参数计算)
  • 参数接受每个元素数据
# Map算子使用
# map算子主要使用长场景,一个转化rdd中每个元素的数据类型,拼接rdd中的元素数据,对rdd中的元素进行需求处理
# 需求,处理hdfs中的学生数据,单独获取每个学生的信息
from pyspark import SparkContext

sc = SparkContext()

# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x:x.split(','))
# 3-从rdd2中获取姓名数据
rdd3 = rdd2.map(lambda x:x[1])

# lambda 函数能进行简单的数据计算,如果遇到复杂数据计算时,就需要使用自定义函数
# 获取年龄数据,并且转化年龄数据为int类型,将年龄和性别合并一起保存成元组  (男,20) (女,21)
def func(x):
    # 1-先切割数据
    data_split = x.split(',')
    # 2-转化数据类型
    age = int(data_split[2])
    # 3-拼接性别和年龄
    data_tuple = (data_split[3],age)
    return data_tuple
# 将函数的名字传递到map中,不要加括号
rdd4 = rdd.map(func)


# 触发执行算子,查看读取的数据
res = rdd.collect()
print(res)

res2 = rdd2.collect()
print(res2)

res3 = rdd3.collect()
print(res3)


res4 = rdd4.collect()
print(res4)

运行结果:
在这里插入图片描述

2.2 flatMap

  • 处理的是二维嵌套列表数据[[1,‘张三’],[2,‘李四’],[3,‘王五’]] --> [1, ‘张三’, 2, ‘李四’, 3, ‘王五’]
  • rdd.flatMap(lambda 参数:[参数计算])
#flatmap算子使用
# 主要使用场景是对二维嵌套的数据降维操作 [[1,'张三'],[2,'李四'],[3,'王五']] --> [1, '张三', 2, '李四', 3, '王五']
from pyspark import SparkContext
sc = SparkContext()

#生成rdd
rdd = sc.parallelize([[1,'张三'],[2,'李四'],[3,'王五']])
#使用flatmap算子进行转化
rdd2 = rdd.flatMap(lambda x: x)

#查看数据
res = rdd2.collect()
print(res)

运行结果:
在这里插入图片描述

2.3 filter

  • rdd.filter(lambda 参数:参数条件过滤)
  • 条件过滤的书写和Python中if判断的一样
# RDD数据过滤
# 需求:过滤年龄大于20岁的信息
from pyspark import SparkContext

sc = SparkContext()

# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x:x.split(','))
#使用fliter方法进行数据过滤
rdd3 = rdd2.filter(lambda x: int(x[2]) > 20)
rdd4 = rdd2.filter(lambda x:x[3]=='男')


# 查看数据
res = rdd2.collect()
print(res)

res3 = rdd3.collect()
print(res3)

res4 = rdd4.collect()
print(res4)

运行结果:
在这里插入图片描述

2.4 distinct

  • 不需要lambda rdd.distinct
# distinct  去重算子
# rdd中有重复数据时,可以进行去重
from pyspark import SparkContext

sc = SparkContext()

# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x: x.split(','))
# 3-从rdd2中获取性别数据
rdd3 = rdd2.map(lambda x: x[3])

#对rdd3中的数据去重
rdd4 = rdd3.distinct()

#查看数据
res = rdd3.collect()
print(res)

res1 = rdd4.collect()
print(res1)

运行结果:
在这里插入图片描述

2.6 groupBy

  • rdd.groupBy(lambda 参数:根据参数编写分组条件)
  • mapValues(list)
# groupBy分组
# 按照不同性别进行分组
# 原理: 就是对需要分组的数据进行hash取余数 ,余数相同会放入同一组
from pyspark import SparkContext

sc = SparkContext()

# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2- 使用转化算子进行数据处理
# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x: x.split(','))
# 3- 对性别进行分组
rdd3 = rdd2.groupBy(lambda x: hash(x[3]) % 2)
#查看分组的数据内容 mapValues 取出分组后的数据值,对数据值转为列表即可
rdd4 = rdd3.mapValues(lambda x:list(x))

# 查看数据内容
res = rdd2.collect()
print(res)


res3 = rdd3.collect()
print(res3)

res4 = rdd4.collect()
print(res4)

运行结果:
在这里插入图片描述

2.7 sortBy()

  • rdd.sortBy(lambda x:x,ascending=False)
#RDD的数据排序
from pyspark import SparkContext
sc = SparkContext()

# 生成rdd数据
# 非k,v数据
rdd = sc.parallelize([4,7,3,2,8])
#在spark中使用元组表示k,v数据
rdd2 = sc.parallelize([('张三',90),('李四',70),('王五',99)])

# 数据排序
rdd3 = rdd.sortBy(lambda x: x)
rdd4 = rdd.sortBy(lambda x: x,ascending=False)

#k,V数据排序
rdd5 = rdd2.sortBy(lambda x: x[1],ascending=False)
rdd6 = rdd2.sortBy(lambda x: x[1])

#查看结果
res = rdd3.collect()
print(res)

res2 = rdd4.collect()
print(res2)

res3 = rdd5.collect()
print(res3)

res4 = rdd6.collect()
print(res4)

运行结果:
在这里插入图片描述

2.8 k-v数据[(k,v),(k1,v1)]

  • groupByKey()
    • rdd.groupByKey()
  • reduceByKey()
    • rdd.reduceByKey(lambda 参数1,参数2:对两个参数计算)
  • sortByKey()
    • rdd.sortByKey()
#k,v结构数据处理
from pyspark import SparkContext
sc = SparkContext()
#k,v分组
# 1. 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# 2. 使用转化算子进行数据处理
rdd2 = rdd.map(lambda x: x.split(','))
#将数据转为k,v结构,然后进行分组,把分组的字段作为key值
rdd3 = rdd2.map(lambda x: (x[3], x))
# 使用groupBykey方法,按key进行分组
rdd4 = rdd3.groupByKey().mapValues(lambda x: list(x))

#k,v数据计算
#统计不同性别的年龄总和 (求和  平均数  最大值  最小值  数量)
#将需要计算的数据转为k,v结构  分组的字段是key值  聚合数据是value值
rdd5 = rdd2.map(lambda x: (x[3],int(x[2])))
# 使用reduceBykey方法进行聚合计算  会将相同key值的数据先合并,然后在聚合计算
# 聚合计算的算子,lambda x,y 需要结合两个参数
rdd6 = rdd5.reduceByKey(lambda x, y: x+y)

rdd7 = rdd5.groupByKey().mapValues(lambda x: sum(list(x))/len(list(x)))
rdd8 = rdd5.groupByKey().mapValues(lambda x: max(list(x)))


res = rdd2.collect()
print(res)

res3 = rdd3.collect()
print(res3)

res4 = rdd4.collect()
print(res4)

res5 = rdd5.collect()
print(res5)

res6 = rdd6.collect()
print(res6)

res7 = rdd7.collect()
print(res7)

res8 = rdd8.collect()
print(res8)

运行结果:

3. 常用action算子

  • collecct()取出RDD中所有值
    • rdd.collect()
  • reduce() 非k-v类型数据累加[1,2,3,4,6]
    • rdd.reduce(lambda 参数1,参数2:两个参数计算)
  • count() 统计RDD元素个数
    • rdd.count()
  • take() 取出指定数量值
    • rdd.take(数量)
# action算子使用
# 触发转化算子执行
from pyspark import SparkContext

sc = SparkContext()

# 生成rdd
rdd = sc.parallelize([1,2,3,4])

rdd_kv = sc.parallelize([('a',2),('b',3)])

# 进行转化处理

# 使用action
# 获取rdd中所有元素数据,转为列表展示
res = rdd.collect()
print(res)
# 指定取出的数据数量
res2 = rdd.take(3)
print(res2)
# 对非kv数据计算
# 求和
res3 = rdd.reduce(lambda x,y:x+y)
print(res3)
# 求数量
res4 = rdd.count()
print(res4)
# 求最大值
res5 = rdd.max()
print(res5)

res6 = rdd.mean()
print(res6)

# 将kv数据转为字典输出
res7 = rdd_kv.collectAsMap()
print(res7)

# 将rdd结果保存到hdfs 指定目录路径,指定的目录不能存在
rdd_kv.saveAsTextFile('hdfs://node1:8020/data/result')

运行结果:
在这里插入图片描述在这里插入图片描述

标签:rdd2,rdd,print,RDD,collect,算子,action,lambda
From: https://blog.csdn.net/m0_70882914/article/details/142788702

相关文章

  • ERROR [org.hibernate.transaction.JDBCTransaction] - Could not toggle autocommit
    错误描述: DEBUG[org.hibernate.SQL]-SELECTorp.ATTR6FROMDISTRIBUT_VIEWd WHEREd.state='1'ANDd.oper_logLIKE'%下单%' GROUPBYorp.ATTR6 ERROR[org.hibernate.transaction.JDBCTransaction]-Couldnottoggleautocommitjava.sql.SQLE......
  • github action的使用
    近年来,我一直在使用jenkins来部署自己的项目,发现太耗内存了,因此将自动化部迁的操作改为使用githubaction。初始化action配置选择一个合适的action类型,比如webpack、gitPage、Nodejs等等。比如我这里选择了webpack,选择完成后可以看到在仓库里多了一个文件.github/workflo......
  • webservice接口调用报:由于 ContractFilter 在 EndpointDispatcher 不匹配,因此 Action
    1、问题:<s:Envelopexmlns:s="http://schemas.xmlsoap.org/soap/envelope/"><s:Body><s:Fault><faultcodexmlns:a="http://schemas.microsoft.com/ws/2005/05/addressing/none">a:ActionNotSupported</faultcode><faul......
  • selenium:ActionChains类模拟鼠标和键盘操作(6)
    selenium包中提供了ActionChains类,主要用于鼠标和键盘的一些操作,比如鼠标移动,鼠标按键,或者是悬停和拖放等;模拟键盘按键输入,比如按住control+C键等。使用时先导入该类:fromselenium.webdriverimportActionChainsActionChains类的方法介绍 ActionChains类常用方法函......
  • 工作 6 年,@Transactional 注解用的一塌糊涂
    接手新项目一言难尽,别的不说单单就一个 @Transactional 注解用的一塌糊涂,五花八门的用法,很大部分还失效无法回滚。有意识的在涉及事务相关方法上加 @Transactional 注解,是个好习惯。不过,很多同学只是下意识地添加这个注解,一旦功能正常运行,很少有人会深入验证异常情况下事务......
  • 【D3.js in Action 3 精译_032】第四章 D3 直线、曲线与弧线的绘制 + 4.1 坐标轴的创
    当前内容所在位置(可进入专栏查看其他译好的章节内容)第一部分D3.js基础知识第一章D3.js简介(已完结)1.1何为D3.js?1.2D3生态系统——入门须知1.3数据可视化最佳实践(上)1.3数据可视化最佳实践(下)1.4本章小结第二章DOM的操作方法(已完结)2.1第一......
  • C#联合Visionpro编程学习记录(将指定颜色的十字线图形添加到CogRecordDisplay上)
    1///<summary>2///将指定颜色的十字线图形添加到CogRecordDisplay上3///</summary>4///<paramname="icogimage"></param>5///<returns></returns>6publicstaticstringAddCrossCurveRecord2CogRecordDisplay(I......
  • sqlite_action
     CREATETRIGGERhttps://www.sqlite.org/lang_createtrigger.html 触发器DROPTABLEIFEXISTSuser_token;CREATETABLEIFNOTEXISTSuser_token(idINTEGERNOTNULLPRIMARYKEYAUTOINCREMENT,tokenTEXTNULL,"uid"INTEGERNULL,"cid"INTEGER......
  • Camstar Create Transaction Database
    sqlserverUSE[master]GO--CreatedatabaseCREATEDATABASEINSITEONPRIMARY(NAME='INSITE',FILENAME='C:\ProgramFiles\MicrosoftSQLServer\MSSQL13.MSSQLSERVER\MSSQL\DATA\INSITE.mdf',SIZE=100MB,FileGrowth=10%)LOGON(......
  • WPF ListBox IsSynchronizedWithCurrentItem True ScrollIntoView via behavior CallM
    <ListBoxGrid.Column="0"ItemContainerStyle="{StaticResourcelbxItemContainerStyle}"ItemsSource="{BindingBooksCollection,Mode=TwoWay,UpdateSourceTrigger=PropertyChanged}"IsSynchronizedWith......