from pyspark import SparkConf, SparkContext # 创建Spark配置和上下文对象 conf = SparkConf().setAppName("SparkActionsAndPartitions") sc = SparkContext(conf=conf) # 示例数据 data = [("apple", 1), ("banana", 2), ("apple", 3), ("orange", 4)] numbers_rdd = sc.parallelize([1, 2, 3, 4, 5]) fruit_rdd = sc.parallelize(data) range_rdd = sc.parallelize(range(100)) # Action算子示例 # countByKey:计算每个key出现的次数 fruit_counts = fruit_rdd.countByKey() print("countByKey: ", fruit_counts) # 预期结果:{'apple': 2, 'banana': 1, 'orange': 1} # collect:收集RDD所有元素到Driver端 collected_data = numbers_rdd.collect() print("collect: ", collected_data) # 预期结果:[1, 2, 3, 4, 5] # first:获取RDD的第一个元素 first_element = numbers_rdd.first() print("first: ", first_element) # 预期结果:1 # take:获取RDD的前N个元素 first_n_elements = numbers_rdd.take(3) print("take: ", first_n_elements) # 预期结果:[1, 2, 3] # topN(模拟):对RDD进行排序并获取前N个元素 sorted_fruits = fruit_rdd.sortBy(lambda x: -x[1]).take(2) print("topN: ", sorted_fruits) # 预期结果:[('apple', 3), ('banana', 2)] # count:计算RDD元素数量 data_count = numbers_rdd.count() print("count: ", data_count) # 预期结果:5 # takeSample:随机抽样RDD数据 sampled_data = range_rdd.takeSample(withReplacement=True, num=8) print("takeSample: ", sampled_data) # 随机结果,例如:[93, 17, 96, 17, 17, 17, 55, 34] # takeOrdered:对RDD进行排序并获取前N个元素 sorted_first_n = numbers_rdd.takeOrdered(3) print("takeOrdered: ", sorted_first_n) # 预期结果:[1, 2, 3] # foreach:对RDD的每个元素执行操作,无返回值 def print_element(x): print(x) numbers_rdd.foreach(print_element) # 输出:1, 2, 3, 4, 5 # saveAsTextFile:将RDD保存为文本文件 words_rdd = sc.parallelize(["Hello World!", "Welcome to Spark!"]) words_rdd.saveAsTextFile("output.txt") # 在指定目录下生成part-00000等文件 # 分区操作算子示例 # mapPartitions:一次处理一个分区的数据 def sum_partition(iter): return [sum(iter)] partition_sums = numbers_rdd.mapPartitions(sum_partition).collect() print("mapPartitions: ", partition_sums) # 预期结果根据分区数决定,例如:[15, 15, 15, 10] # foreachPartition:对RDD的每个分区执行操作,无返回值 def write_to_file(partition): with open('output_partitions.txt', 'a') as f: for item in partition: f.write(str(item) + '\n') numbers_rdd.foreachPartition(write_to_file) # 将数据写入output_partitions.txt文件 # partitionBy:按照自定义规则重新分区 hashed_rdd = numbers_rdd.partitionBy(HashPartitioner(5)) # repartition:改变RDD的分区数 repartitioned_numbers_rdd = numbers_rdd.repartition(10) # 最后,记得关闭SparkContext sc.stop()
常用的Action算子
1.countBYKey算子,主要是针对KV型数据,计算key出现的次数。
2.collect算子,将rdd各个分区的数据,统一收集到Driver中,形成list对象。
3.first算子,取出rdd第一个元素。
4.take算子,取出rdd中前N个元素。
5.top算子,对rdd数据进行降序排序,取出前N个数据。
6.count算子,计算rdd有多少条数据,返回一个数字值。
7.takeSample算子,随机抽样rdd数据。takeSample(True,8)True表示取同一数据表示同一位置的数据,False则反之。
8.takeOrdered算子,对rdd进行排序取前N个数据。
9.foreach算子,对rdd的每一个元素,执行你所提供的逻辑,但此方法无返回值。
10.saveAsTextFile算子,将rdd的数据存储到文本文件中。
分区操作算子
mapPartitions算子,一次被传递的是一整个分区的数据。
foreachPartition算子,无返回值,和foreach类似,按分区输出。
partitionBy算子,对rdd进行自定义分区
repartition算子,对rdd分区执行重新分区。数量
标签:RDD,分区,print,rdd,numbers,算子,Action From: https://www.cnblogs.com/syhxx/p/17996940