spark
Spark
Spark和Hadoop的区别
spark | hadoop |
---|---|
基于内存的计算工具 | 基础平台(HDFS/MapReduce/YARN) |
RDD中间计算结果存储在内存中 | MapReduce中间计算结果存储在HDFS磁盘中 |
线程方式计算 | 进程方式计算 |
迭代计算、交互式计算、流计算 | 大规模数据的离线计算 |
Spark计算流程
- 将RDD计算交给yarn的RM管理
- RM随机找一个NM创建container执行计算任务(application master)
- application master和RM保持通讯请求计算资源, 找到其他的NM创建container
- 其他的NM中的container执行map和reduce阶段任务
- map阶段从hdfs上读取数据进行分布式计算
- reduce阶段将map阶段的计算结果合并后保存到hdfs上
Spark组成架构(spark的五大组件)
- spark core:编写RDD代码, 处理RDD基本数据类型的数据, 有许多处理RDD的算子(方法), 其他组件的底层数据类型
- spark sql:编写DataFrame/DataSet代码, 处理DataFrame/DataSet数据类型的数据, 直接提供SQL编写方式处理spark任务
- structured/spark streaming:流计算(实时计算)组件, 数据以数据流形式处理
- spark ml/mllib:机器学习组件, 使用算法进行数据处理, 推荐系统等
- spark graphx:图计算组件, 控制图、并行图操作和计算的一组算法和工具的集合
Spark内核调度流程
- 创建sc对象时, 创建driver进程, 调用三个scheduler类创建三个scheduler Backend, DAG scheduler和Task scheduler对象
- scheduler Backend和RM保持通讯, 申请计算资源, 获取executor进程的信息
- 执行action算子时,产生job, 将job交给DAG scheduler, 由DAG scheduler根据宽依赖划分stage阶段, 分析stage中的task, 生成task描述保存到taskset中, 将taskset交给Task scheduler
- Task scheduler给taskset中的每个task分配计算资源, 维护task和executor之间关系, 同时需要管理task计算任务队列顺序
- scheduler Backend将分配好资源的task线程任务分发给executor进行执行
Spark并行度
作用:提高spark程序执行效率, 合理分配资源
- 资源并行度
资源指的是spark集群中的节点数和cpu资源
主要调整集群的executors节点数和cpu的cores核数
资源并行度又称为物理并行度
- executor
- 节点数,集群服务器数量
- 理论上服务器数量越多,执行效率越高
- –num-executors参数调整节点数
- CPU
核心数, 一个核执行一个task线程任务
- –executor-cores参数调整核心数
- 核心数决定多任务执行方式
多核, 并行执行(同时执行多个task线程任务)
单核, 并发执行(交替执行多个task线程任务)内存, 内存大小决定执行效率
- –executor-memory参数调整内存大小
- 数据并行度
- 数据指的是RDD中存储的要处理的数据
- 可以调整RDD的分区数
- 分区数=task线程数
- 一个线程任务由一个核执行
- 数据并行度又称为逻辑并行度
- 官方声称理论上分区数等于spark集群的总核数, 实现一个核对应一个task线程任务
- 实际工作中,每个分区的数据量大小是不一样的, 如果核数等于分区数, 数据量小的核会计算完成处理等待状态(造成资源浪费), 分区数是集群总核数的2~3倍(合理使用集群资源)
RDD
RDD的五大特性
分区、只读、依赖、缓存、checkpoint
RDD的创建
- python数据转换为rdd
一般使用列表类型转换为RDD rdd = sc.parallelize([1,2,3])
from pyspark import SparkContext
sc = SparkContext()
# 使用sc对象的下的parallelize方法将python数据转化为rdd
# int_data = 123 不能转化
str_data = 'abc'
list_data = [1,2,3]
dict_data = {'a':1,'b':2}
tuple_data = (1,2,3)
set_data = {1,2,3}
int_data = 123
a1 = sc.parallelize('abc')
b1 = sc.parallelize([1,2,3])
c1 = sc.parallelize({'a':1,'b':2})
d1 = sc.parallelize((1,2,3))
e1 = sc.parallelize({1,2,3})
# f1 = sc.parallelize(123) # 报错 TypeError: 'int' object is not iterable
# print(type(a1))
a2 = a1.collect()
b2 = b1.collect()
c2 = b1.collect()
d2 = c1.collect()
e2 = e1.collect()
# f2 = f1.collect()
# print(type(a2))
print(a2)
print(b2)
print(c2)
print(d2)
print(e2)
# print(f2)
.collect() 用于将spark RDD转换为python列表
- 理解为将RDD数据收集起来 然后转换为python的列表形式 这样我们就可以打印查看内容,不然直接的RDD类型我们无法直接打印输出
- 在调用collect()函数之后,RDD中的所有元素都会被拉取到Spark集群的driver节点上,形成一个列表,然后可以对该列表进行进一步的处理或分析
- 文件数据转换为RDD
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
# 读取文件数据转换成RDD, 文件中的一行数据对应rdd的一个元素
# 本地磁盘文件 file
rdd1 = sc.textFile(name='file:///root/data/employees.json')
print(rdd1.collect())
# 文件要先上传到HDFS中
# HDFS文件 hdfs
rdd2 = sc.textFile(name='hdfs://192.168.88.100:8020/data/words.txt')
print(rdd2.collect())
# 读取目录下的所有文件转换成RDD
# 本地磁盘目录
rdd3 = sc.textFile(name="file:///root/data")
print(rdd3.collect())
# HDFS目录, 默认读取HDFS上的文件
rdd4 = sc.textFile(name='/data')
# print(rdd4.collect())
- 读取文件数据: 指定文件路径
- 读取目录下的所有文件数据: 指定目录路径
- 路径:可以是本地磁盘路径 file://文件绝对路径
也可以是HDFS路径(默认是HDFS) hdfs://ip:port/文件路径 port:8020(服务端端口号) 可以省略hdfs://ip:port
- RDD分区设置
rdd的分区就是将数据拆分成多份,一个分区的数据就对应着一个task线程,cpu的一个核处理一个task线程
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
# 查看默认分区数
print(sc.defaultParallelism)
# sc.parallelize(c=, numSlices=)
# numSlices: 设置分区数
rdd1 = sc.parallelize(c=[1, 2, 3, 4, 5])
print(rdd1.collect())
# 查看分区数据
print(rdd1.glom().collect())
rdd2 = sc.parallelize(c=[1, 2, 3, 4, 5], numSlices=4)
# 查看分区数据
print(rdd2.glom().collect())
print(rdd2.collect())
# sc.textFile(name=, minPartitions=)
# minPartitions: 设置最小分区数, 实际分区数有可能比设置的分区数要多1个分区
# 文件中的一行数据表示rdd的一个元素, 对数据进行分区时是不会将一行数据进行拆分
"""
1B=1字节, 1K=1024B=1024字节
单个文件分区规则:
文件字节数/分区数 = 分区字节数...余数
余数/分区字节数 = 结果值 结果值 大于10% 会多创建
目录下多个文件分区规则:
所有文件字节数/分区数 = 分区字节数...余数
每个文件字节数/分区字节数 = 分区数...余数
余数/分区字节数 = 结果值 结果值 大于10% 会多创建
"""
rdd3 = sc.textFile(name='/data/words.txt')
print(rdd3.glom().collect())
rdd4 = sc.textFile(name='/data/words.txt', minPartitions=4)
print(rdd4.glom().collect())
rdd5 = sc.textFile(name='/data', minPartitions=3)
# print(rdd5.glom().collect()) # 打印出一堆数据
- 读取小文件数据转换为RDD
通过wholeTextFiles()方法读取目录小多个小文件数据时以文件为单位合并到分区中, 此时就减少了分区数
from pyspark import SparkContext
sc = SparkContext()
# 读取文件数据转换成RDD
# 读取多个小文件数据时, 一个文件数据对应一个分区, 一个分区对应一个task线程任务, 此时会产生多个task线程
# 每个分区的数据大小很小的, 此时会造成资源浪费问题, 可以通过wholeTextFiles方法解决此问题
rdd1 = sc.textFile('/data', minPartitions=3)
print(rdd1.glom().collect())
rdd2 = sc.wholeTextFiles('/data')
print(rdd2.glom().collect())
RDD常用算子
- 算子分类
transformation 转换算子 | action 执行算子 |
---|---|
rdd调用此类算子得到一个新的rdd | rdd调用此类算子得到最终结果, 不再是一个rdd |
使用此类算子不会触发spark的job, 类似于多任务中定义了一个任务 | 使用此类算子会触发spark的job, 类似与多任务中执行了一个任务 |
"""
transformation算子: 定义一个job
action算子: 执行一个job
"""
from threading import Thread
def func1():
print("正在跳舞...")
def func2():
print('正在唱歌...')
if __name__ == '__main__':
# 定义线程对象执行多任务, transformation算子
t1 = Thread(target=func1)
t2 = Thread(target=func2)
print(t1)
print(t2)
# 执行多任务, action算子
t1.start()
t2.start()
常用transformation算子
map算子
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1,2,3,4])
def func1(x):
print(f"x的值是:{x}")
return x+1
rdd2 = rdd1.map(f=lambda x: x + 1)
print(rdd2)
print(type(rdd2))
print(rdd2.collect())
rdd3 = rdd2.map(f=func1)
print(rdd3.collect())
def func2(x):
return [x]
rdd4 = rdd1.map(func2)
print(rdd4.collect())
# 需求: rdd1->['hello', 'world', 'spark', 'hello'] 转换成 rdd2
# ->
# [('hello', 1), ('world', 1), ('spark', 1), ('hello', 1)]
a = sc.parallelize(['hello', 'world', 'spark', 'hello'])
# 第一种做法
def func3(x):
return (x,1)
rdd5 = a.map(func3)
print(rdd5.collect())
# 第二种做法
rdd6 = a.map(lambda x:(x,1))
print(rdd6.collect())
- map接受一个函数名(引用)
- 将rdd1中每个元素经过func函数处理后, 将func函数返回值保存到新的rdd2中
- rdd1->[1,2,3,4] 经过func x+1处理 得到rdd2->[2,3,4,5]
- 注意点: map算子不会改变原rdd元素经过func函数处理后的返回值结构rdd1->[1,2,3,4] 经过func list()处理 得到rdd2->[[1],[2],[3],[4]]
flatMap算子
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
rdd1 = sc.parallelize([[1, 2, 3], [4, 5, 6]])
# 定义func1函数
def func1(x):
print(f"x的值是:{x}")
return x
# 调用map算子
rdd1_map = rdd1.map(f=func1)
print(rdd1_map.collect())
# 调用map算子
rdd1_map = rdd1.map(f=func1)
print(rdd1_map.collect())
# 给rdd1中每个子列表在末尾添加一个数字10
list1 = [1, 2, 3, 4]
list1.append(10)
print(list1)
def func2(x):
x.append(10)
return x
# append方法是没有返回值,是将None值返回, 将None值保存到新的rdd中
rdd1_append = rdd1.map(lambda x: x.append(10))
print(rdd1_append.collect())
rdd1_append2 = rdd1.map(func2)
print(rdd1_append2.collect()) # [[1, 2, 3, 10], [4, 5, 6, 10]]
# 调用flatMap
# append方法没有返回值, 将None值返回, None值类型是不可以拆分,
# 所以报错 TypeError: 'NoneType' object is not iterable
# rdd1_append3 = rdd1.flatMap(lambda x: x.append(10))
# print(rdd1_append3.collect())
rdd1_append3 = rdd1.flatMap(func2)
print(rdd1_append3.collect())
# 需求: rdd1->['hello,world,spark', 'hive,spark,world']
# 转换成rdd2->['hello', 'world', 'spark', 'hive', 'spark', 'world']
a = sc.parallelize(['hello,world,spark', 'hive,spark,world'])
rdd2 = a.flatMap(lambda x: x.split(','))
print(rdd2.collect())
- flatMap接受一个函数名(引用), 一般用于处理嵌套结构rdd, rdd中的每个元素是容器类型(str/list/set/tuple/dict)
- flatMap接受的函数需要返回一个容器类型, 容器才可以拆分
- 将rdd1中每个元素经过func函数处理后, 将func函数返回值拆分之后保存到新的rdd2中
- rdd1->[[1,2,3,4], [2,3,4,5]] 经过func append(10)处理 得到rdd2->[1,2,3,4,10,2,3,4,5,10]
- 注意点: flatMap算子会改变原rdd元素经过func函数处理后的返回值结构, 将返回值容器拆分开保存到新rdd中
filter算子 判断
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
rdd1 = sc.parallelize([1, 2, 3, 4])
int1 = 1
print(int1 % 2 == 0)
rdd2 = rdd1.filter(lambda x:x%2 == 0)
print(rdd2.collect())
# 需求: 获取rdd1中年龄信息小于25岁的数据 获取rdd1中性别为男的数据
a = sc.parallelize([[1, '小明', 20, '男'],
[2, '小红', 16, '女'],
[3, '老王', 35, '男'],
[4, '催化', 24, '女']])
# 利用下标获取数据
rdd3 = a.filter(lambda x: x[2] < 25)
print(rdd3.collect())
rdd4 = a.filter(lambda x: x[3] == '男')
print(rdd4.collect())
- func传递匿名函数, 匿名函数实现对rdd中每个元素进行判断(返回True或False)
- rdd1中每个元素进过函数处理,保留函数为True对应的元素, 实现过滤rdd中的元素操作(数据清洗)
distinct算子 去重
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
rdd1 = sc.parallelize([1, 2, 3, 4, 1, 5, 2, 6, 7])
rdd2 = rdd1.distinct()
print(rdd2.collect())
groupBy&mapValues算子
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
rdd1 = sc.parallelize([1, 2, 3, 4, 1, 5, 2, 6, 7])
# 将rdd1中的数据分成奇数组(余数为1)和偶数组(余数为0)
# 计算rdd1中每个元素和2相除的余数, 余数作为新rdd的key值,余数对应的x值作为新rdd的value值
rdd2 = rdd1.groupBy(lambda x: x % 2)
print(rdd2.collect())
# mapValues: 对k-v数据格式的rdd的v值经过函数处理
rdd3 = rdd2.mapValues(lambda x: list(x))
print(rdd3.collect())
# 需求: 将rdd1中的数据按照性别进行分组
a = sc.parallelize([[1, '小明', 20, '男'],
[2, '小红', 16, '女'],
[3, '老王', 35, '男'],
[4, '翠花', 24, '女']])
rcc1 = a.groupBy(lambda x: x[3])
print(rcc1.collect())
rcc2 = rcc1.mapValues(lambda x: list(x))
print(rcc2.collect())
- 将rdd1中的元素经过func函数处理, 将func函数的返回值作为分组key, 将rdd1中的元素作为分组value, 实现分组操作
- groupBy算子返回k-v数据格式的rdd, k就是分组名,value就是组内的元素 [[‘a’,(1,2,3)],[‘b’,(4,5,6)]]
- 将k-v数据格式的rdd中的value值经过func函数处理, 将返回的新value值保存到新rdd2中
groupbykey&mapValues算子
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])
# groupbykey
# rdd2=rdd1.groupByKey(): 根据rdd中的key值对rdd的元素进行分组操作, 将相同key值对应的value值保存到一起
rdd_groupbykey = rdd1.groupByKey()
print(rdd_groupbykey.collect())
rdd_mapvalues = rdd_groupbykey.mapValues(lambda x: list(x))
print(rdd_mapvalues.collect())
没有预聚合操作
reducebykey算子
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])
# reducebykey
# 分组求和操作
# 将rdd根据key分组, 同一组中的value值进行求和操作
def func(x,y):
print(f'x的值是:{x}')
print(f'y的值是:{y}')
return x + y
rdd_reducebykey1 = rdd1.reduceByKey(func)
rdd_reducebykey2 = rdd1.reduceByKey(func=lambda x, y: x + y)
rdd_reducebykey3 = rdd1.reduceByKey(lambda x, y: x + y)
print(rdd_reducebykey1.collect())
print(rdd_reducebykey2.collect())
print(rdd_reducebykey3.collect())
- 将rdd中的相同key值对应的所有value经过func函数聚合操作, 分组聚合
- 有预聚合操作
sortbykey算子
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])
# sortbykey
# rdd1.sortByKey(ascending=True, numPartitions=None, keyfunc=None):
# 根据key值对应rdd中的元素进行排序操作, 默认升序
# 升序
rdd_sortbykey1 = rdd1.sortByKey()
print(rdd_sortbykey1.collect())
# 降序
rdd_sortbykey2 = rdd1.sortByKey(ascending=False)
print(rdd_sortbykey2.collect())
sortby算子
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])
# sortby
# 根据func函数指定的rdd中的元素值对rdd进行排序操作, 默认升序
# # x->('a', 1)
rdd_sortby = rdd1.sortBy(keyfunc=lambda x: x[1], ascending=False)
print(rdd_sortby.collect())
join算子
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])
#join
# 创建RDD
rdd2 = sc.parallelize([("a", 1), ("b", 4)])
rdd2 = sc.parallelize([("a", 2), ("a", 3)])
# join 返回交集
rdd_join = rdd1.join(other=rdd2)
print(rdd_join.collect())
# leftOuterJoin
# 保留左rdd中所有key值对应的value值, 右rdd中关联不上的用None值代替value值
rdd_leftouterjoin = rdd1.leftOuterJoin(other=rdd2)
print(rdd_leftouterjoin.collect())
常用action算子
collect()算子
from pyspark import SparkContext
# 创建sc对象
sc = SparkContext()
# 创建rdd
rdd = sc.parallelize([1, 2, 3, 4])
# collect() 收集rdd中所有元素数据,返回列表类型 使用之前需要评估rdd中的数据量(数据量大容易造成内存溢出问题)
print(rdd.collect())
print(type(rdd.collect()))
take()算子
收集指定num数的rdd中的元素数据,返回列表类型
# take() 收集指定num数的rdd中的元素数据,返回列表类型
print(rdd.take(num=2))
print(type(rdd.take(2)))
reduce()算子
将一维rdd中的每个元素经过func函数进行聚合操作
# reduce() 将一维rdd中的每个元素经过func函数进行聚合操作
result = rdd.reduce(lambda x, y: x + y)
print(result)
countByValue()算子
print(result.items()) # 以字典的形式返回
print(result.keys()) # 获取字典中的key出现的次数
print(result.values()) # 获取字典中的value出现的次数
# countByValue() 统计rdd中每个元素出现的次数, 以key->rdd的元素, value->次数形式保存到字典中
result = rdd.countByValue()
print(result) #以元组的形式返回
print(result.items()) # 以字典的形式返回
print(result.keys()) # 获取字典中的key出现的次数
print(result.values()) # 获取字典中的value出现的次数
count()算子
统计rdd中的元素个数
# count() 统计rdd中的元素个数
print(rdd.count())
k-v类型数据输出为字典数据
如果RDD中的键有重复,那么每个键在字典中只会出现一次。对于键值对中的重复键,collectAsMap()只会保留该键最后一次出现时对应的值
- print(rdd_kv.collectAsMap())
# k-v类型数据输出为字典数据
rdd_kv = sc.parallelize([('a', 1), ('b', 2), ('a', 2), ('b', 5)])
# collectAsMap() 将kv格式的rdd保存到字典中, rdd中的key就是字典中的key,rdd中的value就是字典中的value
# 如果RDD中的键有重复,那么每个键在字典中只会出现一次。
# 对于键值对中的重复键,collectAsMap()只会保留该键最后一次出现时对应的值
print(rdd_kv.collectAsMap())
- result = rdd_kv.countByKey()
# countByKey() 统计kv格式的rdd中key值出现的次数, 返回字典类型 rdd的key就是字典中的key, key出现的次数就是字典中的value
result = rdd_kv.countByKey()
print(result)
print(result.items())
print(result.keys())
print(result.values())
- rdd_kv.saveAsTextFile(path=‘/data/data’)
# saveAsTextFile()
#将rdd保存到指定目录下的文件中,一个分区对应一个文件 注意: path是一个目录路径, 并且目录是不存在的, 可以保存到hdfs或本地磁盘中
# 如果目录存在发生报错: Output directory hdfs://node1:8020/data/data already exists
rdd_kv.saveAsTextFile(path='/data/data')
RDD缓存和checkpoint对比
缓存 | checkpoint | |
---|---|---|
生命周期 | spark程序结束后缓存的rdd会自动被销毁 | spark程序结束后checkpoint的rdd不会被销毁 |
存储位置 | 缓存的rdd是优先存储在内存中, 如果内存不足可以通过设置缓存级别存储在本地磁盘中 | checkpoint的rdd是存储在HDFS上, 实现了分块多副本 |
依赖关系 | 缓存的rdd不会删除依赖关系, 缓存rdd丢失还可以通过依赖关系计算得到 | checkpoint的rdd会删除依赖关系, HDFS上的数据不会自动删除(分块多副本机制) |
RDD依赖
-
新rdd是由旧rdd计算得到的, 新旧rdd之间旧存在依赖关系(因果关系) (rdd1->rdd2->rdd3->rdd4)
-
依赖分类
窄依赖 | 宽依赖 |
---|---|
父RDD中的一个分区数据最多只能被子RDD中的一个分区所使用 | 父RDD中的一个分区数据被子RDD中的多个分区所使用 |
父RDD分区和子RDD分区是一对一关系 | 父RDD分区和子RDD分区是一对多关系 |
触发窄依赖算子:map、flatmap、fliter | 触发宽依赖算子:groupby、groupbykey、sortby、sortbykey、reducebykey、distinct |
|
依赖管理
DAG有向无环图
- spark graphx组件中的一种图计算算法\
- 管理RDD之间的依赖关系, spark程序根据依赖关系执行
- DAG根据RDD之间的依赖关系进行计算步骤的划分, 一个计算步骤就是一个stage阶段
- DAG遇到宽依赖关系的RDD就会划分一个新的stage, 如果都是窄依赖关系的RDD, 它们是在同一个stage中
- spark的WEB UI界面查看有向无环图和计算流程
APP -> spark程序(一个python文件/一个交互式界面)
job -> spark计算任务(由rdd的action算子触发得到的)
为什么要进行stage划分
- spark的task是以线程方式执行, 线程执行时会抢夺计算资源
- 划分stage之后, 同一个stage阶段中的多个线程可以同时计算
- 下一个stage阶段任务要同时计算时需要等上一个stage阶段的所有task线程任务执行完后再计算
- 遇到宽依赖关系的算子就会划分新的stage阶段
SparkSQL
概念及特点
概念
- spark五大组件之一, 处理结构化数据
DataFrame -> python spark
DataSet -> java/scala spark- 使用纯SQL或编程语言的DSL方式进行计算
- 使用SparkSQL组件处理数据比Spark Core更加简洁, 但是spark sql底层还是转换成RDD任务提交执行
特点
- 融合性
sparkSQL支持纯sql代码编写, 类似于hiveSQL
sparkSQL支持多种编程语言的DSL代码编写方式(将SQL中关键字转换成df对象的方法使用, 类似于pandas的df操作)- 统一数据访问
使用统一的API实现读写数据
读:使用== ss.read.xxx() ==方式读取各种存储工具中的数据
写:使用 == df.write.xxx() ==方式将df数据保存到各种存储工具中- 兼容hive
使用hiveSQL中的方法处理数据
将sparkSQL的计算结果直接映射成hive数据表- 标准化数据连接
可以使用pycharm/datagrip工具连接各种数据库编写sql代码
支持jdbc和odbc两种方式连接sparkSQL(读写HDFS数据, HDFS和Hive映射)
SparkSQL和HiveSQL比较
1. hiveSQL
通过hive工具HDFS上的文件数据映射成数据表
数据表的元数据是交给hive的metastore服务管理, 存储到对应的数据库中(mysql,derby)
hive提供sql编写代码方式执行代码, 底层通过MapReduce计算引擎执行任务
2. hive on spark模式
编写hiveSQL代码, 借助shark引擎转换成spark的rdd任务, 交给spark计算引擎执行任务
3. spark on hive模式
开发了sparkSQL组件
编写sparkSQL代码执行任务, 借助catalyst引擎将sparkSQL转换成rdd任务交给spark计算引擎执行
sparkSQL中的数据表元数据由hive的metastore服务管理
Spark的数据类型
RDD | DataFrame | DataSet |
---|---|---|
spark core组件处理的数据类型 | spark SQL组件中处理的数据类型 | spark SQL组件中处理的数据类型 |
也是spark中的核心数据类型, 其他的组件数据最终都是要转换成rdd任务执行] | 结构化数据, 将RDD的数据映射成数据表(表结构和数据) | 结构化数据, 将DataFrame的数据映射成表数据 |
python,java,scala,R都有RDD | python,java,scala,R都有DataFrame | DataSet中的一个元素就是一个DataFrame |
二维的RDD只有数据,没有表结构(字段名,字段类型) | java,scala有DataSet |
DataFrame数据类型
概念
- 以列形式构成的弹性分布式数据集合
- DataFrame是由行对象和表结构对象组成的二维结构化数据
行对象 -> 一行数据就是一个行对象
schema对象(表结构) -> 字段名和字段类型
各类获取方式
# df是由row对象和schema对象组成
from pyspark.sql import Row # row对象的类
# 创建row对象, 一个row对象代表一行数据
row1 = Row(id=1, name='小明', age=18)
row2 = Row(id=2, name='小红', age=20)
print(row1)
print(row2)
print(type(row1))
# 获取对象的属性 对象名.属性名
print(row1.id)
print(row1.name)
print(row1.age)
# 获取对象的属性 对象名[属性名]
print(row2['age'])
# row对象的属性值不能修改 Exception: Row is read-only
# row1.id = 100
# print(row1.id)
from pyspark.sql.types import * # 字段类型对象的类
# 创建schema对象 表结构->字段名和字段类型
# 通过StructType类创建schema对象
# 通过schema对象的add方法添加字段名/字段类型/是否允许为null
# field: 字段信息, 可以写字段名也可以写字段对象
# data_type: 字段类型, 类型对象, 从pyspark.sql.types中获取类型对象
# nullable: 字段是否允许为null, 默认True
schema1 = (StructType()
.add(field='id', data_type=IntegerType(), nullable=True)
.add(field ='name',data_type=StringType(),nullable=False)
.add(field='age',data_type=FloatType()))
# 获取schema对象中的某个字段
print(schema1['id'])
print(schema1[1])
# 通过StructType类传递fields属性值创建schema对象
schema2 = StructType(fields=[StructField('id',IntegerType(),True),
StructField('name',StringType())])
print(schema2)
# add()方法中的field参数传递StructField对象创建schema对象
schema3 = (StructType().
add(field=StructField('id',IntegerType(),True))
.add(field=StructField('name',StringType(),True)))
print(schema3)
DataFrame创建
createDataFrame创建
schema对象 =StructType(). add(field=‘age’, data_type=IntegerType())
# 注意点
# 1、定义的schema信息中的字段个数要和数据中的个数保持一致
# 2、定义的schema信息中的字段类型要和数据中的类型保持一致
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
# 创建ss对象
# SparkSession.builder: 类名.类属性名
# builder: 类属性名 等于Builder() ->Builder类的对象
# getOrCreate: Builder类的对象的方法
ss = SparkSession.builder.getOrCreate() # 和sc差不多的入口
# df对象由row对象和schema组成
# row对象
row1 = Row(id=1, name='小明', age=18)
row2 = Row(id=2, name='小红', age=16)
# schema对象
schema = (StructType().
add(field='id', data_type=IntegerType(), nullable=True).
add(field='name', data_type=StringType(), nullable=False).
add(field='age', data_type=IntegerType()))
# 创建df对象
# 调用sparksession对象中的createDataFrame方法
# [row1, row2] -> 二维数据结构 结构化数据
# df = ss.createDataFrame(data=[row1, row2], schema=schema)
df = ss.createDataFrame(data=[[1, '小明', 18],
[2, '小王', 19],
[3, '小李', 20]], schema=schema)
# 查看df数据
df.show()
# 查看df的表结构
df.printSchema()
RDD转DF
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# 创建ss对象
ss = SparkSession.builder.getOrCreate()
# 创建sc对象
# sparkContext不要加括号, 通过@property装饰器装饰的方法在调用时就可以不用加括号
sc = ss.sparkContext
# 创建rdd对象
# 二维数据结构的rdd
rdd = sc.parallelize([[1, '张三', 20],
[2, '李四', 22]])
# 一维数据结构的rdd
# rdd = sc.parallelize([1, '张三', 20])
print(rdd.collect())
# 调用rdd的toDF算子转成df对象
df1 = rdd.toDF(schema='id int,name string,age int')
print(df1)
df1.show()
df1.printSchema()
schema = (StructType()
.add('id',IntegerType())
.add('name',StringType(),nullable=False)
.add('age',IntegerType()))
df2 = rdd.toDF(schema=schema)
df2.show()
df2.printSchema()
# df对象转换成rdd对象
# rdd中的每个元素就是df中每个row对象, df中的一行数据就是一个row对象
# 过@property装饰器装饰的方法在调用时就可以不用加括号
new_rdd = df2.rdd
print(df2.rdd.collect())
print(type(new_rdd))
print(new_rdd.collect())
# result列表类型 [row1, row2, ...]
result = new_rdd.collect()
for row in result:
print(row)
print(row.id)
print(row.name)
print(row.age)
print(row['age'])
print(row[1])
# x->Row(id=1, name='张三', age=20)
# x.id -> 获取row对象的id属性
rdd_map = new_rdd.map(lambda x:x.id)
print(rdd_map.collect())
# 停止程序
ss.stop()
DataFrame基本使用
基本DSL方法
- 查询 df.select()
- 字符串表达式 df.selectExpr(字符串表达式)
- 条件判断 df.where(‘age>23’)
- 分组聚合 df_select6.groupBy(‘gender’).avg(‘age’)
- 排序 df_select6.orderBy(‘age’, ascending=True)
- 限制行数 df_select6.limit(num=40)
- 打印 df_select6.show(n=30, truncate=False)
- 新增列操作 .withColumn(colName=‘new_age’, col=df_select6.age + 100).show()
- 修改列名操作 .alias(‘new_name’)) .withColumnRenamed(‘avg(age)’, ‘new_age’) .toDF(‘major’, ‘gender’, ‘avg_age’)
- 是否包含’人‘这个字 再通过filter判断 df.filter(df[‘对手’].contains(‘人’)).show()
- 以’熊’结尾的 df.filter(df[‘对手’].endswith(‘熊’)).show()
- 判断数据是否为空返回以‘灰’开头的like df.filter(df[‘对手’].like(‘灰%’)).show()
- 如果对手是灰熊 flag列返回1 不是则返回0 df.withColumn(‘flag’,F.when(df[‘对手’]==‘灰熊’,1).otherwise(0)).show()
# 使用DF的DSL方法实现对df数据处理
# DF的数据是结构化数据,所以DSL方法和SQL的关键字基本一致
from pyspark.sql import SparkSession
# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# res1 = rdd.collect()
res1 = rdd.take(5)
print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.take(5)
print(f'切割后的rdd数据:{res2}')
# 将rdd转为df数据
df = rdd_split.toDF(
schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string'
)
# 查看转化的df数据
df.show()
print('======================== select ================================')
# select 字段名1, 字段名2, .. from df
# 查询所有字段数据
df_select1 = df.select('*')
df_select1.show()
# 查询指定字段数据
# 字段名
df_select2 = df.select('id')
df_select2.show()
# df[字段名]
df_select3 = df.select('id', df['name'])
df_select3.show()
# df.字段名
df_select4 = df.select('id', df['name'], df.age)
df_select4.show()
# 给字段设置别名
# select 字段名 as 新字段名 from df
# df[列名].alias('别名') -> 字段的alias方法
df_select5 = df.select('id', df['name'].alias('new_name'), df.age.alias('new_age'))
df_select5.show()
# 修改字段数据类型
# select cast(字段名 as 新数据类型) from df
df_select6 = df.select(df['id'].astype('int'), df['name'], 'gender', df.age.cast('int'),
'birthday', df.major, df.hobby, df.create_time)
df_select6.show()
df_select6.printSchema()
print('======================== selectExpr ================================')
# df.selectExpr(字符串表达式)
df_selectexpr1 = df_select6.selectExpr("id", "age*2 as new_age", "cast(age as string)", "name as new_name")
df_selectexpr1.show()
df_selectexpr1.printSchema()
print('======================== where ================================')
# df.where(条件表达式)
# 获取年龄大于23的学生数据
# select * from df where age > 23
df_where1 = df.where('age>23')
df_where1.show()
df_where2 = df.where(df['age']>23)
df_where2.show()
# 获取年龄大于23并且性别为男的学生数据
# select * from df where age>23 and gender='男'
df_where3 = df.where('age>23 and gender="男"')
df_where3.show()
# &->and |->or
df_where4 = df.where((df['age']>23) & (df['gender']=='男'))
df_where4.show()
print('======================== groupby ================================')
# 分组聚合操作
# select gender, avg(age) from df group by gender
df_groupby1 = df_select6.groupBy('gender').avg('age')
# df_groupby1 = df_select6.groupBy('gender').mean('age')
df_groupby1.show()
df_groupby2 = df_select6.groupBy(['major', 'gender']).avg('age', 'id')
df_groupby2.show()
# 分组聚合过滤 where等同于having
# select gender, avg(age) from df group by gender having avg(age)>23
df_groupby3 = df_select6.groupby(['major', 'gender']).avg('age').where('avg(age)>21.5')
df_groupby3.show()
print('======================== orderBy ================================')
# df_select6.sort()
# 一列数据排序
# select * from df order by age asc/desc
df_orderby1 = df_select6.orderBy('age', ascending=True)
df_orderby1.show()
# 多列数据排序
df_orderby2 = df_select6.sort(df_select6['age'], df_select6['id'], ascending=False)
df_orderby2.show()
df_orderby3 = df_select6.sort(['age', 'id'], ascending=[True, False])
df_orderby3.show()
df_orderby4 = df_select6.sort(df_select6['age'].desc(), df_select6['id'].asc())
df_orderby4.show()
print('======================== limit ================================')
# 限制df的行数
df_limit = df_select6.limit(num=40)
print(df_limit.count())
df_limit.show(n=100)
print('======================== show ================================')
# 打印df的条目数, 默认20条, 可以打印更多条数据
# limit是df只有num数量的条目数, show展示n数量的条目数
# truncate:默认是True, 是否取消显示省略
df_select6.show(n=30, truncate=False)
print('------------------withColumn 新增列操作-----------------------')
# df_select6.select('*', df_select6.age + 10).show()
# 在原df后边新增一列
df_select6.withColumn(colName='new_age', col=df_select6.age + 100).show()
df_select6.withColumn(colName='new_age', col=df_select6['age'] + 100).show()
# 新增一列常数列
from pyspark.sql.functions import lit
df_select6.withColumn(colName='new_age', col=lit(100)).show()
print('======================== 修改列名操作 ================================')
# df[列名].alias(新列名)
df_select6.select(df_select6['id'], df_select6['name'].alias('new_name')).show()
# withColumnRenamed(原列名, 新列名)
df_groupby2 = df_select6.groupBy(['major', 'gender']).avg('age').withColumnRenamed('avg(age)', 'new_age')
df_groupby2.show()
# df.toDF(新列名1, 新列名2, ...) -> 修改所有列的列名
df_groupby2 = df_select6.groupBy(['major', 'gender']).avg('age').toDF('major', 'gender', 'avg_age')
df_groupby2.show()
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
ss = SparkSession.builder.getOrCreate()
# todo: 需求1-读取数据,转换每一行数据为Row
df = ss.read.csv('hdfs://192.168.88.100:8020/data/data.csv')
df.show()
df = df.withColumn('对手',df[1].cast(StringType()))
df = df.withColumn('胜负',df[2].cast(StringType()))
df = df.withColumn('主客场',df[3].cast(StringType()))
df = df.withColumn('命中',df[4].cast(IntegerType()))
df = df.withColumn('投篮数',df[5].cast(IntegerType()))
df = df.withColumn('投篮命中率',df[6].cast(FloatType()))
df = df.withColumn('3分命中率',df[7].cast(FloatType()))
df = df.withColumn('篮板',df[8].cast(IntegerType()))
df = df.withColumn('助攻',df[9].cast(IntegerType()))
df = df.withColumn('得分',df[10].cast(IntegerType()))
df.show()
# todo: 需求2:alias起一个别名
df.select(df['对手'].alias('比赛对手')).show(3)
# todo: 需求3:根据得分升序排列,并打印前5个对手和得分
df.select('对手','得分').orderBy(df['得分'].asc()).show(5)
# todo: between
# todo: 一个布尔表达式,如果该表达式的值在给定列之间,则计算为true。可用于筛选满足条件的Row
# todo: 需求4:筛选出得分在15-20之间数据(包含边界)
df1 = df.select('对手',df['得分']).show()
df2 = df.select('对手',df['得分'].between(15,20).alias('15-20得分'))
df2.filter(df2['15-20得分']==True).show()
# todo: 需求5:contains(other) 包含其他元素。 根据字符串匹配返回一个布尔列
df.filter(df['对手'].contains('人')).show()
# todo: endswith(other) boolen值 以other结尾的字符串
# todo: 需求6:以“熊”结尾的字符串
df.filter(df['对手'].endswith('熊')).show()
# todo: 需求9:判断数据是否为空返回以‘灰’开头的
# todo: like(other)
# todo: 类似于SQL中的like, 返回基于SQL
# todo: LIKE匹配的布尔列
df.filter(df['对手'].like('灰%')).show()
df.select('对手', '胜负', '主客场', '得分').where(df['对手'].like('灰%')).show()
# todo: otherwise(value)
# todo: 计算条件列表,并返回多个可能的结果表达式之一。
# todo: 需求10:增加标志列flag,将灰熊标志为1,其他对手标志为0
from pyspark.sql import functions as F
df.withColumn('flag',F.when(df['对手']=='灰熊',1).otherwise(0)).show()
# todo: when(condition, value)
# todo: 计算条件列表,并返回多个可能的结果表达式之一。 如果未调用Column.otherwise(),则对于不匹配的条件,将不返回None。
# todo: 需求11:查找得分大于25的对手,标记为1,否则标记为0,标记列名为‘score_flag’
df.select('对手','得分',F.when(df['得分']>25,1).otherwise(0).alias('score_flag')).show()
# todo: 需求11:# 求得分的均值
# todo: 聚合函数,等价于pandas中的df.groupby().agg()
a = df.agg({'得分':'mean'}).collect()
print(a)
# todo: 需求12:去重
# todo: 按行去重,返回去重后的DataFrame
df.select(['主客场', '命中']).distinct().show()
SQL语句
映射临时表 df.createTempView(‘words’)
sql写法 df_sql = ss.sql(“”“SQL语句”“”) df_sql.show()
# 使用DF的DSL方法实现对df数据处理
# DF的数据是结构化数据,所以DSL方法和SQL的关键字基本一致
from pyspark.sql import SparkSession
# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
# res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')
# 将rdd转为df数据
df = rdd_split.toDF(
schema='id string,name string,gender string,age string,birthday string,'
'major string,hobby string,create_time string')
# 查看转化的df数据
df.show()
# SQL方式
# 将df映射成一张表
df.createTempView(name='stu')
sql_str = "select gender,avg(age) from stu group by gender"
# 执行sql语句, 返回df对象
df_groupby = ss.sql(sqlQuery=sql_str)
print(type(df_groupby))
df_groupby.show()
df_groupby.printSchema()
# DSL方式
df2 = df_groupby.select('gender', df_groupby['avg(CAST(age AS DOUBLE))'].alias('avg_age'))
df2.show()
高级DSL方法
- 关联方法join 上下关联 union unionall unionByName
"""
df1.join(other=, on=, how=)
other:另外一个df对象
on:关联条件
how:关联方式 默认inner left right full
df1.crossJoin(other=) -> 交叉关联, 笛卡尔积
df1.union(other=) -> df上下合并并去重
df2.unionAll(other=) -> df上下合并不去重
df1.unionByName(other=) -> 根据列名上下合并, 列名不同的用null填充
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# 创建ss对象
ss = SparkSession.builder.getOrCreate()
# 创建sc对象
sc = ss.sparkContext
# 生成rdd
rdd1 = sc.parallelize([
[1, 'zhangsan', 20],
[2, 'lisi', 20],
[3, 'wangwu', 22]
])
rdd2 = sc.parallelize([
[3, 'zhaoliu', 20],
[4, 'xiaoming', 21],
[5, 'itcast', 22]
])
# 定义schema信息
schema_type = StructType(). \
add('id', IntegerType()). \
add('name', StringType()). \
add('age', IntegerType(), False)
# rdd转换成df
df1 = rdd1.toDF(schema=schema_type)
df2 = rdd2.toDF(schema=schema_type)
df1.show()
df2.show()
# 交叉关联 -> 笛卡尔积
df1.crossJoin(df2).show()
# 交叉关联 -> 笛卡尔积
# other:另外一个df对象
df_inner1 = df1.join(other=df2)
df_inner1.show()
# 内连接
# on: 两个df相同的列名 -> 只保留一个相同列
df_inner2 = df1.join(other=df2, on='id', how='inner')
df_inner2.show()
# 左连接: 保留左df所有数据, 右df关联上的保留, 关连不上的用null填充
# on: df1.列名==df2.列名 -> 保留所有相同列
df_left = df1.join(df2, df1.id == df2.id, how='left')
df_left.show()
# 右连接
# on: df1[列名]==df2[列名] -> 保留所有相同列
df_right = df1.join(df2, df1['id'] == df2['id'], how='right')
df_right.show()
# 满外连接: 保留左右df的所有数据, 关联不上的就null填充
df_full = df1.join(df2, 'id', 'full')
df_full.show()
# 多个关联条件
df1.join(df2, ['id', 'name'], 'left').show()
# 上下关联
# 相同列名上下关联, 去重
df1.union(other=df2).show()
# 相同列名上下关联, 不去重
df1.unionAll(other=df2).show()
# 根据列名相同上下关联, 列名不同的用null填充
df1.unionByName(other=df2).show()
- 缓存和checkpoint操作
1. 缓存 .persist(storageLevel=StorageLevel.MEMORY_ONLY_2)
2. checkpoint操作 df.checkpoint()
从缓存或checkpoint中读取df
如果缓存和checkpoint都在, 优先读取缓存
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark import StorageLevel
# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 指定checkpoint目录路径
sc.setCheckpointDir('/dataframecheckpoint')
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
# res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')
# 将rdd转为df数据
df = rdd_split.toDF(
schema='id string,name string,gender string,age string,'
'birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()
print('------------------------- 缓存 ---------------------------------')
# 对df进行缓存操作
# df.cache()
df.persist(storageLevel=StorageLevel.MEMORY_ONLY_2)
# df.unpersist() # 手动释放缓存
print(df.is_cached)
print('------------------------- checkpoint ---------------------------------')
# 对df进行checkpoint操作
df.checkpoint()
# 从缓存或checkpoint中读取df
# 如果缓存和checkpoint都在, 优先读取缓存
df_select = df.select(df['id'].cast('int'), 'name', df.age.astype('int'))
df_select.show()
- 数据清洗方法
- 去重操作 .dropDuplicates(subset=[‘age1’]).show()
- 缺失值处理操作
删除.dropna(how=‘all’, subset=[‘name1’, ‘age1’]).show()
填充 .fillna(value=‘小明’, subset=[‘name1’]).show()- 删除列 .drop(‘name1’, ‘age2’).show()
- 替换操作 .replace(to_replace=20, value=888, subset=[‘age1’]).show()
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# 创建ss对象
ss = SparkSession.builder.getOrCreate()
# 创建sc对象
sc = ss.sparkContext
# 生成rdd
rdd1 = sc.parallelize([
[1, 'zhangsan', 20],
[2, 'lisi', 20],
[3, 'wangwu', 22]
])
rdd2 = sc.parallelize([
[3, 'zhaoliu', 20],
[4, 'xiaoming', 21],
[5, 'itcast', 22]
])
# 定义schema信息
schema_type = StructType(). \
add('id', IntegerType()). \
add('name', StringType()). \
add('age', IntegerType(), False)
# rdd转换成df
df1 = rdd1.toDF(schema=schema_type)
df2 = rdd2.toDF(schema=schema_type)
df1.show()
df2.show()
# 满外连接
new_df = df1.join(df2, 'id', 'full').toDF('id', 'name1', 'age1', 'name2', 'age2')
new_df.show()
print('----------------------------- 去重操作 --------------------------------------')
# 删除重复数据
temp_df = new_df.unionAll(new_df)
temp_df.show()
# 所有列去重
temp_df.dropDuplicates().show()
# 根据指定的列进行去重
temp_df.dropDuplicates(subset=['age1']).show()
print('----------------------------- 缺失值处理操作 --------------------------------------')
# 缺失值处理
# 删除
new_df.dropna().show()
# how='all'表示仅在所有指定列中都为缺失值时才删除该行
# subset=['name1', 'age1']表示只在'name1'和'age1'这两列中查找和删除缺失值
new_df.dropna(how='all', subset=['name1', 'age1']).show()
# 填充
# 根据填充值的类型找到对应df类型列中的null进行填充
new_df.fillna(value=50).show()
new_df.fillna(value='小明').show()
# 对指定列中的null进行填充
new_df.fillna(value='小明', subset=['name1']).show()
# 对不同列中的null进行不同填充
new_df.fillna(value={'name1': '小明', 'name2': '小红'}).show()
print('----------------------------- 删除列操作 --------------------------------------')
# 删除列
# 删除一列
new_df.drop('name1').show()
# 删除多列
new_df.drop('name1', 'age2').show()
# 替换
print('----------------------------- 替换操作 --------------------------------------')
new_df.replace(to_replace='zhangsan', value='张三').show()
# 对指定的列进行值替换
new_df.replace(to_replace=20, value=888, subset=['age1']).show()
new_df.replace({20: 8888}).show()
- 内置函数
让DSL能够调用SQL语法
from pyspark.sql import functions as F
- 字符串函数
- 拼接操作 .select(‘id’, F.concat(‘name’, ‘gender’).alias(‘name_gender’)).show() .select(‘id’, F.concat_ws(‘-’, ‘name’, ‘gender’).alias(‘name_gender’)).show()
- 分割操作 .select(‘id’, F.split(‘birthday’, ‘-’)).show()
- 截取操作 .select(‘id’, F.substring(‘birthday’, 2, 4).alias(‘year’)).show()
截取部分 df.select(‘id’, F.substring_index(‘birthday’, ‘-’, 1/-1)).show()- 正则替换操作 .select(‘id’, F.regexp_replace(‘birthday’, ‘-’, ‘/’)).show()
- 正则抽取操作 .select(‘id’, F.regexp_extract(‘birthday’, r’(.?)-(.?)-(.*)', 1)).show()
"""
F.concat(列名1, 列名2, ...): 将列对应的字符串进行拼接
F.concat_ws(拼接符, 列名1, 列名2, ...): 将列对应的字符串根据指定的拼接符拼接
F.split(字段名, 分割符): 将列根据指定的分割符进行分割, 返回列表
F.substring(字段名, 开始位置, 长度): 对列从开始位置截取指定长度的字符, 开始位置从1开始
F.substring_index(字段名, 截取符, 第n个截取符):
n: 正数, 从第n个截取符开始, 截取左侧部分的字符; 负数, 从第n个截取符开始, 截取右侧部分的字符
F.regexp_replace(字段名, 正则表达式, 替换后的值): 对指定列通过正则表达式进行替换
F.regexp_extract(字段名, 正则比倒是, 截取位置): 对指定列通过正则表达式截取指定位置的字符
"""
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# from pyspark.sql.functions import split
# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')
# 将rdd转为df数据
df = rdd_split.toDF(
schema='id string,name string,gender string,age string,'
'birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()
print('----------------------------------- 拼接操作 -----------------------------------')
# concat()
df.select('id', F.concat('name', 'gender').alias('name_gender')).show()
# concat_ws()
df.select('id', F.concat_ws('-', 'name', 'gender').alias('name_gender')).show()
print('----------------------------------- 分割操作 -----------------------------------')
df.select('id', F.split('birthday', '-')).show()
df.select('id',
F.split('birthday', '-')[0].alias('year'),
F.split('birthday', '-')[1].alias('month'),
F.split('birthday', '-')[2].alias('day')).show()
print('----------------------------------- 截取操作 -----------------------------------')
# substring(字段名, 开始位置, 长度)
df.select('id', F.substring('birthday', 0, 4).alias('year')).show()
df.select('id', F.substring('birthday', 1, 4).alias('year')).show()
df.select('id', F.substring('birthday', 2, 4).alias('year')).show()
# substring_index()
# 截取年份部分 正数->截取左侧部分
df.select('id', F.substring_index('birthday', '-', 1)).show()
# 截取年月部分
df.select('id', F.substring_index('birthday', '-', 2)).show()
# 截取天部分 负数->截取右侧部分
df.select('id', F.substring_index('birthday', '-', -1)).show()
print('----------------------------------- 正则替换操作 -----------------------------------')
# 将-替换成/
df.select('id', F.regexp_replace('birthday', '-', '/')).show()
# 将整数替换成/
df.select('id', F.regexp_replace('birthday', r'\d', '/')).show()
print('----------------------------------- 正则抽取操作 -----------------------------------')
# 获取第一组数据
df.select('id', F.regexp_extract('birthday', r'(.*?)-(.*?)-(.*)', 1)).show()
# 获取第二组数据
df.select('id', F.regexp_extract('birthday', r'(.*?)-(.*?)-(.*)', 2)).show()
# 获取第三组数据
df.select('id', F.regexp_extract('birthday', r'(.*?)-(.*?)-(.*)', 3)).show()
- 时间函数
- 获取当前时间 .select(F.current_date()).show()
- 获取当前日期时间 年月日时分秒毫秒 .select(F.current_timestamp()).show(truncate=False)
- 获取当前时间戳 秒 .select(F.unix_timestamp()).show(truncate=False)
- 日期时间格式化 .select(F.date_format(‘birthday’, ‘yyyy/MM/dd’)).show()
- unix时间转化 .select(F.from_unixtime(‘create_time’, ‘yyyy-MM-dd HH:mm:ss’)).show()
- 获取部分时间 F.year(‘birthday’) F.month(‘birthday’) F.dayofmonth(‘birthday’) F.quarter(‘birthday’)
- 时间运算
加多少天 .select(F.date_add(‘birthday’, 10)).show()
减多少天 .select(F.date_sub(‘birthday’, 10)).show()
加多少个月 .select(F.add_months(‘birthday’, 2)).show()
RDD 时间作差 .select(F.datediff(F.current_date(), ‘birthday’)).show()
获取当前月的最后一天 .select(F.last_day(‘birthday’)).show()
下一个星期日期 .select(F.next_day(‘birthday’, ‘Sun’)).show()
from pyspark.sql import SparkSession, functions as F
# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')
# 将rdd转为df数据
df = rdd_split.toDF(
schema='id string,name string,gender string,age string,'
'birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()
print('------------------------获取当前时间------------------------')
# 获取当前日期 年月日
df.select(F.current_date()).show()
# 获取当前日期时间 年月日时分秒毫秒
# show(truncate=False):以完整的格式显示选择的列的结果,不截断输出
df.select(F.current_timestamp()).show(truncate=False)
df.select(F.current_timestamp()).show()
# 获取当前时间戳 秒
df.select(F.unix_timestamp()).show(truncate=False)
df.select(F.unix_timestamp()).show()
print('------------------------日期时间格式化------------------------')
# 将日期时间格式数据进行格式化
# yyyy->年
# MM->月
# dd->日
# HH->小时
# mm->分钟
# ss->秒
df.select(F.date_format('birthday', 'yyyy/MM/dd')).show()
df.select(F.date_format('birthday', 'yyyy-MM')).show()
print('------------------------unix时间转化------------------------')
# 将unix时间戳转换成日期时间格式
df.select(F.from_unixtime('create_time', 'yyyy-MM-dd HH:mm:ss')).show()
df.select(F.from_unixtime('create_time', 'yyyy-MM-dd')).show()
print('------------------------获取部分时间------------------------')
# 获取年份
df.select(F.year('birthday')).show()
# 获取月份
df.select(F.month('birthday')).show()
# 获取日
df.select(F.dayofmonth('birthday')).show()
# 获取季度
df.select(F.quarter('birthday')).show()
print('------------------------时间运算------------------------')
# 加多少天
df.select(F.date_add('birthday', 10)).show()
# 减多少天
df.select(F.date_sub('birthday', 10)).show()
# 加多少个
df.select(F.add_months('birthday', 2)).show()
# 时间作差
df.select(F.datediff(F.current_date(), 'birthday')).show()
# 获取当前月的最后一天
df.select(F.last_day('birthday')).show()
# 下一个星期日期
df.select(F.next_day('birthday', 'Sun')).show()
- 聚合函数
- 对某列直接聚合 返回一个值 .select(F.sum(‘age’), F.count(‘id’), F.countDistinct(‘id’)).show()
- 改变列的数据类型方便进行聚合计算 df.select(df.id.cast(‘int’), ‘name’, ‘gender’, df[‘age’].cast(‘int’), ‘birthday’, ‘major’, ‘hobby’,‘create_time’)
- 对一列数据进行分组聚合 .groupby(‘gender’).avg(‘age’).show() .groupby(‘gender’).agg(F.mean(‘age’).alias(‘mean_age’)).show()
- 对多列数据进行分组聚合 .groupby(‘gender’).agg({‘age’: ‘mean’, ‘id’: ‘count’}).show()
"""
sum->求和
max/min->最大/最小
mean/avg->求平均
count->计数
agg/aggregate->聚合操作, 结合聚合函数使用
"""
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')
# 将rdd转为df数据
df = rdd_split.toDF(
schema='id string,name string,gender string,age string,'
'birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()
# 对某列直接聚合, 返回一个值
df.select(F.sum('age'), F.count('id'), F.countDistinct('id')).show()
# 分组聚合
# df.id.cast('int')将id列转换为整数类型,
# 'name'、'gender'、'birthday'、'major'、'hobby'和'create_time'直接选择对应的列
# 而df['age'].cast('int')将age列转换为整数类型
df_select = df.select(df.id.cast('int'), 'name', 'gender',
df['age'].cast('int'),
'birthday', 'major', 'hobby','create_time')
# 对一列数据进行分组聚合
df_select.groupby('gender').avg('age').show()
df_select.groupby('gender').agg(F.mean('age').alias('mean_age')).show()
# 对多列数据进行分组聚合
# round(): 保留几位小数
df_select.groupby('gender').agg(F.round(F.mean('age'), 2).alias('mean_age'),
F.count('id').alias('count')).show()
df_select.groupby('gender').agg({'age': 'mean', 'id': 'count'}).show()
print(df_select.count())
- 其他函数
- 判断 .select(‘id’, ‘name’, ‘age’, F.when(F.col(‘age’) <= 30, ‘青年’) .when((df_select[‘age’] > 30) & (df_select[‘age’] <= 60),‘中年’) .otherwise(‘老年’).alias(‘age_stage’))
- 窗口函数 over(Window.partitionBy(‘字段’).orderBy(‘字段’))) from pyspark.sql import Window
- 排序函数() .over(Window.partitionBy(‘字段’).orderBy(‘字段’).desc())
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')
# 将rdd转为df数据
df = rdd_split.toDF(
schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()
df_select = df.select(df.id.cast('int'), 'name', 'gender', df['age'].cast('int'), 'birthday', 'major', 'hobby',
'create_time')
# SQL->case...when...
# when(条件表达式, 值)
# otherwise(以上所有条件不成立返回的值)
# F.col(列名)->获取某列数据
# age<=30->青年 30<age<=60->中年 age>60->老年
df_select.select('id', 'name', 'age',
F.when(F.col('age') <= 30, '青年')
.when((df_select['age'] > 30) & (df_select['age'] <= 60),'中年')
.otherwise('老年').alias('age_stage')).show()
# 窗口函数
# SQL->select 聚合函数 over() partition by 字段 order by 字段 desc from df
# over(Window.partitionBy('字段').orderBy('字段')))
from pyspark.sql import Window
# 聚合函数().over(Window.partitionBy('字段').orderBy('字段'))
df_select.select(df_select['id'], 'name', 'age', 'gender',
F.sum('age').over(Window.partitionBy('gender')).alias('sum_age')).show()
# 每组TOP-N
# 排序函数().over(Window.partitionBy('字段').orderBy('字段').desc())
temp_df = df_select.select('*', F.row_number()
.over(Window.partitionBy('gender').orderBy(F.col('age').desc()))
.alias('row_num'))
temp_df.show()
temp_df.where('row_num<=3').show()
SparkSession对象
- sparksession对象是通过SparkSQL入口类SparkSession创建得到的
- 可以在创建sparksession对象时选择spark部署方式,设置程序名称等一些其他参数配置
- SparkSQL中通过catalyst引擎将sql转换成rdd时,里面的优化器可以决定shuffle过程中的分区数(默认是200)
- 可以通过 spark.sql.shuffle.partitions 参数手动设置shuffle过程中的分区数
"""
sparksession对象是通过SparkSQL入口类SparkSession创建得到的
可以在创建sparksession对象时选择spark部署方式,设置程序名称等一些其他参数配置
SparkSQL中通过catalyst引擎将sql转换成rdd时,里面的优化器可以决定shuffle过程中的分区数(默认是200)
可以通过 spark.sql.shuffle.partitions 参数手动设置shuffle过程中的分区数
"""
from pyspark.sql import SparkSession, functions as F
# 创建sparksession对象
# 在builder和getOrCreate()之间配置其他参数
# master()->选择部署方式
# appName()->设置程序名称
# config(key, value)->设置其他参数
ss = (SparkSession.builder.
master('yarn').
appName('yarn_demo').
config('spark.shuffle.sort.bypassMergeThreshold', '300').
config('spark.sql.shuffle.partitions', '10').getOrCreate())
# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')
# 将rdd转为df数据
df = rdd_split.toDF(
schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()
# 分组聚合操作会发生shuffle
# SparkSQL的shuffle过程, 分区数默认会调整为200个
df.groupby('gender').agg(F.avg('age').alias('avg_age')).show()
数据源和格式
- 读取数据
- json文件 df_json = ss.read.load(path=‘/data/stut.json’, format=‘json’)
- orc文件读取 df_orc = ss.read.orc(‘file:///export/server/spark/examples/src/main/resources/users.orc’) 本地磁盘文件
- parquet文件读取 df_parquet = ss.read.parquet(‘file:///export/server/spark/examples/src/main/resources/users.parquet’)
- mysql数据库读取 df_mysql = ss.read.jdbc(url=‘jdbc:mysql://node1:3306/shopnc_db?characterEncoding=UTF-8’,table=‘uc_store’,properties={‘user’: ‘root’,‘password’: ‘123456’,‘driver’: ‘com.mysql.jdbc.Driver’})
"""
sparksession.read.xxx(path=,...)
sparksession.read.load(path=,format=,xxx)
可以读取HDFS或本地磁盘文件
"""
from pyspark.sql import SparkSession, functions as F
ss = SparkSession.builder.getOrCreate()
print('-------------------------------csv文件-------------------------------')
# path:文件路径, 默认hdfs上
# sep:分隔符, 默认逗号,
# inferSchema: 是否自动推断数据类型, 默认false
# schema: 指定表结构
df_csv = ss.read.csv(path='/data/stu.csv', sep=',', inferSchema=True,
schema='name string,age int,gender string,phone string,email string,city string,address string')
df_csv.show()
df_csv.printSchema()
print('-------------------------------txt文件-------------------------------')
# text文件中的数据类型是字符串类型
# 将数据保存到value列中, 文件中的一行数据就是df的一行数据
# df_txt = ss.read.text(paths='/data/stu.txt')
# df_txt.show()
df_txt = ss.read.load(path='/data/stu.txt', format='text')
df_txt.show()
df_txt.select(F.split('value', ',')[0].alias('id'),
F.split('value', ',')[1].alias('name'),
F.split('value', ',')[2].alias('gender')).show()
print('-------------------------------json文件-------------------------------')
# ss.read.json(path='/data/stut.json')
# 字典中的key就是列名
df_json = ss.read.load(path='/data/stut.json', format='json')
df_json.show()
df_json.printSchema()
print('-------------------orc文件读取---------------------')
# 本地磁盘文件
df_orc = ss.read.orc('file:///export/server/spark/examples/src/main/resources/users.orc')
df_orc.show()
print('-------------------parquet文件读取---------------------')
df_parquet = ss.read.parquet('file:///export/server/spark/examples/src/main/resources/users.parquet')
df_parquet.show()
print('-------------------mysql数据库读取---------------------')
# url: ip地址 端口号 数据库名
# table: 数据表
# column: 字段名
df_mysql = ss.read.jdbc(url='jdbc:mysql://node1:3306/shopnc_db?characterEncoding=UTF-8',
table='uc_store',
properties={'user': 'root',
'password': '123456',
'driver': 'com.mysql.jdbc.Driver'})
# df_mysql.show()
# df_mysql.printSchema()
# 配置properties
properties = {
'url': 'jdbc:mysql://node1:3306/shopnc_db?characterEncoding=UTF-8',
'user': 'root',
'password': '123456',
'driver': 'com.mysql.jdbc.Driver',
# 数据表
# 'dbtable': 'uc_store'
# 执行sql
'query': 'select store_id, store_name from uc_store limit 10'
}
# **properties:将字典拆分成key=value形式, 关键字传参
df_mysql = ss.read.load(format='jdbc', **properties)
df_mysql.show()
- 保存数据
- text文件 df_text.write.save(path=‘/data/data_text’, format=‘text’, mode=‘overwrite’)
- csv文件 df.write.csv(path=‘/data/data_csv’, sep=‘:’, mode=‘overwrite’)
- orc文件 df.write.orc(‘hdfs://node1:8020/data_orc’, mode=‘overwrite’)
- parquet文件 df.write.parquet(‘hdfs://node1:8020/data_parquet’, mode=‘overwrite’)
- json文件 df.write.json(‘hdfs://node1:8020/data_json’, mode=‘overwrite’)
- mysql数据库 df.write.jdbc(url=‘jdbc:mysql://node1:3306/BI_db?characterEncoding=UTF-8’,mode=‘overwrite’,properties={‘user’: ‘root’, ‘password’: ‘123456’, ‘driver’: ‘com.mysql.jdbc.Driver’})
"""
df.write.xxx(path=, mode=, xxx)
df.write.save(path=, format=, mode=, xxx)
路径是目录路径
"""
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
ss = SparkSession.builder.getOrCreate()
df = ss.createDataFrame([[1, '张三', 20, '男'],
[2, '李四', 20, '男']],
schema='id int,name string,age int,gender string')
df.show()
print('-------------------------text文件-------------------------')
# 将value字段中字符串类型数据写入到text文件中
df_text = df.select(F.concat_ws(',', 'id', 'name', 'age', 'gender').alias('value'))
df_text.show()
# 只能写入一次, 再次写入会报错
# df_text.write.text(path='/data/data_text')
# mode: 写入方式 append->追加写入 overwrite->覆盖写入
df_text.write.save(path='/data/data_text', format='text', mode='overwrite')
print('-------------------------csv文件-------------------------')
df.write.csv(path='/data/data_csv', sep=':', mode='overwrite')
# df.write.save(path='/data/data_csv', format='csv', sep=',', mode='append')
print('--------------------写入orc文件------------------')
df.write.orc('hdfs://node1:8020/data_orc', mode='overwrite')
print('--------------------写入parquet文件------------------')
df.write.parquet('hdfs://node1:8020/data_parquet', mode='overwrite')
print('--------------------写入json文件------------------')
df.write.json('hdfs://node1:8020/data_json', mode='overwrite')
print('--------------------写入mysql数据库------------------')
"""
options = {
'url': 'jdbc:mysql://node1:3306/BI_db?characterEncoding=UTF-8',
'table': 'stu',
'mode': 'overwrite',
'user': 'root',
'password': '123456',
'driver': 'com.mysql.jdbc.Driver'
}
df.write.save(format='jdbc',**options)
"""
# 表不存在会自动创建, 也可以手动创建表(表结构要和df表结构一致)
df.write.jdbc(url='jdbc:mysql://node1:3306/BI_db?characterEncoding=UTF-8',
table='stu',
mode='overwrite',
properties={'user': 'root', 'password': '123456', 'driver': 'com.mysql.jdbc.Driver'})
自定义函数
自定义函数分类
UDF | UDAF | UDTF |
---|---|---|
User-Defined-Function | User-Defined Aggregation Function | python spark不能实现自定义 |
一对一, 一行数据经过自定义函数处理返回一行结果 | 多对一, 多行数据经过自定义聚合函数处理返回一行结果 | 一对多, 一行数据经过爆炸函数处理返回多行结果 |
concat/conca_ws/split/substring… | sum/count/mean/max/min/… | explode -> 将[值1, 值2, …]数据结构炸裂开, 返回多行结果 |
可以实现自定义 | 可以实现自定义 -> 借助pandas中的series对象和dataframe对象 | User-Defined Table-Generating Function -> 爆炸函数 |
- UDTF函数使用
df_explode = df_words.select(F.explode(‘words’).alias(‘words’))
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
ss = SparkSession.builder.getOrCreate()
df = ss.read.load('/data/words.txt',format='text')
df.show()
# todo: 1-DSL方式
# 将value字段中的字符串根据逗号进行分割, 返回列表类型
df_words = df.select(F.split('value',',').alias('words'))
df_words.show()
# 将[值1, 值2, ...]进行炸裂展开操作
df_explode = df_words.select(F.explode('words').alias('words'))
df_explode.show()
# 统计每个单词出现的次数 -> 分组聚合操作
df_count = df_explode.groupby('words').count()
df_count.show()
# todo: 2-SQL方式
df.createTempView('words')
df_sql = ss.sql("""with words as
(select
explode(split(value, ',')) as word
from words)
select
word, count(*) as count
from words group by word""")
df_sql.show()
- 自定义UDF函数
自定义UDF函数实现步骤:
- 定义一个python函数, 实现一行一行处理df数据
- 将定义的python函数注册到spark中
普通注册方式:
变量名=ss.udf.register(新函数名, 自定义udf函数名, 自定义udf函数返回值类型)
DSL和SQL方式都可以使用自定义UDF函数
装饰器注册方式:
from pyspark.sql.functions import udf
@udf(returnType=)
只能在DSL方式中使用自定义UDF函数
"""
自定义udf函数步骤:
① 自己定义一个python函数, 一对一关系
② 将自定义函数注册到spark中, 然后使用
普通注册方式:
变量名=ss.udf.register(新函数名, 自定义udf函数名, 自定义udf函数返回值类型)
可以在DSL方式和SQL方式中使用自定义udf函数
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import re
ss = SparkSession.builder.getOrCreate()
# 读取学生数据
df_csv = ss.read.csv('hdfs://node1:8020/data/stu.csv', sep=',',
schema='name string,age int,gender string,phone string,'
'email string,city string,address string')
df_csv.show()
# 需求: 使用自定义udf函数, 获取email列中用户名信息和公司名信息, 需要使用正则表达式匹配
"""
str1 = '[email protected]'
def func1(x):
result = re.match(r'(.*?)@(.*?)[.](.*)', x)
user_name = result.group(1)
company_name = result.group(2)
return user_name, company_name
result = func1(str1)
print(result)
user_name, company_name = func1(str1)
print(user_name, company_name)
"""
# ①自定义udf函数
def func1(x):
# 接收的是传入列名的每行数据值
# print(f'x的值是->{x}')
result = re.match(r'(.*?)@(.*?)[.](.*)', x)
user_name = result.group(1)
company_name = result.group(2)
return user_name, company_name
# ②注册udf函数
# name:新函数名
# f:udf函数名
# returnType: udf函数返回值类型,
# 如果udf函数返回值是str类型, 可以不用写returnType参数, 默认是str类型
# 否则需要通过returnType参数指定udf函数返回值类型 -> spark中的数据类型, 返回值如果是嵌套数据都需要指定数据类型 -> ArrayType(StringType())
em_func = ss.udf.register(name='em_func', f=func1, returnType=ArrayType(StringType()))
# ③DSL方式中使用自定义udf函数 -> 使用的是注册的变量名
# 自定义udf函数中接受列名参数 -> 将列名中每行数据作为参数值传入到自定义udf函数中
df_csv.select('name', 'age', em_func('email')).show()
df_csv.select('name', 'age', em_func('email')[0].alias('user_name'), em_func('email')[1].alias('company_name')).show()
# SQL方式中使用自定义udf函数 -> 使用的是注册的新函数名
print('=' * 40)
df_csv.createTempView('stu')
ss.sql("select name, age, em_func(email) from stu").show()
ss.sql("select name, age, em_func(email)[0] as user_name from stu").show()
# 需求: 使用自定义udf函数, 对age列进行分组, age<30->青年 30<=age<60->中年 age>=60->老年, 保存到新列中
def age_group(x):
if x < 30:
return '青年'
elif 30 <= x < 60:
return '中年'
else:
return '老年'
# 将age_group函数注册到spark中
# 自定义udf函数返回值类型是str类型时可以不需要通过returnType参数说明
age_group = ss.udf.register('age_group', age_group)
df_csv.select('name', 'age', age_group('age').alias('age_group')).show()
- 自定义UDAF函数
spark和pandas的df互相转换 ss
from pyspark.sql import SparkSession
import pandas as pd
# 创建ss对象
ss = SparkSession.builder.getOrCreate()
# 创建pandas的df对象
pandas_df = pd.DataFrame(data={
'id': [1, 2, 3],
'name': ['张三', '李四', '王五'],
'age': [20, 22, 19]
})
print(pandas_df)
# 将pandas的df对象转换成spark的df对象
spark_df = ss.createDataFrame(data=pandas_df)
spark_df.show()
spark_df.printSchema()
# 将spark的df对象转换成pands的df对象
pandas_df2 = spark_df.toPandas()
print(type(pandas_df2))
print(pandas_df2)
自定义UDAF函数操作
- 需要先在python解释器中安装 pyspark[sql] 模块
- 安装pyspark[sql]模块实际上是安装以下两个模块
py4j模块: spark底层是使用java开发的, 此模块可以将pandas的代码转换成java运行
arrow模块: 可以通过spark.sql.execution.arrow.pyspark.enabled参数提高pandas/numpy中数据的传输速度
步骤
- 自己定义一个python函数, 多对一关系, 对df中的多行数据经过函数处理返回一行结果
- 使用装饰器注册方式将自定义UDAF函数注册到spark中 @pandas_udf
- 只能在DSL方式中使用
"""
自定义UDAF函数步骤:
① 自己定义一个python函数, 多对一关系, 对df中的多行数据经过函数处理返回一行结果
② 使用装饰器注册方式将自定义UDAF函数注册到spark中 @pandas_udf
只能在DSL方式中使用
"""
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *
ss = SparkSession.builder.getOrCreate()
# 读取学生数据
df_csv = ss.read.csv('hdfs://node1:8020/data/stu.csv', sep=',',
schema='name string,age int,gender string,phone string,email string,city string,address string')
df_csv.show()
df_groupby = df_csv.groupby('gender').avg('age')
df_groupby.show()
# 使用UDAF函数实现mean函数功能
# ①定义UDAF函数
"""
注意点:
①说明自定义UDAF函数参数的类型 参数名:数据类型
str1: str = 'hello'
②说明自定义UDAF函数返回值的类型
def 函数名() -> 数据类型:
pass
"""
# 函数的参数是series对象
# 函数的返回值是float类型
# ②使用装饰器注册方式将自定义UDAF函数注册到spark中
@pandas_udf(returnType=FloatType())
def avg_age(age: pd.Series) -> float:
print(f"age的值是->{age}")
result = age.mean()
return result
# DSL方式
df_csv.select(avg_age('age')).show()
# UDAF函数不能直接在SQL方式中使用
# 但是可以将UDAF函数注册成UAF函数然后再在SQL方式中使用, 不需要说明返回值类型
avg_age = ss.udf.register('avg_age', avg_age)
df_csv.createTempView('stu')
df_sql = ss.sql('select avg_age(age) from stu group by gender')
df_sql.show()
- 函数的参数是series对象
- 函数的返回值是float类型
- 使用装饰器注册方式将自定义UDAF函数注册到spark中
- UDAF函数不能直接在SQL方式中使用
- 但是可以将UDAF函数注册成UAF函数然后再在SQL方式中使用, 不需要说明返回值类型
Spark on Hive 模式
- spark只是用于计算引擎, 不存储数据
- 数据读写默认使用的是hadoop的HDFS组件
- spark和hive可以兼容, 可以读写hive映射的数据表, 将元数据交给hive的metastore服务进行管理
row对象->行数据, 存储在HDFS
schema对象->表信息数据, 通过hive的metastore服务进行存储(MySQL)
交互式开发
- 启动hive的metastore服务 hive --service metastore &
- 启动sparkSQL spark.sql.warehouse.dir:数据存储位置
spark-sql --master yarn --conf spark.sql.warehouse.dir=hdfs://node1:8020/user/hive/warehouse
脚本式开发
from pyspark.sql import SparkSession
# 创建ss对象
# spark.sql.warehouse.dir: 数据存储位置
# enableHiveSupport: 开启hive支持
ss = (SparkSession.builder.
config('spark.sql.warehouse.dir', 'hdfs://192.168.88.100:8020/user/hive/warehouse').
enableHiveSupport().getOrCreate())
df1 = ss.sql('show databases;')
df1.show()
# 创建数据库
ss.sql('create database if not exists itheima;')
# 创建数据表
ss.sql('create table if not exists itheima.tb1(id int,name string,gender string,age int);')
df = ss.createDataFrame(data=[[1, '张三', '男', 20],
[2, '李四', '男', 20]],
schema='id int,name string,gender string,age int')
ss.sql('use itheima;') # 选择数据库
# 将df数据存储到对应数据库表中, 如果没有选择数据库, 存储在default数据库中
# format: 使用hive格式, 表存在时需要指定, 如果表不存在, 可以不需要指定
df.write.saveAsTable(name='tb1', mode='append', format='hive')
- 创建ss对象
- spark.sql.warehouse.dir: 数据存储位置
- enableHiveSupport: 开启hive支持
SparkSQL的catalyst引擎
- catalyst将SparkSQL的SQL代码转换成RDD任务, 由spark计算引擎执行RDD任务
- 解析器
将SQL语句解析成语法树, 生成未解析的逻辑查询计划- 分析器
在语法树上绑定SQL函数以及数据类型, 生成解析的逻辑查询计划- 优化器 -> 最核心部分
使用谓词下推和列值裁剪方式优化逻辑查询计划, 生成优化的逻辑查询计划
谓词下推 -> 调整SQL执行的顺序, 提高计算效率
列值裁剪 -> 选择SQL需要使用的字段, 提高计算效率- 执行器
将逻辑查询计划转换为物理查询计划
根据物理查询计划生成RDD代码, 提交给spark计算引擎执行
SparkSteaming
- 读取、输出流数据
读取text文件 df = ss.readStream.option('maxFilesPerTrigger', 1).text(path='/data/data_text')
读取csv文件 df = ss.readStream.csv('/data/data_csv', sep=':', schema='id int,name string,age int,gender string')
读取json文件 df = ss.readStream.json('/data_json', schema='id int,name string,age int,gender string')
输出流数据 df.writeStream.start(format='console').awaitTermination()
- 输出模式
append | complete | update |
---|---|---|
只展示新增的数据行计算结果 | 展示所有的数据行计算结果 | 只展示新增的数据行计算结果 |
支持select,where等操作的df | 支持聚合操作的df(分组聚合, 聚合后排序) | 展示修改后的数据计算结果 |
没有聚合操作,等同于append; 有聚合操作就是展示修改后的数据计算结果 | ||
不支持排序操作的df |
df.writeStream.start(format='console', outputMode='update').awaitTermination()
# outputMode
# append模式, 默认模式, where/select
# complete模式, 聚合/聚合排序
# update模式, 没有聚合等同于append模式, 不支持排序操作
- 输出位置
file sink | kafka sink | foreach sink | foreachbatch sink | memory sink | console sink |
---|---|---|---|---|---|
将流计算结果保存到文件中 | 将流计算结果保存到kafka中 | 将流计算结果的 == 每行数据 == 经过自定义函数处理 | 将流计算结果转换成离线计算结果 == df == 经过自定义函数处理 | 将流计算结果以数据表形式保存到== 内存 ==中, 可以使用SQL语句处理 | 将流计算结果在控制台/终端展示 |
只支持append模式, 并且要求设置checkpoint目录 | 支持append/complete/update模式, 要求设置checkpoint目录 | 支持append/complete/update模式 | 支持append/complete/update模式 | 支持append/complete模式 | 支持append/complete/update模式 |
自定义函数可以实现任意操作逻辑(对计算结果进行排序操作,聚合排序的计算结果保存到文件中) | 自定义函数可以实现任意操作逻辑(对计算结果进行排序操作,聚合排序的计算结果保存到文件中,计算结果保存到传统数据库) | ||||
定义函数中需要定义一个参数, 参数为df的行数据 | 自定义函数中需要定义两个参数, 第一个参数df, 第二个参数batch_id | 不适用于数据量大的场景, 容易造成内存溢出, 测试场景 | 测试场景 |