from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc = SparkContext(conf=conf)
# 1. collect: gather the RDD's data onto the driver so it can be used in later operations
rdd = sc.parallelize(range(0, 5))
rdd_collect = rdd.collect()
print(rdd_collect)
# [0, 1, 2, 3, 4]
# 2. first: return the first element
rdd = sc.parallelize(['a', 'b', 'c'])
print(rdd.first())
# a
# 3. collectAsMap: convert a pair RDD into a dict; avoid this on large datasets, otherwise loading everything onto the driver will run it out of memory
rdd = sc.parallelize([(1, 2), (3, 4)])
print(rdd.collectAsMap())
# {1: 2, 3: 4}
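# A small extra sketch of the dict conversion: when keys repeat, later pairs
# overwrite earlier ones, so only one value per key survives.
rdd = sc.parallelize([(1, 2), (1, 3), (4, 5)])
print(rdd.collectAsMap())
# {1: 3, 4: 5}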
# 4. reduce: repeatedly combine two elements at a time with the given function
rdd = sc.parallelize(range(10), 5)
print(rdd.collect())
print(rdd.reduce(lambda x, y: x+y))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# 45
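# A small extra sketch: reduce accepts any commutative, associative two-argument
# function, e.g. taking a maximum instead of a sum.
rdd = sc.parallelize([3, 7, 2, 9, 4])
print(rdd.reduce(lambda x, y: x if x > y else y))
# 9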
# 5. countByKey / countByValue: count elements by key / by whole value
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 1)])
print(rdd.countByKey().items())
print(rdd.countByValue().items())
# dict_items([('a', 2), ('b', 1)])
# dict_items([(('a', 1), 2), (('b', 2), 1)])
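# Both calls return an ordinary dict-like object on the driver, so individual
# counts can be read directly (a small usage sketch):
counts = rdd.countByKey()
print(counts["a"])
# 2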
# 6. take: fetch the first few elements to the driver
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 1)])
print(rdd.take(2))
# [('a', 1), ('b', 2)]
# 7. saveAsTextFile: save the RDD as text files on the local filesystem
text_file = "./rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)
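# Note: saveAsTextFile actually creates a directory (./rdd.txt here) with one
# part-* file per partition, and it fails if the path already exists.
# A sketch of reading the output back; lines come back as strings:
rdd_read = sc.textFile(text_file)
print(rdd_read.collect())
# e.g. ['0', '1', '2', '3', '4'] (element order follows the part files)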
# 8. takeSample: take a random sample
rdd = sc.textFile("./word.txt", 4)  # 4 is the minimum number of partitions
rdd_sample = rdd.takeSample(True, 2, 0)  # args: withReplacement (True = with replacement), sample size, seed
print(rdd_sample)
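# ./word.txt may not exist locally; a self-contained sketch of the same call on a
# parallelized RDD (False = sample without replacement, 2 = sample size, 0 = seed):
rdd = sc.parallelize(range(10))
print(rdd.takeSample(False, 2, 0))
# e.g. [7, 4] -- the exact elements depend on the seed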
# 9. foreach: run an operation on every element; does not produce a new RDD
rdd = sc.parallelize(range(5))
accum = sc.accumulator(0)
rdd.foreach(lambda x : accum.add(x))
print(accum.value)
# 10
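# Note: the lambda in foreach runs on the executors, so a plain print() there would
# not show up in the driver console; the accumulator is the usual way to bring a side
# effect back. A related sketch: foreachPartition calls the function once per
# partition with an iterator over that partition's elements.
part_sums = sc.accumulator(0)
rdd.foreachPartition(lambda it: part_sums.add(sum(it)))
print(part_sums.value)
# 10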
From: https://www.cnblogs.com/whiteY/p/17767982.html