要实现这一目标,必须将rdd转换为一对rdd,以使其只包含键值对/元组。
category_price_rdd = rdd.map(lambda x: (x[1],x[2]))
category_price_rdd.collect()
-----------------------------------------------------------------
[(‘Fruit’, 200), (‘Fruit’, 24), (‘Fruit’, 56), (‘Vegetable’, 103), (‘Vegetable’, 34)]
此处应用map函数获取所需格式的rdd。使用文本格式运行时,形成的RDD有很多字符串。然后使用map函数将其转换为所需格式。
所以现在的category_price_rdd中包含产品类别和售价。
如果想将关键类别进行约归并统计总价,那么可以这样做:
category_total_price_rdd = category_price_rdd.reduceByKey(lambda x,y:x+y)
category_total_price_rdd.collect()
---------------------------------------------------------[(‘Vegetable’, 137), (‘Fruit’, 280)]
6. Group By Key函数:
与reduceByKey相似,Group By Key只是把所有元素放入迭代器中,并不会reduce。举个例子,如果想保留关键类别和所有产品你的价值,可以使用此函数。
再次使用map函数,获取所需形式的数据。
data = [('Apple','Fruit',200),('Banana','Fruit',24),('Tomato','Fruit',56),('Potato','Vegetable',103),('Carrot','Vegetable',34)]
rdd = sc.parallelize(data,4)
category_product_rdd = rdd.map(lambda x: (x[1],x[0]))
category_product_rdd.collect()
------------------------------------------------------------
[('Fruit', 'Apple'), ('Fruit', 'Banana'), ('Fruit', 'Tomato'), ('Vegetable', 'Potato'), ('Vegetable', 'Carrot')]
然后像下面这样使用groupByKey:
grouped_products_by_category_rdd = category_product_rdd.groupByKey()
findata = grouped_products_by_category_rdd.collect()
for data in findata:
print(data[0],list(data[1]))
------------------------------------------------------------
Vegetable ['Potato', 'Carrot']
Fruit ['Apple', 'Banana', 'Tomato']
此处groupByKey函数运行,其返回该类别中的类别和产品列表。
操作基础
至此已经筛选了数据,并在其上映射了一些函数。接下来要完成计算。
现在希望获取本地计算机上的数据或将其保存到文件中,或者以excel或任何可视化工具中的某些图形的形式显示结果。
为此需要进行一些操作。
完整操作列表:http://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
笔者倾向使用的一些常见操作如下:
1. collect
上文已多次使用过此操作。该操作将整个RDD返回到应用程序中。
2. reduce
使用函数func(该函数接受两个参数并返回一个)来聚合数据集的元素。该函数可交换和组合,以便并行进行正确计算。
rdd = sc.parallelize([1,2,3,4,5])
rdd.reduce(lambda x,y : x+y)
---------------------------------
15
3. take
有时需要查看RDD包含内容,但无需获取内存中的所有元素。take操作返回包含RDD前n个元素的列表。
rdd = sc.parallelize([1,2,3,4,5])
rdd.take(3)
---------------------------------
[1, 2, 3]
4. takeOrdered
takeOrdered操作使用自然顺序或自定义比较器返回RDD的前n个元素。
rdd = sc.parallelize([5,3,12,23])
# descending order
rdd.takeOrdered(3,lambda s:-1*s)
----
[23, 12, 5]
rdd = sc.parallelize([(5,23),(3,34),(12,344),(23,29)])
# descending order
rdd.takeOrdered(3,lambda s:-1*s[1])
---
[(12, 344), (3, 34), (23, 29)]
至此所有的基础都已涉及,接下来回到wordcount示例。
标签:总结,category,price,rdd,Fruit,Vegetable,今日,lambda From: https://www.cnblogs.com/zhaoyueheng/p/17975812