首页 > 其他分享 >rdd常用的Action算子和分区操作算子

rdd常用的Action算子和分区操作算子

时间:2024-01-30 13:55:38浏览次数:23  
标签:RDD 分区 print rdd numbers 算子 Action

from pyspark import SparkConf, SparkContext

# 创建Spark配置和上下文对象
conf = SparkConf().setAppName("SparkActionsAndPartitions")
sc = SparkContext(conf=conf)

# 示例数据
data = [("apple", 1), ("banana", 2), ("apple", 3), ("orange", 4)]
numbers_rdd = sc.parallelize([1, 2, 3, 4, 5])
fruit_rdd = sc.parallelize(data)
range_rdd = sc.parallelize(range(100))

# Action算子示例
# countByKey:计算每个key出现的次数
fruit_counts = fruit_rdd.countByKey()
print("countByKey: ", fruit_counts)  # 预期结果:{'apple': 2, 'banana': 1, 'orange': 1}

# collect:收集RDD所有元素到Driver端
collected_data = numbers_rdd.collect()
print("collect: ", collected_data)  # 预期结果:[1, 2, 3, 4, 5]

# first:获取RDD的第一个元素
first_element = numbers_rdd.first()
print("first: ", first_element)  # 预期结果:1

# take:获取RDD的前N个元素
first_n_elements = numbers_rdd.take(3)
print("take: ", first_n_elements)  # 预期结果:[1, 2, 3]

# topN(模拟):对RDD进行排序并获取前N个元素
sorted_fruits = fruit_rdd.sortBy(lambda x: -x[1]).take(2)
print("topN: ", sorted_fruits)  # 预期结果:[('apple', 3), ('banana', 2)]

# count:计算RDD元素数量
data_count = numbers_rdd.count()
print("count: ", data_count)  # 预期结果:5

# takeSample:随机抽样RDD数据
sampled_data = range_rdd.takeSample(withReplacement=True, num=8)
print("takeSample: ", sampled_data)  # 随机结果,例如:[93, 17, 96, 17, 17, 17, 55, 34]

# takeOrdered:对RDD进行排序并获取前N个元素
sorted_first_n = numbers_rdd.takeOrdered(3)
print("takeOrdered: ", sorted_first_n)  # 预期结果:[1, 2, 3]

# foreach:对RDD的每个元素执行操作,无返回值
def print_element(x):
    print(x)

numbers_rdd.foreach(print_element)  # 输出:1, 2, 3, 4, 5

# saveAsTextFile:将RDD保存为文本文件
words_rdd = sc.parallelize(["Hello World!", "Welcome to Spark!"])
words_rdd.saveAsTextFile("output.txt")  # 在指定目录下生成part-00000等文件

# 分区操作算子示例
# mapPartitions:一次处理一个分区的数据
def sum_partition(iter):
    return [sum(iter)]

partition_sums = numbers_rdd.mapPartitions(sum_partition).collect()
print("mapPartitions: ", partition_sums)  # 预期结果根据分区数决定,例如:[15, 15, 15, 10]

# foreachPartition:对RDD的每个分区执行操作,无返回值
def write_to_file(partition):
    with open('output_partitions.txt', 'a') as f:
        for item in partition:
            f.write(str(item) + '\n')

numbers_rdd.foreachPartition(write_to_file)  # 将数据写入output_partitions.txt文件

# partitionBy:按照自定义规则重新分区
hashed_rdd = numbers_rdd.partitionBy(HashPartitioner(5))

# repartition:改变RDD的分区数
repartitioned_numbers_rdd = numbers_rdd.repartition(10)

# 最后,记得关闭SparkContext
sc.stop()

常用的Action算子

1.countBYKey算子,主要是针对KV型数据,计算key出现的次数。

2.collect算子,将rdd各个分区的数据,统一收集到Driver中,形成list对象。

3.first算子,取出rdd第一个元素。

4.take算子,取出rdd中前N个元素。

5.top算子,对rdd数据进行降序排序,取出前N个数据。

6.count算子,计算rdd有多少条数据,返回一个数字值。

7.takeSample算子,随机抽样rdd数据。takeSample(True,8)True表示取同一数据表示同一位置的数据,False则反之。

8.takeOrdered算子,对rdd进行排序取前N个数据。

9.foreach算子,对rdd的每一个元素,执行你所提供的逻辑,但此方法无返回值。

10.saveAsTextFile算子,将rdd的数据存储到文本文件中。

分区操作算子

mapPartitions算子,一次被传递的是一整个分区的数据。

foreachPartition算子,无返回值,和foreach类似,按分区输出。

partitionBy算子,对rdd进行自定义分区

repartition算子,对rdd分区执行重新分区。数量

标签:RDD,分区,print,rdd,numbers,算子,Action
From: https://www.cnblogs.com/syhxx/p/17996940

相关文章

  • 【揭秘】RecursiveAction全面解析
    内容概要RecursiveAction是Java中一个强大的工具,它允许将复杂任务分解为更小的子任务,这些子任务可以并行执行,从而提高整体性能,其主要优点在于能够有效地利用多核处理器,减少任务执行时间,并简化并行编程的复杂性。核心概念RecursiveAction是Java并发包java.util.concurrent......
  • 常用的rddTransformation算子
    根据文章:暑假生活每周总结10  frompysparkimportSparkContext#创建SparkContext对象sc=SparkContext("local","RDDTransformationsExample")#假设我们有一个RDD数据源data=sc.parallelize([1,2,3,4,5])#1.map算子mapped_data=data.map(lambda......
  • 为什么不能这样使用 Object.assign(state, { visibilityFilter: action.filter })
    为什么不能这样使用Object.assign(state,{visibilityFilter:action.filter})?在Redux的reducer中,直接使用Object.assign(state,{visibilityFilter:action.filter})来修改状态是不推荐的做法。原因如下:纯函数原则:Redux要求reducer必须是一个纯函数,即给定相同的输入(sta......
  • React 使用的Redux, action type 抽离成常量好还是直接写死好
    React使用的Redux,actiontype抽离成常量好还是直接写死好?在React与Redux结合进行状态管理时,将actiontype抽离成常量是一种更推荐的做法,而不是直接写死在代码中。原因如下:减少错误:如果直接在actioncreator或reducer中硬编码字符串类型的actiontype,容易因拼写错误或大......
  • 事务提交之后再执行某些操作 → 引发对 TransactionSynchronizationManager 的探究
    开心一刻昨晚,小妹跟我妈聊天小妹:妈,跟你商量个事,我想换车,资助我点呀妈:哎呀,你那分扣的攒一堆都够考清华的,还换车资助点,有车开就不错了小妹:你要是这么逼我,别说哪天我去学人家傍大款啊妈:哎呀妈,你脸上那褶子比你人生规划都清晰,咋地,大款缺地图呀,找你?小妹:......
  • MetaGPT day04 MetaGPT ActionNode
    ActionNode说明文档导读#什么是ActionNode?1.ActionNode是Action的通用化抽象2.ActionNode是SOP的最小单元#ActionNode是Action的通用化抽象:反推可得知Action不够通用化?也就是说ActionNode的粒度比action更细? Action-粒度更细->ActionNode#Actio......
  • github action 自动化部署asp.net core应用到服务器
    在自己的仓库里工作流编辑workflow贴上自己的工作流name:ASP.NETCoreDeploymenton:push:branches:-master#你可以根据需要更改分支名称(在向master分支推送的时候触发这个workflow)jobs:deploy:runs-on:ubuntu-latest#使用Ubuntu环......
  • Programming Abstractions in C阅读笔记:p254-p257
    《ProgrammingAbstractionsinC》学习第70天,p254-p257总结,总计4页。一、技术总结1.minimaxstrategy(极小化极大算法)p255,Thisidea--findingthepositionthatleavesyouropponentwiththeworstpossiblebestmove--iscalledtheminimaxstrategybecausethegoa......
  • 《SAIS Supervising and Augmenting Intermediate Steps for Document-Level Relation
    代码 原文地址 预备知识:1.什么是标记索引(tokenindices)?标记索引是一种用于表示文本中的单词或符号的数字编码。它们可以帮助计算机理解和处理自然语言。例如,假如有一个字典{"我":1,"是":2,"Bing":3,".":4},那么文本"我是Bing."的标记索引就是[1,2,3,4]。不同的模......
  • GPTs创建及action使用
    一、新建GPTsNewGPT:https://chat.openai.com/gpts/editor二、创建GPT的选项1、Create进入创建GPT页面,在CreateTab下,点击左下角曲别针符号,可以向GPT上传知识库文档。GPT可以根据这些文档进行回答。2、Configure2.1、Action创建Action官方文档Actions与Plugins类似,不......