首页 > 其他分享 >RDD基本操作(残)

RDD基本操作(残)

时间:2024-03-25 23:15:51浏览次数:33  
标签:kvRDD1 parallelize RDD collect intRDD sc 基本操作 lambda

intRDD = sc.parallelize([3, 1, 2,  5, 5])
intRDD.collect()
[3, 1, 2, 5, 5]
stringRDD = sc.parallelize(["Apple", "Orange", "Banana",  "Grape", "Apple"])
stringRDD.collect()
['Apple', 'Orange', 'Banana', 'Grape', 'Apple']
def addOne(x):
    return (x + 1)

intRDD.map(addOne).collect()
[4, 2, 3, 6, 6]
intRDD.map(lambda x : x + 1).collect()
[4, 2, 3, 6, 6]
stringRDD.map(lambda x: "fruit" + x).collect()
['fruitApple', 'fruitOrange', 'fruitBanana', 'fruitGrape', 'fruitApple']
intRDD.filter(lambda x : x < 3).collect()
[1, 2]
intRDD.filter(lambda x : x == 3).collect()
[3]
intRDD.filter(lambda x : 1 < x and x < 5).collect()
[3, 2]
intRDD.filter(lambda x : x >= 5 or x < 3).collect()
[1, 2, 5, 5]
stringRDD.filter(lambda x : "ra" in x).collect()
['Orange', 'Grape']
intRDD.distinct().collect()
['Orange', 'Grape', 'Apple', 'Banana']

stringRDD.distinct().collect()
['Orange', 'Grape', 'Apple', 'Banana']
sRDD = intRDD.randomSplit([0.4, 0.6])
sRDD[0].collect()
[3, 5]
sRDD[1].collect()
[1, 2, 5]
gRDD = intRDD.groupBy(lambda x : "even" if(x % 2 == 0) else "odd").collect()
print(gRDD[0][0], sorted(gRDD[0][1]))
('even', [2])
print(gRDD[1][0], sorted(gRDD[1][1]))
('odd', [1, 3, 5, 5])
intRDD1 = sc.parallelize([3, 1, 2, 5, 5])
intRDD2 = sc.parallelize([5, 6])
intRDD3 = sc.parallelize([2, 7])
intRDD1.union(intRDD2).union(intRDD3).collect()
[3, 1, 2, 5, 5, 5, 6, 2, 7]
intRDD1.intersection(intRDD2).collect()
[5]
intRDD1.subtract(intRDD2).collect()
[2, 1, 3]
print(intRDD1.cartesian(intRDD2).collect())
[(3, 5), (3, 6), (1, 5), (1, 6), (2, 5), (2, 6), (5, 5), (5, 6), (5, 5), (5, 6)]

基本动作运算

1.读取元素

intRDD.first()
3
intRDD.take(2)
[3, 1]
intRDD.takeOrdered(3)
[1, 2, 3]
intRDD.takeOrdered(3, key = lambda x : -x)
[5, 5, 3]

2.统计功能

intRDD.stats()
(count: 5, mean: 3.2, stdev: 1.6, max: 5.0, min: 1.0)
intRDD.min()
1
intRDD.max()
5
intRDD.stdev()
1.6000000000000001
intRDD.count()
5
intRDD.sum()
16
intRDD.mean()
3.2

RDD Key-Value 基本“转换”运算

1.创建Key-Value RDD

kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
kvRDD1.collect()
[(3, 4), (3, 6), (5, 6), (1, 2)]
kvRDD1.keys().collect()
[3, 3, 5, 1]
kvRDD1.values().collect()
[4, 6, 6, 2]

2.使用filter筛选key运算

kvRDD1.filter(lambda keyValue : keyValue[0] < 5).collect()
[(3, 4), (3, 6), (1, 2)]

3.使用filter筛选Value运算

kvRDD1.filter(lambda keyValue : keyValue[1] < 5).collect()
[(3, 4), (1, 2)]

4.mapValues运算

kvRDD1.mapValues(lambda x : x * x).collect()
[(3, 16), (3, 36), (5, 36), (1, 4)]

5.sortByKey从小到大按照key排序

kvRDD1.sortByKey(ascending=True).collect()
[(1, 2), (3, 4), (3, 6), (5, 6)]
kvRDD1.sortByKey().collect()
[(1, 2), (3, 4), (3, 6), (5, 6)]

6.sortByKey从大到小按照key排序

kvRDD1.sortByKey(ascending=False).collect()
[(5, 6), (3, 4), (3, 6), (1, 2)]

7.reduceByKey

kvRDD1.reduceByKey(lambda x, y : x + y).collect()
[(1, 2), (3, 10), (5, 6)]

多个RDD Key-Value"转换"运算

1.创建多个Key-Value RDD

kvRDD1 = sc.parallelize([(3, 4), (3, 6), (5, 6), (1, 2)])
kvRDD2 = sc.parallelize([(3, 8)])
kvRDD1.collect()
[(3, 4), (3, 6), (5, 6), (1, 2)]
kvRDD2.collect()
[(3, 8)]

2.Key-Value RDD join 运算

join运算可以把两个RDD按照相同的key值join起来

kvRDD1.join(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8))]

3.Key-Value leftOuterJoin 运算

和普通的join区别是leftjoin找不到会显示none

kvRDD1.leftOuterJoin(kvRDD2).collect()
[(1, (2, None)), (3, (4, 8)), (3, (6, 8)), (5, (6, None))]

4.Key-Value RDD rightOuterJoin运算

kvRDD1.rightOuterJoin(kvRDD2).collect()
[(3, (4, 8)), (3, (6, 8))]

5.Key-Value subtractBuKey运算

subtractByKey会删除相同的key值数据

kvRDD1.subtractByKey(kvRDD2).collect()
[(1, 2), (5, 6)]

Key-Value “动作”运算

1.Key_Value first运算

kvRDD1.first()
(3, 4)
kvRDD1.take(2)
[(3, 4), (3, 6)]

2.读取第一项数据的元素

kvFirst=kvRDD1.first()
kvFirst
(3, 4)
kvFirst[0]
3
kvFirst[1]
4

3.计算RDD中每一个Key值的项数

kvRDD1.countByKey()
defaultdict(int, {1: 1, 3: 2, 5: 1})

4.collectAsMap 创建Key-value的字典

KV = kvRDD1.collectAsMap()
KV
{1: 2, 3: 6, 5: 6}
type(KV)
dict

5. 使用对照表转换数据

KV[3]
6
KV[1]
2

6. Key-Value lookup运算

kvRDD1.lookup(3)
[4, 6]
kvRDD1.lookup(5)
[6]

Broadcast 广播变量

Broadcast 属于共享变量,共享变量可以节省内存和运行时间,提升并行处理的执行效率。共享变量包括Broadcast和 accumulator。
Broadcast广播变量使用规则如下:
(1)可以使用SparkContext.broadcast([value])创建。
(2)使用.value的方法来读取广播变量的值。
(3)Broadcast广播变量被创建后不能修改。

kvFruit = sc.parallelize([(1, "apple"), (2, "orange"), (3, "banana"), (4, "grape")])
fruitMap=kvFrult.collectAsMap()
print("对照表:" + str(fruitMap))
对照表:{1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'}
fruitMap = kvFrult.collectAsMap()
bcFruitMap = sc.broadcast(fruitMap)
print("字典 : " + str(fruitMap))
字典 : {1: 'apple', 2: 'orange', 3: 'banana', 4: 'grape'}
fruitIds = sc.parallelize([2, 4, 1, 3])
print("水果编号: " + str(fruitIds.collect()))
水果编号: [2, 4, 1, 3]
print("使用Broadcast 广播变量进行转换==>")
fruitNames = fruitIds.map(lambda x : bcFruitMap.value[x]).collect()
print("水果名称: " + str(fruitNames))
使用Broadcast 广播变量进行转换==>
水果名称: ['orange', 'grape', 'apple', 'banana']

accumulator 累加器

使用规则
accumulator累加器可以使用SparkContext.accumlator([value])创建
使用.add()进行累加
在task中, 例如for each循环中,不能读取累加器的值
只有循环外才能使用.value来读取累加器的值

intRDD = sc.parallelize([3, 1, 2, 5, 5])
total = sc.accumulator(0.0)
num = sc.accumulator(0)
intRDD.foreach(lambda i : [total.add(i), num.add(1)])
avg = total.value / num.value
print("total = " + str(total.value) + ", num = " + str(num.value) + ", avg = " + str(avg))
total = 16.0, num = 5, avg = 3.2

RDD Persistence持久化

intRddMemory = sc.parallelize([3, 1, 2, 5, 5])
intRddMemory.persist()
ParallelCollectionRDD[143] at parallelize at PythonRDD.scala:475
intRddMemory.is_cached
True
intRddMemory.unpersist()
ParallelCollectionRDD[143] at parallelize at PythonRDD.scala:475
intRddMemory.is_cached
False
intRddMemoryAndDisk = sc.parallelize([3, 1, 2, 5, 5])
intRddMemoryAndDisk.persist(StorageLevel.MEMORY_AND_DISK)
ParallelCollectionRDD[144] at parallelize at PythonRDD.scala:475
intRddMemoryAndDisk.is_cached
True

标签:kvRDD1,parallelize,RDD,collect,intRDD,sc,基本操作,lambda
From: https://www.cnblogs.com/leexiao/p/18095611

相关文章

  • 第九章 Ubuntu 操作系统设置与基本操作
    实验案例:Ubuntu操作系统的基本操作1、实验环境    公司的管理员为小王购买了一台预装了Ubuntu操作系统的笔记本式计算机,要求小王尽快熟悉Ubuntu操作系统的用户界面.并掌握图形界面和命令行界面中的一些基本操作。2、需求描述为第一块网卡设置静态IP地址.并能够与同网......
  • 3.MySQL数据库的基本操作-DQL 基本操作
    MySQL数据库的基本操作-DQL基本操作查询select语法格式select[all|distinct]<目标列的表达式1>[别名],<目标列的表达式2>[别名]...from<表名或视图名>[别名],<表名或视图名>[别名]...[where<条件表达式>][groupby<列名>[having<条件表达式>]][o......
  • 【C++ 08】vector 顺序表的常见基本操作
    文章目录前言......
  • SparkSQL与RDD的选择?
        对当下的企业级数据应用来说,SparkSQL的应用空间肯定要比单纯的写RDD处理大很多,因为SparkSQL比RDD好写的多,也更贴近业务需求和更友好的能处理数据,而且技术门槛也更低。        但RDD是Spark中所有的数据抽象的基础,最大的特点是对开发者而言暴露的是不带sch......
  • 线性表的12 种基本操作
    #include<stdio.h>#include<stdlib.h>#defineTRUE 1#defineFALSE0#defineOK  1#defineERROR0#defineLIST_INIT_SIZE100#defineLISTINCREMENT 10#defineINFEASIBLE-1#defineOVERFLOW -2typedefintElemType;typedefstruct{   E......
  • ElasticSearch - 基本操作
    前言本文记录ES的一些基本操作,就是对官方文档的一些整理,按自己的习惯重新排版,凑合着看。官方的更详细,建议看官方的。下文以books为索引名举例。新增添加单个文档(没有索引会自动创建)POSTbooks/_doc{"name":"SnowCrash","author":"NealStephenson","release_dat......
  • docker——容器的基本操作
    docker容器的基本操作run格式dockerrun[选项]镜像[命令][参数...]选项选项解释-d后台运行-i交互模式-t分配一个伪终端-p设置端口--rm运行完命令后,删除容器--name指定名称--dns指定dns(默认dns与主机一致)实例1.启动容器后自动终......
  • mysql存储过程查询结果循环遍历 判断 赋值 游标等基本操作
    时间:2018-03-2617:58:45一、首先说下本篇博客所实现功能的背景和功能是怎样的:   背景:因为公司项目开始迁移新平台项目,所以以前的平台老数据以及订单信息需要拆分表,而且需要业务逻辑来分析以前的订单表,来拆分成另外的几个新表,包括增加新的流水分析,以及更新其他用户或者......
  • 【STL】list的基本操作
    #include<bits/stdc++.h>usingnamespacestd;constintMAXN=2e5+10;boolcmp(intx,inty){ returnx>y;}signedmain(){ list<int>lst;//建立 for(inti=1;i<=6;i++){ lst.push_back(i); }//尾插 for(inti=10;i>=8;i--){ lst.push_front(i......
  • NC(netcat)基本操作
    NC(netcat)基本操作前提:凭空是无法打开端口,那么我们就可以使用nc这个工具开启我们想要开放的端口。想开什么端口就可以开什么端口让别人进入。nc的启用Windows端nc使用方法Kali端nc使用方法基本使用方法一、监听/聊天工具方法:kali上使用nc去连接Windows7上的8000端......