首页 > 其他分享 >Spark超全笔记 一站式搞定!!

Spark超全笔记 一站式搞定!!

时间:2024-08-22 09:24:24浏览次数:7  
标签:搞定 超全 show df age rdd print Spark select

spark

Spark

Spark和Hadoop的区别

sparkhadoop
基于内存的计算工具基础平台(HDFS/MapReduce/YARN)
RDD中间计算结果存储在内存MapReduce中间计算结果存储在HDFS磁盘
线程方式计算进程方式计算
迭代计算、交互式计算、流计算大规模数据的离线计算

Spark计算流程

在这里插入图片描述

  1. 将RDD计算交给yarn的RM管理
  2. RM随机找一个NM创建container执行计算任务(application master)
  3. application master和RM保持通讯请求计算资源, 找到其他的NM创建container
  4. 其他的NM中的container执行map和reduce阶段任务
  5. map阶段从hdfs上读取数据进行分布式计算
  6. reduce阶段将map阶段的计算结果合并后保存到hdfs上

Spark组成架构(spark的五大组件)

  1. spark core:编写RDD代码, 处理RDD基本数据类型的数据, 有许多处理RDD的算子(方法), 其他组件的底层数据类型
  2. spark sql:编写DataFrame/DataSet代码, 处理DataFrame/DataSet数据类型的数据, 直接提供SQL编写方式处理spark任务
  3. structured/spark streaming:流计算(实时计算)组件, 数据以数据流形式处理
  4. spark ml/mllib:机器学习组件, 使用算法进行数据处理, 推荐系统等
  5. spark graphx:图计算组件, 控制图、并行图操作和计算的一组算法和工具的集合

Spark内核调度流程

  1. 创建sc对象时, 创建driver进程, 调用三个scheduler类创建三个scheduler Backend, DAG scheduler和Task scheduler对象
  2. scheduler Backend和RM保持通讯, 申请计算资源, 获取executor进程的信息
  3. 执行action算子时,产生job, 将job交给DAG scheduler, 由DAG scheduler根据宽依赖划分stage阶段, 分析stage中的task, 生成task描述保存到taskset中, 将taskset交给Task scheduler
  4. Task scheduler给taskset中的每个task分配计算资源, 维护task和executor之间关系, 同时需要管理task计算任务队列顺序
  5. scheduler Backend将分配好资源的task线程任务分发给executor进行执行
    在这里插入图片描述

Spark并行度

作用:提高spark程序执行效率, 合理分配资源

  1. 资源并行度

资源指的是spark集群中的节点数和cpu资源
主要调整集群的executors节点数和cpu的cores核数

资源并行度又称为物理并行度

  • executor
  1. 节点数,集群服务器数量
  2. 理论上服务器数量越多,执行效率越高
  3. –num-executors参数调整节点数
  • CPU

核心数, 一个核执行一个task线程任务

  1. –executor-cores参数调整核心数
  2. 核心数决定多任务执行方式
    多核, 并行执行(同时执行多个task线程任务)
    单核, 并发执行(交替执行多个task线程任务)

内存, 内存大小决定执行效率

  1. –executor-memory参数调整内存大小
  1. 数据并行度
  1. 数据指的是RDD中存储的要处理的数据
  2. 可以调整RDD的分区数
  3. 分区数=task线程数
  4. 一个线程任务由一个核执行
  1. 数据并行度又称为逻辑并行度
  2. 官方声称理论上分区数等于spark集群的总核数, 实现一个核对应一个task线程任务
  3. 实际工作中,每个分区的数据量大小是不一样的, 如果核数等于分区数, 数据量小的核会计算完成处理等待状态(造成资源浪费), 分区数是集群总核数的2~3倍(合理使用集群资源)
    在这里插入图片描述

RDD

RDD的五大特性

分区、只读、依赖、缓存、checkpoint

RDD的创建

  • python数据转换为rdd
一般使用列表类型转换为RDD rdd = sc.parallelize([1,2,3])

from pyspark import SparkContext

sc = SparkContext()

# 使用sc对象的下的parallelize方法将python数据转化为rdd
# int_data = 123  不能转化
str_data = 'abc'
list_data = [1,2,3]
dict_data = {'a':1,'b':2}
tuple_data = (1,2,3)
set_data = {1,2,3}
int_data = 123

a1 = sc.parallelize('abc')
b1 = sc.parallelize([1,2,3])
c1 = sc.parallelize({'a':1,'b':2})
d1 = sc.parallelize((1,2,3))
e1 = sc.parallelize({1,2,3})
# f1 = sc.parallelize(123) # 报错 TypeError: 'int' object is not iterable

# print(type(a1))

a2 = a1.collect()
b2 = b1.collect()
c2 = b1.collect()
d2 = c1.collect()
e2 = e1.collect()
# f2 = f1.collect()


# print(type(a2))
print(a2)
print(b2)
print(c2)
print(d2)
print(e2)
# print(f2)

在这里插入图片描述

.collect() 用于将spark RDD转换为python列表

  1. 理解为将RDD数据收集起来 然后转换为python的列表形式 这样我们就可以打印查看内容,不然直接的RDD类型我们无法直接打印输出
  2. 在调用collect()函数之后,RDD中的所有元素都会被拉取到Spark集群的driver节点上,形成一个列表,然后可以对该列表进行进一步的处理或分析
    在这里插入图片描述
  • 文件数据转换为RDD
from pyspark import SparkContext


# 创建sc对象
sc = SparkContext()

# 读取文件数据转换成RDD, 文件中的一行数据对应rdd的一个元素
# 本地磁盘文件 file
rdd1 = sc.textFile(name='file:///root/data/employees.json')
print(rdd1.collect())

# 文件要先上传到HDFS中
# HDFS文件 hdfs
rdd2 = sc.textFile(name='hdfs://192.168.88.100:8020/data/words.txt')
print(rdd2.collect())


# 读取目录下的所有文件转换成RDD
# 本地磁盘目录
rdd3 = sc.textFile(name="file:///root/data")
print(rdd3.collect())
# HDFS目录, 默认读取HDFS上的文件
rdd4 = sc.textFile(name='/data')
# print(rdd4.collect())
  1. 读取文件数据: 指定文件路径
  2. 读取目录下的所有文件数据: 指定目录路径
  3. 路径:可以是本地磁盘路径 file://文件绝对路径
    也可以是HDFS路径(默认是HDFS) hdfs://ip:port/文件路径 port:8020(服务端端口号) 可以省略hdfs://ip:port
  • RDD分区设置

rdd的分区就是将数据拆分成多份,一个分区的数据就对应着一个task线程,cpu的一个核处理一个task线程

from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()
# 查看默认分区数
print(sc.defaultParallelism)

# sc.parallelize(c=, numSlices=)
# numSlices: 设置分区数
rdd1 = sc.parallelize(c=[1, 2, 3, 4, 5])
print(rdd1.collect())
# 查看分区数据
print(rdd1.glom().collect())


rdd2 = sc.parallelize(c=[1, 2, 3, 4, 5], numSlices=4)
# 查看分区数据
print(rdd2.glom().collect())
print(rdd2.collect())

# sc.textFile(name=, minPartitions=)
# minPartitions: 设置最小分区数, 实际分区数有可能比设置的分区数要多1个分区
# 文件中的一行数据表示rdd的一个元素, 对数据进行分区时是不会将一行数据进行拆分
"""
1B=1字节, 1K=1024B=1024字节
单个文件分区规则:
文件字节数/分区数 = 分区字节数...余数
余数/分区字节数 = 结果值  结果值 大于10% 会多创建

目录下多个文件分区规则:
所有文件字节数/分区数 = 分区字节数...余数
每个文件字节数/分区字节数 = 分区数...余数
余数/分区字节数 = 结果值  结果值 大于10% 会多创建
"""

rdd3 = sc.textFile(name='/data/words.txt')
print(rdd3.glom().collect())
rdd4 = sc.textFile(name='/data/words.txt', minPartitions=4)
print(rdd4.glom().collect())

rdd5 = sc.textFile(name='/data', minPartitions=3)
# print(rdd5.glom().collect()) # 打印出一堆数据
  • 读取小文件数据转换为RDD

通过wholeTextFiles()方法读取目录小多个小文件数据时以文件为单位合并到分区中, 此时就减少了分区数

from pyspark import SparkContext

sc = SparkContext()

# 读取文件数据转换成RDD
# 读取多个小文件数据时, 一个文件数据对应一个分区, 一个分区对应一个task线程任务, 此时会产生多个task线程
# 每个分区的数据大小很小的, 此时会造成资源浪费问题, 可以通过wholeTextFiles方法解决此问题
rdd1 = sc.textFile('/data', minPartitions=3)
print(rdd1.glom().collect())

rdd2 = sc.wholeTextFiles('/data')
print(rdd2.glom().collect())

RDD常用算子

  • 算子分类
transformation 转换算子action 执行算子
rdd调用此类算子得到一个新的rddrdd调用此类算子得到最终结果, 不再是一个rdd
使用此类算子不会触发spark的job, 类似于多任务中定义了一个任务使用此类算子会触发spark的job, 类似与多任务中执行了一个任务
"""
transformation算子: 定义一个job
action算子: 执行一个job
"""
from threading import Thread


def func1():
    print("正在跳舞...")


def func2():
    print('正在唱歌...')


if __name__ == '__main__':
    # 定义线程对象执行多任务, transformation算子
    t1 = Thread(target=func1)
    t2 = Thread(target=func2)
    print(t1)
    print(t2)
    # 执行多任务, action算子
    t1.start()
    t2.start()
常用transformation算子

map算子

from  pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([1,2,3,4])

def func1(x):
    print(f"x的值是:{x}")
    return x+1

rdd2 = rdd1.map(f=lambda x: x + 1)
print(rdd2)
print(type(rdd2))
print(rdd2.collect())

rdd3 = rdd2.map(f=func1)
print(rdd3.collect())

def func2(x):
    return [x]

rdd4 = rdd1.map(func2)
print(rdd4.collect())

# 需求: rdd1->['hello', 'world', 'spark', 'hello'] 转换成 rdd2
# ->
# [('hello', 1), ('world', 1), ('spark', 1), ('hello', 1)]
a = sc.parallelize(['hello', 'world', 'spark', 'hello'])

# 第一种做法
def func3(x):
    return (x,1)

rdd5 = a.map(func3)
print(rdd5.collect())

# 第二种做法
rdd6 = a.map(lambda x:(x,1))
print(rdd6.collect())

在这里插入图片描述

  1. map接受一个函数名(引用)
  2. 将rdd1中每个元素经过func函数处理后, 将func函数返回值保存到新的rdd2中
  3. rdd1->[1,2,3,4] 经过func x+1处理 得到rdd2->[2,3,4,5]
  4. 注意点: map算子不会改变原rdd元素经过func函数处理后的返回值结构rdd1->[1,2,3,4] 经过func list()处理 得到rdd2->[[1],[2],[3],[4]]

flatMap算子

from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()

rdd1 = sc.parallelize([[1, 2, 3], [4, 5, 6]])

# 定义func1函数
def func1(x):
    print(f"x的值是:{x}")
    return x

# 调用map算子
rdd1_map = rdd1.map(f=func1)
print(rdd1_map.collect())
# 调用map算子
rdd1_map = rdd1.map(f=func1)
print(rdd1_map.collect())

# 给rdd1中每个子列表在末尾添加一个数字10
list1 = [1, 2, 3, 4]
list1.append(10)
print(list1)


def func2(x):
    x.append(10)
    return x

# append方法是没有返回值,是将None值返回, 将None值保存到新的rdd中
rdd1_append = rdd1.map(lambda x: x.append(10))
print(rdd1_append.collect())


rdd1_append2 = rdd1.map(func2)
print(rdd1_append2.collect())  # [[1, 2, 3, 10], [4, 5, 6, 10]]


# 调用flatMap
# append方法没有返回值, 将None值返回, None值类型是不可以拆分,
# 所以报错 TypeError: 'NoneType' object is not iterable
# rdd1_append3 = rdd1.flatMap(lambda x: x.append(10))
# print(rdd1_append3.collect())
rdd1_append3 = rdd1.flatMap(func2)
print(rdd1_append3.collect())

# 需求: rdd1->['hello,world,spark', 'hive,spark,world']
# 转换成rdd2->['hello', 'world', 'spark', 'hive', 'spark', 'world']
a = sc.parallelize(['hello,world,spark', 'hive,spark,world'])
rdd2 = a.flatMap(lambda x: x.split(','))
print(rdd2.collect())

在这里插入图片描述

  1. flatMap接受一个函数名(引用), 一般用于处理嵌套结构rdd, rdd中的每个元素是容器类型(str/list/set/tuple/dict)
  2. flatMap接受的函数需要返回一个容器类型, 容器才可以拆分
  3. 将rdd1中每个元素经过func函数处理后, 将func函数返回值拆分之后保存到新的rdd2中
  4. rdd1->[[1,2,3,4], [2,3,4,5]] 经过func append(10)处理 得到rdd2->[1,2,3,4,10,2,3,4,5,10]
  5. 注意点: flatMap算子会改变原rdd元素经过func函数处理后的返回值结构, 将返回值容器拆分开保存到新rdd中

filter算子 判断

from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()

rdd1 = sc.parallelize([1, 2, 3, 4])

int1 = 1
print(int1 % 2 == 0)

rdd2 = rdd1.filter(lambda x:x%2 == 0)
print(rdd2.collect())

# 需求: 获取rdd1中年龄信息小于25岁的数据  获取rdd1中性别为男的数据
a = sc.parallelize([[1, '小明', 20, '男'],
                       [2, '小红', 16, '女'],
                       [3, '老王', 35, '男'],
                       [4, '催化', 24, '女']])

# 利用下标获取数据
rdd3 = a.filter(lambda x: x[2] < 25)
print(rdd3.collect())

rdd4 = a.filter(lambda x: x[3] == '男')
print(rdd4.collect())

在这里插入图片描述

  1. func传递匿名函数, 匿名函数实现对rdd中每个元素进行判断(返回True或False)
  2. rdd1中每个元素进过函数处理,保留函数为True对应的元素, 实现过滤rdd中的元素操作(数据清洗)

distinct算子 去重

from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()

rdd1 = sc.parallelize([1, 2, 3, 4, 1, 5, 2, 6, 7])

rdd2 = rdd1.distinct()
print(rdd2.collect())

在这里插入图片描述

groupBy&mapValues算子

from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()

rdd1 = sc.parallelize([1, 2, 3, 4, 1, 5, 2, 6, 7])

# 将rdd1中的数据分成奇数组(余数为1)和偶数组(余数为0)
# 计算rdd1中每个元素和2相除的余数, 余数作为新rdd的key值,余数对应的x值作为新rdd的value值
rdd2 = rdd1.groupBy(lambda x: x % 2)
print(rdd2.collect())

# mapValues: 对k-v数据格式的rdd的v值经过函数处理
rdd3 = rdd2.mapValues(lambda x: list(x))
print(rdd3.collect())

# 需求: 将rdd1中的数据按照性别进行分组
a = sc.parallelize([[1, '小明', 20, '男'],
                       [2, '小红', 16, '女'],
                       [3, '老王', 35, '男'],
                       [4, '翠花', 24, '女']])

rcc1 = a.groupBy(lambda x: x[3])
print(rcc1.collect())

rcc2 = rcc1.mapValues(lambda x: list(x))
print(rcc2.collect())

在这里插入图片描述

  1. 将rdd1中的元素经过func函数处理, 将func函数的返回值作为分组key, 将rdd1中的元素作为分组value, 实现分组操作
  2. groupBy算子返回k-v数据格式的rdd, k就是分组名,value就是组内的元素 [[‘a’,(1,2,3)],[‘b’,(4,5,6)]]
  3. 将k-v数据格式的rdd中的value值经过func函数处理, 将返回的新value值保存到新rdd2中

groupbykey&mapValues算子

from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])

# groupbykey
# rdd2=rdd1.groupByKey(): 根据rdd中的key值对rdd的元素进行分组操作, 将相同key值对应的value值保存到一起
rdd_groupbykey = rdd1.groupByKey()
print(rdd_groupbykey.collect())
rdd_mapvalues = rdd_groupbykey.mapValues(lambda x: list(x))
print(rdd_mapvalues.collect())

在这里插入图片描述

没有预聚合操作

reducebykey算子

from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])


# reducebykey
# 分组求和操作
# 将rdd根据key分组, 同一组中的value值进行求和操作
def func(x,y):
    print(f'x的值是:{x}')
    print(f'y的值是:{y}')
    return x + y

rdd_reducebykey1 = rdd1.reduceByKey(func)
rdd_reducebykey2 = rdd1.reduceByKey(func=lambda x, y: x + y)
rdd_reducebykey3 = rdd1.reduceByKey(lambda x, y: x + y)
print(rdd_reducebykey1.collect())
print(rdd_reducebykey2.collect())
print(rdd_reducebykey3.collect())

在这里插入图片描述

  1. 将rdd中的相同key值对应的所有value经过func函数聚合操作, 分组聚合
  2. 有预聚合操作

sortbykey算子

from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])

# sortbykey
# rdd1.sortByKey(ascending=True, numPartitions=None, keyfunc=None):
# 根据key值对应rdd中的元素进行排序操作, 默认升序
# 升序
rdd_sortbykey1 = rdd1.sortByKey()
print(rdd_sortbykey1.collect())
# 降序
rdd_sortbykey2 = rdd1.sortByKey(ascending=False)
print(rdd_sortbykey2.collect())

在这里插入图片描述

sortby算子

from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])

# sortby
# 根据func函数指定的rdd中的元素值对rdd进行排序操作, 默认升序
# # x->('a', 1)
rdd_sortby = rdd1.sortBy(keyfunc=lambda x: x[1], ascending=False)
print(rdd_sortby.collect())

在这里插入图片描述

join算子

from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])

#join
# 创建RDD
rdd2 = sc.parallelize([("a", 1), ("b", 4)])
rdd2 = sc.parallelize([("a", 2), ("a", 3)])

# join 返回交集
rdd_join = rdd1.join(other=rdd2)
print(rdd_join.collect())

# leftOuterJoin
# 保留左rdd中所有key值对应的value值, 右rdd中关联不上的用None值代替value值
rdd_leftouterjoin = rdd1.leftOuterJoin(other=rdd2)
print(rdd_leftouterjoin.collect())

在这里插入图片描述

常用action算子

collect()算子

from pyspark import SparkContext

# 创建sc对象
sc = SparkContext()
# 创建rdd
rdd = sc.parallelize([1, 2, 3, 4])

# collect() 收集rdd中所有元素数据,返回列表类型 使用之前需要评估rdd中的数据量(数据量大容易造成内存溢出问题)
print(rdd.collect())
print(type(rdd.collect()))

在这里插入图片描述

take()算子

收集指定num数的rdd中的元素数据,返回列表类型

# take()  收集指定num数的rdd中的元素数据,返回列表类型
print(rdd.take(num=2))
print(type(rdd.take(2)))

在这里插入图片描述

reduce()算子

将一维rdd中的每个元素经过func函数进行聚合操作

# reduce()  将一维rdd中的每个元素经过func函数进行聚合操作
result = rdd.reduce(lambda x, y: x + y)
print(result)

在这里插入图片描述

countByValue()算子

print(result.items()) # 以字典的形式返回
print(result.keys()) # 获取字典中的key出现的次数
​print(result.values()) # 获取字典中的value出现的次数

# countByValue()  统计rdd中每个元素出现的次数, 以key->rdd的元素, value->次数形式保存到字典中
result = rdd.countByValue()
print(result)           #以元组的形式返回
print(result.items())   # 以字典的形式返回
print(result.keys())    # 获取字典中的key出现的次数
print(result.values())  # 获取字典中的value出现的次数

在这里插入图片描述

count()算子

统计rdd中的元素个数

# count() 统计rdd中的元素个数
print(rdd.count())

在这里插入图片描述

k-v类型数据输出为字典数据

如果RDD中的键有重复,那么每个键在字典中只会出现一次。对于键值对中的重复键,collectAsMap()只会保留该键最后一次出现时对应的值

  • print(rdd_kv.collectAsMap())

# k-v类型数据输出为字典数据
rdd_kv = sc.parallelize([('a', 1), ('b', 2), ('a', 2), ('b', 5)])

# collectAsMap()  将kv格式的rdd保存到字典中, rdd中的key就是字典中的key,rdd中的value就是字典中的value
# 如果RDD中的键有重复,那么每个键在字典中只会出现一次。
# 对于键值对中的重复键,collectAsMap()只会保留该键最后一次出现时对应的值
print(rdd_kv.collectAsMap())

在这里插入图片描述

  • result = rdd_kv.countByKey()
# countByKey()  统计kv格式的rdd中key值出现的次数, 返回字典类型 rdd的key就是字典中的key, key出现的次数就是字典中的value
result = rdd_kv.countByKey()
print(result)
print(result.items())
print(result.keys())
print(result.values())

在这里插入图片描述

  • rdd_kv.saveAsTextFile(path=‘/data/data’)
# saveAsTextFile()
#将rdd保存到指定目录下的文件中,一个分区对应一个文件 注意: path是一个目录路径, 并且目录是不存在的, 可以保存到hdfs或本地磁盘中
# 如果目录存在发生报错: Output directory hdfs://node1:8020/data/data already exists
rdd_kv.saveAsTextFile(path='/data/data')

RDD缓存和checkpoint对比

缓存checkpoint
生命周期spark程序结束后缓存的rdd会自动被销毁spark程序结束后checkpoint的rdd不会被销毁
存储位置缓存的rdd是优先存储在内存中, 如果内存不足可以通过设置缓存级别存储在本地磁盘中checkpoint的rdd是存储在HDFS上, 实现了分块多副本
依赖关系缓存的rdd不会删除依赖关系, 缓存rdd丢失还可以通过依赖关系计算得到checkpoint的rdd会删除依赖关系, HDFS上的数据不会自动删除(分块多副本机制)

RDD依赖

  • 新rdd是由旧rdd计算得到的, 新旧rdd之间旧存在依赖关系(因果关系) (rdd1->rdd2->rdd3->rdd4)

  • 依赖分类

窄依赖宽依赖
父RDD中的一个分区数据最多只能被子RDD中的一个分区所使用父RDD中的一个分区数据被子RDD中的多个分区所使用
父RDD分区和子RDD分区是一对一关系父RDD分区和子RDD分区是一对多关系
触发窄依赖算子:map、flatmap、fliter触发宽依赖算子:groupby、groupbykey、sortby、sortbykey、reducebykey、distinct

在这里插入图片描述
|在这里插入图片描述

依赖管理

DAG有向无环图
  1. spark graphx组件中的一种图计算算法\
  2. 管理RDD之间的依赖关系, spark程序根据依赖关系执行
  3. DAG根据RDD之间的依赖关系进行计算步骤的划分, 一个计算步骤就是一个stage阶段
  4. DAG遇到宽依赖关系的RDD就会划分一个新的stage, 如果都是窄依赖关系的RDD, 它们是在同一个stage中
    在这里插入图片描述
    在这里插入图片描述
  5. spark的WEB UI界面查看有向无环图和计算流程
    APP -> spark程序(一个python文件/一个交互式界面)
    在这里插入图片描述
    job -> spark计算任务(由rdd的action算子触发得到的)
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
为什么要进行stage划分
  1. spark的task是以线程方式执行, 线程执行时会抢夺计算资源
  2. 划分stage之后, 同一个stage阶段中的多个线程可以同时计算
  3. 下一个stage阶段任务要同时计算时需要等上一个stage阶段的所有task线程任务执行完后再计算
  4. 遇到宽依赖关系的算子就会划分新的stage阶段
    在这里插入图片描述

SparkSQL

概念及特点

概念

  1. spark五大组件之一, 处理结构化数据
    DataFrame -> python spark
    DataSet -> java/scala spark
  2. 使用纯SQL或编程语言的DSL方式进行计算
  3. 使用SparkSQL组件处理数据比Spark Core更加简洁, 但是spark sql底层还是转换成RDD任务提交执行

特点

  1. 融合性
    sparkSQL支持纯sql代码编写, 类似于hiveSQL
    sparkSQL支持多种编程语言的DSL代码编写方式(将SQL中关键字转换成df对象的方法使用, 类似于pandas的df操作)
  2. 统一数据访问
    使用统一的API实现读写数据
    读:使用== ss.read.xxx() ==方式读取各种存储工具中的数据
    写:使用 == df.write.xxx() ==方式将df数据保存到各种存储工具中
  3. 兼容hive
    使用hiveSQL中的方法处理数据
    将sparkSQL的计算结果直接映射成hive数据表
  4. 标准化数据连接
    可以使用pycharm/datagrip工具连接各种数据库编写sql代码
    支持jdbc和odbc两种方式连接sparkSQL(读写HDFS数据, HDFS和Hive映射)

SparkSQL和HiveSQL比较

1. hiveSQL
通过hive工具HDFS上的文件数据映射成数据表
数据表的元数据是交给hive的metastore服务管理, 存储到对应的数据库中(mysql,derby)
hive提供sql编写代码方式执行代码, 底层通过MapReduce计算引擎执行任务
2. hive on spark模式
编写hiveSQL代码, 借助shark引擎转换成spark的rdd任务, 交给spark计算引擎执行任务
3. spark on hive模式
开发了sparkSQL组件
编写sparkSQL代码执行任务, 借助catalyst引擎将sparkSQL转换成rdd任务交给spark计算引擎执行
sparkSQL中的数据表元数据由hive的metastore服务管理

Spark的数据类型

RDDDataFrameDataSet
spark core组件处理的数据类型spark SQL组件中处理的数据类型spark SQL组件中处理的数据类型
也是spark中的核心数据类型, 其他的组件数据最终都是要转换成rdd任务执行]结构化数据, 将RDD的数据映射成数据表(表结构和数据)结构化数据, 将DataFrame的数据映射成表数据
python,java,scala,R都有RDDpython,java,scala,R都有DataFrameDataSet中的一个元素就是一个DataFrame
二维的RDD只有数据,没有表结构(字段名,字段类型)java,scala有DataSet

DataFrame数据类型

概念

  1. 以列形式构成的弹性分布式数据集合
  2. DataFrame是由行对象和表结构对象组成的二维结构化数据
    行对象 -> 一行数据就是一个行对象
    schema对象(表结构) -> 字段名和字段类型

各类获取方式

# df是由row对象和schema对象组成
from pyspark.sql import Row  # row对象的类


# 创建row对象, 一个row对象代表一行数据
row1 = Row(id=1, name='小明', age=18)
row2 = Row(id=2, name='小红', age=20)
print(row1)
print(row2)
print(type(row1))

# 获取对象的属性 对象名.属性名
print(row1.id)
print(row1.name)
print(row1.age)

# 获取对象的属性  对象名[属性名]
print(row2['age'])

# row对象的属性值不能修改 Exception: Row is read-only
# row1.id = 100
# print(row1.id)

from pyspark.sql.types import *  # 字段类型对象的类
# 创建schema对象  表结构->字段名和字段类型
# 通过StructType类创建schema对象
# 通过schema对象的add方法添加字段名/字段类型/是否允许为null
# field: 字段信息, 可以写字段名也可以写字段对象
# data_type: 字段类型, 类型对象, 从pyspark.sql.types中获取类型对象
# nullable: 字段是否允许为null, 默认True

schema1 = (StructType()
            .add(field='id', data_type=IntegerType(), nullable=True)
            .add(field ='name',data_type=StringType(),nullable=False)
            .add(field='age',data_type=FloatType()))

# 获取schema对象中的某个字段
print(schema1['id'])
print(schema1[1])

# 通过StructType类传递fields属性值创建schema对象
schema2 = StructType(fields=[StructField('id',IntegerType(),True),
                             StructField('name',StringType())])
print(schema2)

# add()方法中的field参数传递StructField对象创建schema对象
schema3 = (StructType().
           add(field=StructField('id',IntegerType(),True))
           .add(field=StructField('name',StringType(),True)))
print(schema3)

在这里插入图片描述

DataFrame创建

createDataFrame创建
schema对象 =StructType(). add(field=‘age’, data_type=IntegerType())

# 注意点
# 1、定义的schema信息中的字段个数要和数据中的个数保持一致
# 2、定义的schema信息中的字段类型要和数据中的类型保持一致

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row

# 创建ss对象
# SparkSession.builder: 类名.类属性名
# builder: 类属性名 等于Builder() ->Builder类的对象
# getOrCreate: Builder类的对象的方法
ss = SparkSession.builder.getOrCreate()  # 和sc差不多的入口

# df对象由row对象和schema组成
# row对象
row1 = Row(id=1, name='小明', age=18)
row2 = Row(id=2, name='小红', age=16)

# schema对象
schema = (StructType().
          add(field='id', data_type=IntegerType(), nullable=True).
          add(field='name', data_type=StringType(), nullable=False).
          add(field='age', data_type=IntegerType()))

# 创建df对象
# 调用sparksession对象中的createDataFrame方法
# [row1, row2] -> 二维数据结构 结构化数据
# df = ss.createDataFrame(data=[row1, row2], schema=schema)
df = ss.createDataFrame(data=[[1, '小明', 18],
                              [2, '小王', 19],
                              [3, '小李', 20]], schema=schema)
# 查看df数据
df.show()

# 查看df的表结构
df.printSchema()

在这里插入图片描述

RDD转DF
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# 创建ss对象
ss = SparkSession.builder.getOrCreate()
# 创建sc对象
# sparkContext不要加括号, 通过@property装饰器装饰的方法在调用时就可以不用加括号
sc = ss.sparkContext

# 创建rdd对象
# 二维数据结构的rdd
rdd = sc.parallelize([[1, '张三', 20],
                      [2, '李四', 22]])

# 一维数据结构的rdd
# rdd = sc.parallelize([1, '张三', 20])
print(rdd.collect())

# 调用rdd的toDF算子转成df对象
df1 = rdd.toDF(schema='id int,name string,age int')
print(df1)
df1.show()
df1.printSchema()

schema = (StructType()
          .add('id',IntegerType())
          .add('name',StringType(),nullable=False)
          .add('age',IntegerType()))

df2 = rdd.toDF(schema=schema)
df2.show()
df2.printSchema()

# df对象转换成rdd对象
# rdd中的每个元素就是df中每个row对象, df中的一行数据就是一个row对象
# 过@property装饰器装饰的方法在调用时就可以不用加括号

new_rdd = df2.rdd
print(df2.rdd.collect())
print(type(new_rdd))
print(new_rdd.collect())

# result列表类型 [row1, row2, ...]
result = new_rdd.collect()
for row in result:
    print(row)
    print(row.id)
    print(row.name)
    print(row.age)
    print(row['age'])
    print(row[1])


# x->Row(id=1, name='张三', age=20)
# x.id -> 获取row对象的id属性
rdd_map = new_rdd.map(lambda x:x.id)
print(rdd_map.collect())

# 停止程序
ss.stop()

在这里插入图片描述
在这里插入图片描述

DataFrame基本使用

基本DSL方法
  1. 查询 df.select()
  2. 字符串表达式 df.selectExpr(字符串表达式)
  3. 条件判断 df.where(‘age>23’)
  4. 分组聚合 df_select6.groupBy(‘gender’).avg(‘age’)
  5. 排序 df_select6.orderBy(‘age’, ascending=True)
  6. 限制行数 df_select6.limit(num=40)
  7. 打印 df_select6.show(n=30, truncate=False)
  8. 新增列操作 .withColumn(colName=‘new_age’, col=df_select6.age + 100).show()
  9. 修改列名操作 .alias(‘new_name’)) .withColumnRenamed(‘avg(age)’, ‘new_age’) .toDF(‘major’, ‘gender’, ‘avg_age’)
  10. 是否包含’人‘这个字 再通过filter判断 df.filter(df[‘对手’].contains(‘人’)).show()
  11. 以’熊’结尾的 df.filter(df[‘对手’].endswith(‘熊’)).show()
  12. 判断数据是否为空返回以‘灰’开头的like df.filter(df[‘对手’].like(‘灰%’)).show()
  13. 如果对手是灰熊 flag列返回1 不是则返回0 df.withColumn(‘flag’,F.when(df[‘对手’]==‘灰熊’,1).otherwise(0)).show()
# 使用DF的DSL方法实现对df数据处理
# DF的数据是结构化数据,所以DSL方法和SQL的关键字基本一致
from pyspark.sql import SparkSession

# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()

# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext

# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# res1 = rdd.collect()
res1 = rdd.take(5)
print(f'rdd文件数据:{res1}')

# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.take(5)
print(f'切割后的rdd数据:{res2}')

# 将rdd转为df数据
df = rdd_split.toDF(
    schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string'
)

# 查看转化的df数据
df.show()

print('======================== select ================================')
# select 字段名1, 字段名2, .. from df
# 查询所有字段数据
df_select1 = df.select('*')
df_select1.show()

# 查询指定字段数据
# 字段名
df_select2 = df.select('id')
df_select2.show()

# df[字段名]
df_select3 = df.select('id', df['name'])
df_select3.show()

# df.字段名
df_select4 = df.select('id', df['name'], df.age)
df_select4.show()

# 给字段设置别名
# select 字段名 as 新字段名 from df
# df[列名].alias('别名') -> 字段的alias方法
df_select5 = df.select('id', df['name'].alias('new_name'), df.age.alias('new_age'))
df_select5.show()

# 修改字段数据类型
# select cast(字段名 as 新数据类型) from df
df_select6 = df.select(df['id'].astype('int'), df['name'], 'gender', df.age.cast('int'),
                       'birthday', df.major, df.hobby, df.create_time)
df_select6.show()
df_select6.printSchema()

print('======================== selectExpr ================================')
# df.selectExpr(字符串表达式)
df_selectexpr1 = df_select6.selectExpr("id", "age*2 as new_age", "cast(age as string)", "name as new_name")
df_selectexpr1.show()
df_selectexpr1.printSchema()

print('======================== where ================================')
# df.where(条件表达式)
# 获取年龄大于23的学生数据
# select * from df where age > 23
df_where1 = df.where('age>23')
df_where1.show()
df_where2 = df.where(df['age']>23)
df_where2.show()


# 获取年龄大于23并且性别为男的学生数据
# select * from df where age>23 and gender='男'
df_where3 = df.where('age>23 and gender="男"')
df_where3.show()
# &->and |->or
df_where4 = df.where((df['age']>23) & (df['gender']=='男'))
df_where4.show()

print('======================== groupby ================================')
# 分组聚合操作
# select gender, avg(age) from df group by gender
df_groupby1 = df_select6.groupBy('gender').avg('age')
# df_groupby1 = df_select6.groupBy('gender').mean('age')
df_groupby1.show()

df_groupby2 = df_select6.groupBy(['major', 'gender']).avg('age', 'id')
df_groupby2.show()

# 分组聚合过滤 where等同于having
# select gender, avg(age) from df group by gender having avg(age)>23
df_groupby3 = df_select6.groupby(['major', 'gender']).avg('age').where('avg(age)>21.5')
df_groupby3.show()

print('======================== orderBy ================================')
# df_select6.sort()
# 一列数据排序
# select * from df order by age asc/desc
df_orderby1 = df_select6.orderBy('age', ascending=True)
df_orderby1.show()
# 多列数据排序
df_orderby2 = df_select6.sort(df_select6['age'], df_select6['id'], ascending=False)
df_orderby2.show()
df_orderby3 = df_select6.sort(['age', 'id'], ascending=[True, False])
df_orderby3.show()
df_orderby4 = df_select6.sort(df_select6['age'].desc(), df_select6['id'].asc())
df_orderby4.show()

print('======================== limit ================================')
# 限制df的行数
df_limit = df_select6.limit(num=40)
print(df_limit.count())
df_limit.show(n=100)

print('======================== show ================================')
# 打印df的条目数, 默认20条, 可以打印更多条数据
# limit是df只有num数量的条目数, show展示n数量的条目数
# truncate:默认是True, 是否取消显示省略
df_select6.show(n=30, truncate=False)

print('------------------withColumn 新增列操作-----------------------')
# df_select6.select('*', df_select6.age + 10).show()
# 在原df后边新增一列
df_select6.withColumn(colName='new_age', col=df_select6.age + 100).show()
df_select6.withColumn(colName='new_age', col=df_select6['age'] + 100).show()
# 新增一列常数列
from pyspark.sql.functions import lit
df_select6.withColumn(colName='new_age', col=lit(100)).show()


print('======================== 修改列名操作 ================================')
# df[列名].alias(新列名)
df_select6.select(df_select6['id'], df_select6['name'].alias('new_name')).show()
# withColumnRenamed(原列名, 新列名)
df_groupby2 = df_select6.groupBy(['major', 'gender']).avg('age').withColumnRenamed('avg(age)', 'new_age')
df_groupby2.show()
# df.toDF(新列名1, 新列名2, ...) -> 修改所有列的列名
df_groupby2 = df_select6.groupBy(['major', 'gender']).avg('age').toDF('major', 'gender', 'avg_age')
df_groupby2.show()

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row


ss = SparkSession.builder.getOrCreate()


# todo: 需求1-读取数据,转换每一行数据为Row
df = ss.read.csv('hdfs://192.168.88.100:8020/data/data.csv')
df.show()

df = df.withColumn('对手',df[1].cast(StringType()))
df = df.withColumn('胜负',df[2].cast(StringType()))
df = df.withColumn('主客场',df[3].cast(StringType()))
df = df.withColumn('命中',df[4].cast(IntegerType()))
df = df.withColumn('投篮数',df[5].cast(IntegerType()))
df = df.withColumn('投篮命中率',df[6].cast(FloatType()))
df = df.withColumn('3分命中率',df[7].cast(FloatType()))
df = df.withColumn('篮板',df[8].cast(IntegerType()))
df = df.withColumn('助攻',df[9].cast(IntegerType()))
df = df.withColumn('得分',df[10].cast(IntegerType()))
df.show()

# todo: 需求2:alias起一个别名
df.select(df['对手'].alias('比赛对手')).show(3)

# todo: 需求3:根据得分升序排列,并打印前5个对手和得分
df.select('对手','得分').orderBy(df['得分'].asc()).show(5)

# todo: between
# todo: 一个布尔表达式,如果该表达式的值在给定列之间,则计算为true。可用于筛选满足条件的Row
# todo: 需求4:筛选出得分在15-20之间数据(包含边界)

df1 = df.select('对手',df['得分']).show()
df2 = df.select('对手',df['得分'].between(15,20).alias('15-20得分'))
df2.filter(df2['15-20得分']==True).show()


# todo: 需求5:contains(other) 包含其他元素。 根据字符串匹配返回一个布尔列
df.filter(df['对手'].contains('人')).show()

# todo: endswith(other) boolen值 以other结尾的字符串
# todo: 需求6:以“熊”结尾的字符串
df.filter(df['对手'].endswith('熊')).show()

# todo: 需求9:判断数据是否为空返回以‘灰’开头的
# todo: like(other)
# todo: 类似于SQL中的like, 返回基于SQL
# todo: LIKE匹配的布尔列
df.filter(df['对手'].like('灰%')).show()
df.select('对手', '胜负', '主客场', '得分').where(df['对手'].like('灰%')).show()

# todo: otherwise(value)
# todo: 计算条件列表,并返回多个可能的结果表达式之一。
# todo: 需求10:增加标志列flag,将灰熊标志为1,其他对手标志为0
from pyspark.sql import functions as F
df.withColumn('flag',F.when(df['对手']=='灰熊',1).otherwise(0)).show()

# todo: when(condition, value)
# todo: 计算条件列表,并返回多个可能的结果表达式之一。 如果未调用Column.otherwise(),则对于不匹配的条件,将不返回None。
# todo: 需求11:查找得分大于25的对手,标记为1,否则标记为0,标记列名为‘score_flag’
df.select('对手','得分',F.when(df['得分']>25,1).otherwise(0).alias('score_flag')).show()

# todo: 需求11:# 求得分的均值
# todo: 聚合函数,等价于pandas中的df.groupby().agg()
a = df.agg({'得分':'mean'}).collect()
print(a)

# todo: 需求12:去重
# todo: 按行去重,返回去重后的DataFrame
df.select(['主客场', '命中']).distinct().show()

SQL语句

映射临时表 df.createTempView(‘words’)
sql写法 df_sql = ss.sql(“”“SQL语句”“”) df_sql.show()

# 使用DF的DSL方法实现对df数据处理
# DF的数据是结构化数据,所以DSL方法和SQL的关键字基本一致
from pyspark.sql import SparkSession

# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()

# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
# res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')

# 将rdd转为df数据
df = rdd_split.toDF(
    schema='id string,name string,gender string,age string,birthday string,'
           'major string,hobby string,create_time string')
# 查看转化的df数据
df.show()

# SQL方式
# 将df映射成一张表
df.createTempView(name='stu')
sql_str = "select gender,avg(age) from stu group by gender"
# 执行sql语句, 返回df对象
df_groupby = ss.sql(sqlQuery=sql_str)
print(type(df_groupby))
df_groupby.show()
df_groupby.printSchema()

# DSL方式
df2 = df_groupby.select('gender', df_groupby['avg(CAST(age AS DOUBLE))'].alias('avg_age'))
df2.show()

在这里插入图片描述

高级DSL方法
  • 关联方法join 上下关联 union unionall unionByName
"""
df1.join(other=, on=, how=)
other:另外一个df对象
on:关联条件
how:关联方式 默认inner left right full

df1.crossJoin(other=) -> 交叉关联, 笛卡尔积

df1.union(other=) -> df上下合并并去重
df2.unionAll(other=) -> df上下合并不去重
df1.unionByName(other=) -> 根据列名上下合并, 列名不同的用null填充
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# 创建ss对象
ss = SparkSession.builder.getOrCreate()

# 创建sc对象
sc = ss.sparkContext

# 生成rdd
rdd1 = sc.parallelize([
    [1, 'zhangsan', 20],
    [2, 'lisi', 20],
    [3, 'wangwu', 22]
])

rdd2 = sc.parallelize([
    [3, 'zhaoliu', 20],
    [4, 'xiaoming', 21],
    [5, 'itcast', 22]
])

# 定义schema信息
schema_type = StructType(). \
    add('id', IntegerType()). \
    add('name', StringType()). \
    add('age', IntegerType(), False)

# rdd转换成df
df1 = rdd1.toDF(schema=schema_type)
df2 = rdd2.toDF(schema=schema_type)
df1.show()
df2.show()

# 交叉关联 -> 笛卡尔积
df1.crossJoin(df2).show()
#  交叉关联 -> 笛卡尔积
# other:另外一个df对象
df_inner1 = df1.join(other=df2)
df_inner1.show()

# 内连接
# on: 两个df相同的列名 -> 只保留一个相同列
df_inner2 = df1.join(other=df2, on='id', how='inner')
df_inner2.show()

# 左连接: 保留左df所有数据, 右df关联上的保留, 关连不上的用null填充
# on: df1.列名==df2.列名 -> 保留所有相同列
df_left = df1.join(df2, df1.id == df2.id, how='left')
df_left.show()

# 右连接
# on: df1[列名]==df2[列名] -> 保留所有相同列
df_right = df1.join(df2, df1['id'] == df2['id'], how='right')
df_right.show()

# 满外连接: 保留左右df的所有数据, 关联不上的就null填充
df_full = df1.join(df2, 'id', 'full')
df_full.show()

# 多个关联条件
df1.join(df2, ['id', 'name'], 'left').show()

# 上下关联
# 相同列名上下关联, 去重
df1.union(other=df2).show()
# 相同列名上下关联, 不去重
df1.unionAll(other=df2).show()
# 根据列名相同上下关联, 列名不同的用null填充
df1.unionByName(other=df2).show()
  • 缓存和checkpoint操作

1. 缓存 .persist(storageLevel=StorageLevel.MEMORY_ONLY_2)
2. checkpoint操作 df.checkpoint()
从缓存或checkpoint中读取df
​如果缓存和checkpoint都在, 优先读取缓存

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark import StorageLevel

# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()

# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext
# 指定checkpoint目录路径
sc.setCheckpointDir('/dataframecheckpoint')

# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
# res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
# res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')

# 将rdd转为df数据
df = rdd_split.toDF(
    schema='id string,name string,gender string,age string,'
           'birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()

print('------------------------- 缓存 ---------------------------------')
# 对df进行缓存操作
# df.cache()
df.persist(storageLevel=StorageLevel.MEMORY_ONLY_2)
# df.unpersist()  # 手动释放缓存
print(df.is_cached)

print('------------------------- checkpoint ---------------------------------')
# 对df进行checkpoint操作
df.checkpoint()

# 从缓存或checkpoint中读取df
# 如果缓存和checkpoint都在, 优先读取缓存
df_select = df.select(df['id'].cast('int'), 'name', df.age.astype('int'))
df_select.show()
  • 数据清洗方法
  1. 去重操作 .dropDuplicates(subset=[‘age1’]).show()
  2. 缺失值处理操作
    删除.dropna(how=‘all’, subset=[‘name1’, ‘age1’]).show()
    填充 .fillna(value=‘小明’, subset=[‘name1’]).show()
  3. 删除列 .drop(‘name1’, ‘age2’).show()
  4. 替换操作 .replace(to_replace=20, value=888, subset=[‘age1’]).show()
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# 创建ss对象
ss = SparkSession.builder.getOrCreate()

# 创建sc对象
sc = ss.sparkContext

# 生成rdd
rdd1 = sc.parallelize([
    [1, 'zhangsan', 20],
    [2, 'lisi', 20],
    [3, 'wangwu', 22]
])

rdd2 = sc.parallelize([
    [3, 'zhaoliu', 20],
    [4, 'xiaoming', 21],
    [5, 'itcast', 22]
])

# 定义schema信息
schema_type = StructType(). \
    add('id', IntegerType()). \
    add('name', StringType()). \
    add('age', IntegerType(), False)

# rdd转换成df
df1 = rdd1.toDF(schema=schema_type)
df2 = rdd2.toDF(schema=schema_type)
df1.show()
df2.show()

# 满外连接
new_df = df1.join(df2, 'id', 'full').toDF('id', 'name1', 'age1', 'name2', 'age2')
new_df.show()

print('----------------------------- 去重操作 --------------------------------------')
# 删除重复数据
temp_df = new_df.unionAll(new_df)
temp_df.show()
# 所有列去重
temp_df.dropDuplicates().show()
# 根据指定的列进行去重
temp_df.dropDuplicates(subset=['age1']).show()

print('----------------------------- 缺失值处理操作 --------------------------------------')
# 缺失值处理
# 删除
new_df.dropna().show()
# how='all'表示仅在所有指定列中都为缺失值时才删除该行
# subset=['name1', 'age1']表示只在'name1'和'age1'这两列中查找和删除缺失值
new_df.dropna(how='all', subset=['name1', 'age1']).show()
# 填充
# 根据填充值的类型找到对应df类型列中的null进行填充
new_df.fillna(value=50).show()
new_df.fillna(value='小明').show()
# 对指定列中的null进行填充
new_df.fillna(value='小明', subset=['name1']).show()
# 对不同列中的null进行不同填充
new_df.fillna(value={'name1': '小明', 'name2': '小红'}).show()

print('----------------------------- 删除列操作 --------------------------------------')
# 删除列
# 删除一列
new_df.drop('name1').show()
# 删除多列
new_df.drop('name1', 'age2').show()

# 替换
print('----------------------------- 替换操作 --------------------------------------')
new_df.replace(to_replace='zhangsan', value='张三').show()
# 对指定的列进行值替换
new_df.replace(to_replace=20, value=888, subset=['age1']).show()
new_df.replace({20: 8888}).show()
  • 内置函数
让DSL能够调用SQL语法
from pyspark.sql import functions as F
  1. 字符串函数
  1. 拼接操作 .select(‘id’, F.concat(‘name’, ‘gender’).alias(‘name_gender’)).show() .select(‘id’, F.concat_ws(‘-’, ‘name’, ‘gender’).alias(‘name_gender’)).show()
  2. 分割操作 .select(‘id’, F.split(‘birthday’, ‘-’)).show()
  3. 截取操作 .select(‘id’, F.substring(‘birthday’, 2, 4).alias(‘year’)).show()
    截取部分 df.select(‘id’, F.substring_index(‘birthday’, ‘-’, 1/-1)).show()
  4. 正则替换操作 .select(‘id’, F.regexp_replace(‘birthday’, ‘-’, ‘/’)).show()
  5. 正则抽取操作 .select(‘id’, F.regexp_extract(‘birthday’, r’(.?)-(.?)-(.*)', 1)).show()
"""
F.concat(列名1, 列名2, ...): 将列对应的字符串进行拼接
F.concat_ws(拼接符, 列名1, 列名2, ...): 将列对应的字符串根据指定的拼接符拼接
F.split(字段名, 分割符): 将列根据指定的分割符进行分割, 返回列表
F.substring(字段名, 开始位置, 长度): 对列从开始位置截取指定长度的字符, 开始位置从1开始
F.substring_index(字段名, 截取符, 第n个截取符):
n: 正数, 从第n个截取符开始, 截取左侧部分的字符; 负数, 从第n个截取符开始, 截取右侧部分的字符
F.regexp_replace(字段名, 正则表达式, 替换后的值): 对指定列通过正则表达式进行替换
F.regexp_extract(字段名, 正则比倒是, 截取位置): 对指定列通过正则表达式截取指定位置的字符
"""
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# from pyspark.sql.functions import split

# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()

# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext

# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')

# 将rdd转为df数据
df = rdd_split.toDF(
    schema='id string,name string,gender string,age string,'
           'birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()

print('----------------------------------- 拼接操作 -----------------------------------')
# concat()
df.select('id', F.concat('name', 'gender').alias('name_gender')).show()
# concat_ws()
df.select('id', F.concat_ws('-', 'name', 'gender').alias('name_gender')).show()

print('----------------------------------- 分割操作 -----------------------------------')
df.select('id', F.split('birthday', '-')).show()

df.select('id',
          F.split('birthday', '-')[0].alias('year'),
          F.split('birthday', '-')[1].alias('month'),
          F.split('birthday', '-')[2].alias('day')).show()

print('----------------------------------- 截取操作 -----------------------------------')
# substring(字段名, 开始位置, 长度)
df.select('id', F.substring('birthday', 0, 4).alias('year')).show()
df.select('id', F.substring('birthday', 1, 4).alias('year')).show()
df.select('id', F.substring('birthday', 2, 4).alias('year')).show()
# substring_index()
# 截取年份部分 正数->截取左侧部分
df.select('id', F.substring_index('birthday', '-', 1)).show()
# 截取年月部分
df.select('id', F.substring_index('birthday', '-', 2)).show()
# 截取天部分 负数->截取右侧部分
df.select('id', F.substring_index('birthday', '-', -1)).show()

print('----------------------------------- 正则替换操作 -----------------------------------')
# 将-替换成/
df.select('id', F.regexp_replace('birthday', '-', '/')).show()
# 将整数替换成/
df.select('id', F.regexp_replace('birthday', r'\d', '/')).show()

print('----------------------------------- 正则抽取操作 -----------------------------------')
# 获取第一组数据
df.select('id', F.regexp_extract('birthday', r'(.*?)-(.*?)-(.*)', 1)).show()
# 获取第二组数据
df.select('id', F.regexp_extract('birthday', r'(.*?)-(.*?)-(.*)', 2)).show()
# 获取第三组数据
df.select('id', F.regexp_extract('birthday', r'(.*?)-(.*?)-(.*)', 3)).show()
  1. 时间函数
  1. 获取当前时间 .select(F.current_date()).show()
  2. 获取当前日期时间 年月日时分秒毫秒 .select(F.current_timestamp()).show(truncate=False)
  3. 获取当前时间戳 秒 .select(F.unix_timestamp()).show(truncate=False)
  4. 日期时间格式化 .select(F.date_format(‘birthday’, ‘yyyy/MM/dd’)).show()
  5. unix时间转化 .select(F.from_unixtime(‘create_time’, ‘yyyy-MM-dd HH:mm:ss’)).show()
  6. 获取部分时间 F.year(‘birthday’) F.month(‘birthday’) F.dayofmonth(‘birthday’) F.quarter(‘birthday’)
  7. 时间运算
    加多少天 .select(F.date_add(‘birthday’, 10)).show()
    减多少天 .select(F.date_sub(‘birthday’, 10)).show()
    加多少个月 .select(F.add_months(‘birthday’, 2)).show()
    RDD 时间作差 .select(F.datediff(F.current_date(), ‘birthday’)).show()
    获取当前月的最后一天 .select(F.last_day(‘birthday’)).show()
    下一个星期日期 .select(F.next_day(‘birthday’, ‘Sun’)).show()
from pyspark.sql import SparkSession, functions as F

# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()

# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext

# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')

# 将rdd转为df数据
df = rdd_split.toDF(
    schema='id string,name string,gender string,age string,'
           'birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()

print('------------------------获取当前时间------------------------')
# 获取当前日期 年月日
df.select(F.current_date()).show()
# 获取当前日期时间 年月日时分秒毫秒
# show(truncate=False):以完整的格式显示选择的列的结果,不截断输出
df.select(F.current_timestamp()).show(truncate=False)
df.select(F.current_timestamp()).show()
# 获取当前时间戳 秒
df.select(F.unix_timestamp()).show(truncate=False)
df.select(F.unix_timestamp()).show()

print('------------------------日期时间格式化------------------------')
# 将日期时间格式数据进行格式化
# yyyy->年
# MM->月
# dd->日
# HH->小时
# mm->分钟
# ss->秒
df.select(F.date_format('birthday', 'yyyy/MM/dd')).show()
df.select(F.date_format('birthday', 'yyyy-MM')).show()

print('------------------------unix时间转化------------------------')
# 将unix时间戳转换成日期时间格式
df.select(F.from_unixtime('create_time', 'yyyy-MM-dd HH:mm:ss')).show()
df.select(F.from_unixtime('create_time', 'yyyy-MM-dd')).show()

print('------------------------获取部分时间------------------------')
# 获取年份
df.select(F.year('birthday')).show()
# 获取月份
df.select(F.month('birthday')).show()
# 获取日
df.select(F.dayofmonth('birthday')).show()
# 获取季度
df.select(F.quarter('birthday')).show()

print('------------------------时间运算------------------------')
# 加多少天
df.select(F.date_add('birthday', 10)).show()
# 减多少天
df.select(F.date_sub('birthday', 10)).show()
# 加多少个
df.select(F.add_months('birthday', 2)).show()
# 时间作差
df.select(F.datediff(F.current_date(), 'birthday')).show()

# 获取当前月的最后一天
df.select(F.last_day('birthday')).show()
# 下一个星期日期
df.select(F.next_day('birthday', 'Sun')).show()
  1. 聚合函数
  1. 对某列直接聚合 返回一个值 .select(F.sum(‘age’), F.count(‘id’), F.countDistinct(‘id’)).show()
  2. 改变列的数据类型方便进行聚合计算 df.select(df.id.cast(‘int’), ‘name’, ‘gender’, df[‘age’].cast(‘int’), ‘birthday’, ‘major’, ‘hobby’,‘create_time’)
  3. 对一列数据进行分组聚合 .groupby(‘gender’).avg(‘age’).show() .groupby(‘gender’).agg(F.mean(‘age’).alias(‘mean_age’)).show()
  4. 对多列数据进行分组聚合 .groupby(‘gender’).agg({‘age’: ‘mean’, ‘id’: ‘count’}).show()
"""
sum->求和
max/min->最大/最小
mean/avg->求平均
count->计数
agg/aggregate->聚合操作, 结合聚合函数使用
"""
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()

# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext

# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')

# 将rdd转为df数据
df = rdd_split.toDF(
    schema='id string,name string,gender string,age string,'
           'birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()

# 对某列直接聚合, 返回一个值
df.select(F.sum('age'), F.count('id'), F.countDistinct('id')).show()

# 分组聚合
# df.id.cast('int')将id列转换为整数类型,
# 'name'、'gender'、'birthday'、'major'、'hobby'和'create_time'直接选择对应的列
# 而df['age'].cast('int')将age列转换为整数类型
df_select = df.select(df.id.cast('int'), 'name', 'gender',
                      df['age'].cast('int'),
                      'birthday', 'major', 'hobby','create_time')

# 对一列数据进行分组聚合
df_select.groupby('gender').avg('age').show()
df_select.groupby('gender').agg(F.mean('age').alias('mean_age')).show()

# 对多列数据进行分组聚合
# round(): 保留几位小数
df_select.groupby('gender').agg(F.round(F.mean('age'), 2).alias('mean_age'),
                                F.count('id').alias('count')).show()

df_select.groupby('gender').agg({'age': 'mean', 'id': 'count'}).show()

print(df_select.count())
  1. 其他函数
  1. 判断 .select(‘id’, ‘name’, ‘age’, F.when(F.col(‘age’) <= 30, ‘青年’) .when((df_select[‘age’] > 30) & (df_select[‘age’] <= 60),‘中年’) .otherwise(‘老年’).alias(‘age_stage’))
  2. 窗口函数 over(Window.partitionBy(‘字段’).orderBy(‘字段’))) from pyspark.sql import Window
  3. 排序函数() .over(Window.partitionBy(‘字段’).orderBy(‘字段’).desc())
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# 创建SparkSession对象
ss = SparkSession.builder.getOrCreate()

# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext

# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')

# 将rdd转为df数据
df = rdd_split.toDF(
    schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()

df_select = df.select(df.id.cast('int'), 'name', 'gender', df['age'].cast('int'), 'birthday', 'major', 'hobby',
                      'create_time')

# SQL->case...when...
# when(条件表达式, 值)
# otherwise(以上所有条件不成立返回的值)
# F.col(列名)->获取某列数据
# age<=30->青年  30<age<=60->中年  age>60->老年
df_select.select('id', 'name', 'age',
                 F.when(F.col('age') <= 30, '青年')
                 .when((df_select['age'] > 30) & (df_select['age'] <= 60),'中年')
                 .otherwise('老年').alias('age_stage')).show()

# 窗口函数
# SQL->select 聚合函数 over() partition by 字段 order by 字段 desc from df
# over(Window.partitionBy('字段').orderBy('字段')))
from pyspark.sql import Window

# 聚合函数().over(Window.partitionBy('字段').orderBy('字段'))
df_select.select(df_select['id'], 'name', 'age', 'gender',
                 F.sum('age').over(Window.partitionBy('gender')).alias('sum_age')).show()
# 每组TOP-N
# 排序函数().over(Window.partitionBy('字段').orderBy('字段').desc())
temp_df = df_select.select('*', F.row_number()
                           .over(Window.partitionBy('gender').orderBy(F.col('age').desc()))
                           .alias('row_num'))
temp_df.show()
temp_df.where('row_num<=3').show()

SparkSession对象

  1. sparksession对象是通过SparkSQL入口类SparkSession创建得到的
  2. 可以在创建sparksession对象时选择spark部署方式,设置程序名称等一些其他参数配置
  3. SparkSQL中通过catalyst引擎将sql转换成rdd时,里面的优化器可以决定shuffle过程中的分区数(默认是200)
  4. 可以通过 spark.sql.shuffle.partitions 参数手动设置shuffle过程中的分区数
"""
sparksession对象是通过SparkSQL入口类SparkSession创建得到的
可以在创建sparksession对象时选择spark部署方式,设置程序名称等一些其他参数配置

SparkSQL中通过catalyst引擎将sql转换成rdd时,里面的优化器可以决定shuffle过程中的分区数(默认是200)
可以通过 spark.sql.shuffle.partitions 参数手动设置shuffle过程中的分区数
"""
from pyspark.sql import SparkSession, functions as F

# 创建sparksession对象
# 在builder和getOrCreate()之间配置其他参数
# master()->选择部署方式
# appName()->设置程序名称
# config(key, value)->设置其他参数
ss = (SparkSession.builder.
      master('yarn').
      appName('yarn_demo').
      config('spark.shuffle.sort.bypassMergeThreshold', '300').
      config('spark.sql.shuffle.partitions', '10').getOrCreate())

# 使用sparkcontext读取hdfs上的文件数据
sc = ss.sparkContext

# 将读取的文件数据转化为rdd
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')
res1 = rdd.collect()
# print(f'rdd文件数据:{res1}')
# 将每行字符串数据切割转为二维rdd
rdd_split = rdd.map(lambda x: x.split(','))
res2 = rdd_split.collect()
# print(f'切割后的rdd数据:{res2}')

# 将rdd转为df数据
df = rdd_split.toDF(
    schema='id string,name string,gender string,age string,birthday string,major string,hobby string,create_time string')
# 查看转化的df数据
df.show()


# 分组聚合操作会发生shuffle
# SparkSQL的shuffle过程, 分区数默认会调整为200个
df.groupby('gender').agg(F.avg('age').alias('avg_age')).show()

在这里插入图片描述

数据源和格式
  1. 读取数据
  1. json文件 df_json = ss.read.load(path=‘/data/stut.json’, format=‘json’)
  2. orc文件读取 df_orc = ss.read.orc(‘file:///export/server/spark/examples/src/main/resources/users.orc’) 本地磁盘文件
  3. parquet文件读取 df_parquet = ss.read.parquet(‘file:///export/server/spark/examples/src/main/resources/users.parquet’)
  4. mysql数据库读取 df_mysql = ss.read.jdbc(url=‘jdbc:mysql://node1:3306/shopnc_db?characterEncoding=UTF-8’,table=‘uc_store’,properties={‘user’: ‘root’,‘password’: ‘123456’,‘driver’: ‘com.mysql.jdbc.Driver’})
"""
sparksession.read.xxx(path=,...)
sparksession.read.load(path=,format=,xxx)
可以读取HDFS或本地磁盘文件
"""

from pyspark.sql import SparkSession, functions as F

ss = SparkSession.builder.getOrCreate()

print('-------------------------------csv文件-------------------------------')
# path:文件路径, 默认hdfs上
# sep:分隔符, 默认逗号,
# inferSchema: 是否自动推断数据类型, 默认false
# schema: 指定表结构
df_csv = ss.read.csv(path='/data/stu.csv', sep=',', inferSchema=True,
                     schema='name string,age int,gender string,phone string,email string,city string,address string')
df_csv.show()
df_csv.printSchema()

print('-------------------------------txt文件-------------------------------')
# text文件中的数据类型是字符串类型
# 将数据保存到value列中, 文件中的一行数据就是df的一行数据
# df_txt = ss.read.text(paths='/data/stu.txt')
# df_txt.show()
df_txt = ss.read.load(path='/data/stu.txt', format='text')
df_txt.show()
df_txt.select(F.split('value', ',')[0].alias('id'),
              F.split('value', ',')[1].alias('name'),
              F.split('value', ',')[2].alias('gender')).show()

print('-------------------------------json文件-------------------------------')
# ss.read.json(path='/data/stut.json')
# 字典中的key就是列名
df_json = ss.read.load(path='/data/stut.json', format='json')
df_json.show()
df_json.printSchema()

print('-------------------orc文件读取---------------------')
# 本地磁盘文件
df_orc = ss.read.orc('file:///export/server/spark/examples/src/main/resources/users.orc')
df_orc.show()

print('-------------------parquet文件读取---------------------')
df_parquet = ss.read.parquet('file:///export/server/spark/examples/src/main/resources/users.parquet')
df_parquet.show()

print('-------------------mysql数据库读取---------------------')
# url: ip地址 端口号 数据库名
# table: 数据表
# column: 字段名
df_mysql = ss.read.jdbc(url='jdbc:mysql://node1:3306/shopnc_db?characterEncoding=UTF-8',
                        table='uc_store',
                        properties={'user': 'root',
                                    'password': '123456',
                                    'driver': 'com.mysql.jdbc.Driver'})
# df_mysql.show()
# df_mysql.printSchema()

# 配置properties
properties = {
    'url': 'jdbc:mysql://node1:3306/shopnc_db?characterEncoding=UTF-8',
    'user': 'root',
    'password': '123456',
    'driver': 'com.mysql.jdbc.Driver',
    # 数据表
    # 'dbtable': 'uc_store'
    # 执行sql
    'query': 'select store_id, store_name from uc_store limit 10'
}
# **properties:将字典拆分成key=value形式, 关键字传参
df_mysql = ss.read.load(format='jdbc', **properties)
df_mysql.show()
  1. 保存数据
  1. text文件 df_text.write.save(path=‘/data/data_text’, format=‘text’, mode=‘overwrite’)
  2. csv文件 df.write.csv(path=‘/data/data_csv’, sep=‘:’, mode=‘overwrite’)
  3. orc文件 df.write.orc(‘hdfs://node1:8020/data_orc’, mode=‘overwrite’)
  4. parquet文件 df.write.parquet(‘hdfs://node1:8020/data_parquet’, mode=‘overwrite’)
  5. json文件 df.write.json(‘hdfs://node1:8020/data_json’, mode=‘overwrite’)
  6. mysql数据库 df.write.jdbc(url=‘jdbc:mysql://node1:3306/BI_db?characterEncoding=UTF-8’,mode=‘overwrite’,properties={‘user’: ‘root’, ‘password’: ‘123456’, ‘driver’: ‘com.mysql.jdbc.Driver’})
"""
df.write.xxx(path=, mode=, xxx)
df.write.save(path=, format=, mode=, xxx)
路径是目录路径
"""
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

ss = SparkSession.builder.getOrCreate()

df = ss.createDataFrame([[1, '张三', 20, '男'],
                         [2, '李四', 20, '男']],
                        schema='id int,name string,age int,gender string')
df.show()

print('-------------------------text文件-------------------------')
# 将value字段中字符串类型数据写入到text文件中
df_text = df.select(F.concat_ws(',', 'id', 'name', 'age', 'gender').alias('value'))
df_text.show()
# 只能写入一次, 再次写入会报错
# df_text.write.text(path='/data/data_text')
# mode: 写入方式 append->追加写入 overwrite->覆盖写入
df_text.write.save(path='/data/data_text', format='text', mode='overwrite')

print('-------------------------csv文件-------------------------')
df.write.csv(path='/data/data_csv', sep=':', mode='overwrite')
# df.write.save(path='/data/data_csv', format='csv', sep=',', mode='append')

print('--------------------写入orc文件------------------')
df.write.orc('hdfs://node1:8020/data_orc', mode='overwrite')
print('--------------------写入parquet文件------------------')
df.write.parquet('hdfs://node1:8020/data_parquet', mode='overwrite')
print('--------------------写入json文件------------------')
df.write.json('hdfs://node1:8020/data_json', mode='overwrite')

print('--------------------写入mysql数据库------------------')
"""
options = {
    'url': 'jdbc:mysql://node1:3306/BI_db?characterEncoding=UTF-8',
    'table': 'stu',
    'mode': 'overwrite',
    'user': 'root',
    'password': '123456',
    'driver': 'com.mysql.jdbc.Driver'
}
df.write.save(format='jdbc',**options)
"""
# 表不存在会自动创建, 也可以手动创建表(表结构要和df表结构一致)
df.write.jdbc(url='jdbc:mysql://node1:3306/BI_db?characterEncoding=UTF-8',
              table='stu',
              mode='overwrite',
              properties={'user': 'root', 'password': '123456', 'driver': 'com.mysql.jdbc.Driver'})

自定义函数

自定义函数分类

UDFUDAFUDTF
User-Defined-FunctionUser-Defined Aggregation Functionpython spark不能实现自定义
一对一, 一行数据经过自定义函数处理返回一行结果多对一, 多行数据经过自定义聚合函数处理返回一行结果一对多, 一行数据经过爆炸函数处理返回多行结果
concat/conca_ws/split/substring…sum/count/mean/max/min/…explode -> 将[值1, 值2, …]数据结构炸裂开, 返回多行结果
可以实现自定义可以实现自定义 -> 借助pandas中的series对象和dataframe对象User-Defined Table-Generating Function -> 爆炸函数

在这里插入图片描述

  • UDTF函数使用

df_explode = df_words.select(F.explode(‘words’).alias(‘words’))

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

ss = SparkSession.builder.getOrCreate()

df = ss.read.load('/data/words.txt',format='text')
df.show()

# todo: 1-DSL方式
# 将value字段中的字符串根据逗号进行分割, 返回列表类型
df_words = df.select(F.split('value',',').alias('words'))
df_words.show()

# 将[值1, 值2, ...]进行炸裂展开操作
df_explode = df_words.select(F.explode('words').alias('words'))
df_explode.show()

# 统计每个单词出现的次数 -> 分组聚合操作
df_count = df_explode.groupby('words').count()
df_count.show()

# todo: 2-SQL方式
df.createTempView('words')

df_sql = ss.sql("""with words as 
(select 
explode(split(value, ',')) as word 
from words) 
select 
word, count(*) as count 
from words group by word""")
df_sql.show()
  • 自定义UDF函数

自定义UDF函数实现步骤:

  1. 定义一个python函数, 实现一行一行处理df数据
  2. 将定义的python函数注册到spark中
    普通注册方式:
    变量名=ss.udf.register(新函数名, 自定义udf函数名, 自定义udf函数返回值类型)
    DSL和SQL方式都可以使用自定义UDF函数
    装饰器注册方式:
    from pyspark.sql.functions import udf
    @udf(returnType=)
    只能在DSL方式中使用自定义UDF函数
"""
自定义udf函数步骤:
① 自己定义一个python函数, 一对一关系
② 将自定义函数注册到spark中, 然后使用
普通注册方式:
变量名=ss.udf.register(新函数名, 自定义udf函数名, 自定义udf函数返回值类型)
可以在DSL方式和SQL方式中使用自定义udf函数
"""
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import re

ss = SparkSession.builder.getOrCreate()

# 读取学生数据
df_csv = ss.read.csv('hdfs://node1:8020/data/stu.csv', sep=',',
                     schema='name string,age int,gender string,phone string,'
                            'email string,city string,address string')
df_csv.show()

# 需求: 使用自定义udf函数, 获取email列中用户名信息和公司名信息, 需要使用正则表达式匹配

"""
str1 = '[email protected]'


def func1(x):
    result = re.match(r'(.*?)@(.*?)[.](.*)', x)
    user_name = result.group(1)
    company_name = result.group(2)
    return user_name, company_name


result = func1(str1)
print(result)
user_name, company_name = func1(str1)
print(user_name, company_name)
"""


# ①自定义udf函数
def func1(x):
    # 接收的是传入列名的每行数据值
    # print(f'x的值是->{x}')
    result = re.match(r'(.*?)@(.*?)[.](.*)', x)
    user_name = result.group(1)
    company_name = result.group(2)
    return user_name, company_name


# ②注册udf函数
# name:新函数名
# f:udf函数名
# returnType: udf函数返回值类型,
# 如果udf函数返回值是str类型, 可以不用写returnType参数, 默认是str类型
# 否则需要通过returnType参数指定udf函数返回值类型 -> spark中的数据类型, 返回值如果是嵌套数据都需要指定数据类型 -> ArrayType(StringType())
em_func = ss.udf.register(name='em_func', f=func1, returnType=ArrayType(StringType()))
# ③DSL方式中使用自定义udf函数 -> 使用的是注册的变量名
# 自定义udf函数中接受列名参数 -> 将列名中每行数据作为参数值传入到自定义udf函数中
df_csv.select('name', 'age', em_func('email')).show()
df_csv.select('name', 'age', em_func('email')[0].alias('user_name'), em_func('email')[1].alias('company_name')).show()
# SQL方式中使用自定义udf函数 -> 使用的是注册的新函数名
print('=' * 40)
df_csv.createTempView('stu')
ss.sql("select name, age, em_func(email) from stu").show()
ss.sql("select name, age, em_func(email)[0] as user_name from stu").show()

# 需求: 使用自定义udf函数, 对age列进行分组, age<30->青年 30<=age<60->中年 age>=60->老年, 保存到新列中

def age_group(x):
    if x < 30:
        return '青年'
    elif 30 <= x < 60:
        return '中年'
    else:
        return '老年'


# 将age_group函数注册到spark中
# 自定义udf函数返回值类型是str类型时可以不需要通过returnType参数说明
age_group = ss.udf.register('age_group', age_group)
df_csv.select('name', 'age', age_group('age').alias('age_group')).show()
  • 自定义UDAF函数

spark和pandas的df互相转换 ss

from pyspark.sql import SparkSession
import pandas as pd

# 创建ss对象
ss = SparkSession.builder.getOrCreate()

# 创建pandas的df对象
pandas_df = pd.DataFrame(data={
    'id': [1, 2, 3],
    'name': ['张三', '李四', '王五'],
    'age': [20, 22, 19]
})
print(pandas_df)

# 将pandas的df对象转换成spark的df对象
spark_df = ss.createDataFrame(data=pandas_df)
spark_df.show()
spark_df.printSchema()

# 将spark的df对象转换成pands的df对象
pandas_df2 = spark_df.toPandas()
print(type(pandas_df2))
print(pandas_df2)

自定义UDAF函数操作

  1. 需要先在python解释器中安装 pyspark[sql] 模块
  2. 安装pyspark[sql]模块实际上是安装以下两个模块
    py4j模块: spark底层是使用java开发的, 此模块可以将pandas的代码转换成java运行
    arrow模块: 可以通过spark.sql.execution.arrow.pyspark.enabled参数提高pandas/numpy中数据的传输速度

步骤

  1. 自己定义一个python函数, 多对一关系, 对df中的多行数据经过函数处理返回一行结果
  2. 使用装饰器注册方式将自定义UDAF函数注册到spark中 @pandas_udf
  3. 只能在DSL方式中使用
"""
自定义UDAF函数步骤:
① 自己定义一个python函数, 多对一关系, 对df中的多行数据经过函数处理返回一行结果
② 使用装饰器注册方式将自定义UDAF函数注册到spark中 @pandas_udf
只能在DSL方式中使用
"""
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *

ss = SparkSession.builder.getOrCreate()

# 读取学生数据
df_csv = ss.read.csv('hdfs://node1:8020/data/stu.csv', sep=',',
                     schema='name string,age int,gender string,phone string,email string,city string,address string')
df_csv.show()
df_groupby = df_csv.groupby('gender').avg('age')
df_groupby.show()

# 使用UDAF函数实现mean函数功能
# ①定义UDAF函数
"""
注意点:
①说明自定义UDAF函数参数的类型 参数名:数据类型
str1: str = 'hello'
②说明自定义UDAF函数返回值的类型 
def 函数名() -> 数据类型:
    pass
"""


# 函数的参数是series对象
# 函数的返回值是float类型
# ②使用装饰器注册方式将自定义UDAF函数注册到spark中
@pandas_udf(returnType=FloatType())
def avg_age(age: pd.Series) -> float:
    print(f"age的值是->{age}")
    result = age.mean()
    return result


# DSL方式
df_csv.select(avg_age('age')).show()

# UDAF函数不能直接在SQL方式中使用
# 但是可以将UDAF函数注册成UAF函数然后再在SQL方式中使用, 不需要说明返回值类型
avg_age = ss.udf.register('avg_age', avg_age)
df_csv.createTempView('stu')
df_sql = ss.sql('select avg_age(age) from stu group by gender')
df_sql.show()
  1. 函数的参数是series对象
  2. 函数的返回值是float类型
  3. 使用装饰器注册方式将自定义UDAF函数注册到spark中
  4. UDAF函数不能直接在SQL方式中使用
  5. 但是可以将UDAF函数注册成UAF函数然后再在SQL方式中使用, 不需要说明返回值类型

Spark on Hive 模式

  1. spark只是用于计算引擎, 不存储数据
  2. 数据读写默认使用的是hadoop的HDFS组件
  3. spark和hive可以兼容, 可以读写hive映射的数据表, 将元数据交给hive的metastore服务进行管理
    row对象->行数据, 存储在HDFS
    schema对象->表信息数据, 通过hive的metastore服务进行存储(MySQL)

交互式开发

  1. 启动hive的metastore服务 hive --service metastore &
  2. 启动sparkSQL spark.sql.warehouse.dir:数据存储位置
    spark-sql --master yarn --conf spark.sql.warehouse.dir=hdfs://node1:8020/user/hive/warehouse

脚本式开发

from pyspark.sql import SparkSession

# 创建ss对象
# spark.sql.warehouse.dir: 数据存储位置
# enableHiveSupport: 开启hive支持
ss = (SparkSession.builder.
      config('spark.sql.warehouse.dir', 'hdfs://192.168.88.100:8020/user/hive/warehouse').
      enableHiveSupport().getOrCreate())

df1 = ss.sql('show databases;')
df1.show()

# 创建数据库
ss.sql('create database if not exists itheima;')
# 创建数据表
ss.sql('create table if not exists itheima.tb1(id int,name string,gender string,age int);')
df = ss.createDataFrame(data=[[1, '张三', '男', 20],
                              [2, '李四', '男', 20]],
                        schema='id int,name string,gender string,age int')
ss.sql('use itheima;')  # 选择数据库
# 将df数据存储到对应数据库表中, 如果没有选择数据库, 存储在default数据库中
# format: 使用hive格式, 表存在时需要指定, 如果表不存在, 可以不需要指定
df.write.saveAsTable(name='tb1', mode='append', format='hive')
  1. 创建ss对象
  2. spark.sql.warehouse.dir: 数据存储位置
  3. enableHiveSupport: 开启hive支持

SparkSQL的catalyst引擎

  1. catalyst将SparkSQL的SQL代码转换成RDD任务, 由spark计算引擎执行RDD任务
  2. 解析器
    将SQL语句解析成语法树, 生成未解析的逻辑查询计划
  3. 分析器
    在语法树上绑定SQL函数以及数据类型, 生成解析的逻辑查询计划
  4. 优化器 -> 最核心部分
    使用谓词下推和列值裁剪方式优化逻辑查询计划, 生成优化的逻辑查询计划
    谓词下推 -> 调整SQL执行的顺序, 提高计算效率
    列值裁剪 -> 选择SQL需要使用的字段, 提高计算效率
  5. 执行器
    将逻辑查询计划转换为物理查询计划
    根据物理查询计划生成RDD代码, 提交给spark计算引擎执行

SparkSteaming

  • 读取、输出流数据
读取text文件  df = ss.readStream.option('maxFilesPerTrigger', 1).text(path='/data/data_text')
读取csv文件  df = ss.readStream.csv('/data/data_csv', sep=':', schema='id int,name string,age int,gender string')
读取json文件 df = ss.readStream.json('/data_json', schema='id int,name string,age int,gender string')
输出流数据  df.writeStream.start(format='console').awaitTermination()
  • 输出模式
appendcompleteupdate
只展示新增的数据行计算结果展示所有的数据行计算结果只展示新增的数据行计算结果
支持select,where等操作的df支持聚合操作的df(分组聚合, 聚合后排序)展示修改后的数据计算结果
没有聚合操作,等同于append; 有聚合操作就是展示修改后的数据计算结果
不支持排序操作的df
df.writeStream.start(format='console', outputMode='update').awaitTermination()

# outputMode
# append模式, 默认模式, where/select
# complete模式, 聚合/聚合排序
# update模式, 没有聚合等同于append模式, 不支持排序操作
  • 输出位置
file sinkkafka sinkforeach sinkforeachbatch sinkmemory sinkconsole sink
将流计算结果保存到文件中将流计算结果保存到kafka中将流计算结果的 == 每行数据 == 经过自定义函数处理将流计算结果转换成离线计算结果 == df == 经过自定义函数处理将流计算结果以数据表形式保存到== 内存 ==中, 可以使用SQL语句处理将流计算结果在控制台/终端展示
只支持append模式, 并且要求设置checkpoint目录支持append/complete/update模式, 要求设置checkpoint目录支持append/complete/update模式支持append/complete/update模式支持append/complete模式支持append/complete/update模式
自定义函数可以实现任意操作逻辑(对计算结果进行排序操作,聚合排序的计算结果保存到文件中)自定义函数可以实现任意操作逻辑(对计算结果进行排序操作,聚合排序的计算结果保存到文件中,计算结果保存到传统数据库)
定义函数中需要定义一个参数, 参数为df的行数据自定义函数中需要定义两个参数, 第一个参数df, 第二个参数batch_id不适用于数据量大的场景, 容易造成内存溢出, 测试场景测试场景

标签:搞定,超全,show,df,age,rdd,print,Spark,select
From: https://blog.csdn.net/weixin_72093863/article/details/141261042

相关文章

  • 计算机毕业设计Python深度学习游戏推荐系统 Django PySpark游戏可视化 游戏数据分析
    基于Spark的TapTap游戏数据分析系统技术栈:  -python  -django  -scrapy  -vue3  -spark  -element-plus  -echarts   功能板块:0.爬虫模块:  通过scrapy抓取taptap游戏网站数据,从分类页开始抓取全站游戏的数据1.首页......
  • 一个超全的go工具库Lancet
    文档官网https://www.golancet.cn安装使用go1.18及以上版本的用户,建议安装v2.x.x。因为v2.x.x应用go1.18的泛型重写了大部分函数。gogetgithub.com/duke-git/lancet/v2使用go1.18以下版本的用户,必须安装v1.x.x。目前最新的v1版本是v1.4.1。gogetgithub.c......
  • 文件加密管理软件超全盘点|我愿称这10款软件为加密天花板
    信息安全比任何时候都更加重要。无论是个人敏感信息、企业核心数据,还是创意无限的数字资产,都需要一把坚固的锁来保护。今天,就让我们一起盘点那些被誉为“加密天花板”的10款顶尖文件加密管理软件,它们不仅功能强大,而且操作便捷,让你的数据安全无忧!1.VeraCrypt作为TrueCrypt......
  • 如何写出高质量的论文?66AI论文一键轻松搞定
    在毕业论文写作中,如何高效、高质的进行论文写作是我们经常需要面对的问题。现在,随着AI技术的飞速进步,AI论文写作工具也愈发成熟,让我们得以告别那些漫长而疲惫的通宵写作之夜。作为一位AI工具的使用爱好者,在此分享这款经过我亲身试用、验证其可靠性的AI论文辅助神器。希望通过......
  • Spark MLlib 特征工程系列—特征提取LSH(BucketedRandomProjectionLSH)
    SparkMLlib特征工程系列—特征提取LSH(BucketedRandomProjectionLSH)在这篇文章中,我们将深入探讨Spark中的BucketedRandomProjectionLSH,这是一种用于近似最近邻搜索的技术。文章将覆盖其工作原理、应用场景、Scala代码示例、参数调优以及使用效果分析,确保内容全面、......
  • 计算机毕业设计django+hadoop+scrapy租房推荐系统 租房大屏可视化 租房爬虫 hadoop sp
    用到的技术:  1.python  2.django后端框架  3.django-simpleui,Django后台  4.vue前端  5.element-plus,vue的前端组件库  6.echarts前端可视化库  7.scrapy爬虫框架基于大数据的租房信息推荐系统包括以下功能:  数据爬取和清洗......
  • Spark MLlib 特征工程系列—特征转换Imputer
    SparkMLlib特征工程系列—特征转换Imputer什么是ImputerImputer是Spark中用于处理缺失数据的工具。在机器学习数据预处理中,缺失值是常见的问题。Imputer可以填充数值型数据中的缺失值,通过使用指定的策略(如均值、中位数)替换缺失值,从而提高数据质量并确保模型训练时......
  • Kettle PDI小白新手/进阶/必备 大数据基础之一数据清洗(ETL)基础进阶总结 1.6万字长文
    Kettle是一个开源的数据集成工具,主要用于ETL(抽取、转换、加载)过程。它的全名是PentahoDataIntegration(PDI),而Kettle是其早期的名字,Kettle在2006年被Pentaho收购后,正式更名为PentahoDataIntegration(PDI),因此现在更常被称为PDI。PDI仍然是Pentaho产品套件中的一个重要......
  • 一文搞定C语言文件常规I/O操作
    普通文件(OrdinaryFiles)普通文件,代指储存在硬盘中或外部媒体文件中的有序数据集。源文件(sourcefile),工程文件(objectfile),可执行文件(executablefile),乃至一组被处理的原始输入数据和输出结果均为普通文件。其中,源文件,工程文件等称之为程序文件,而对于输入输出的数据,则被......
  • Python解释器如何下载+如何安装+配置环境+踩坑 一文搞定【保姆级图文教程】
    如果你要学Python,那么第一件事情就是先去安装。因为你的电脑里面没有Python,编写好的.py文件就没有解释器可以运行,所以安装Python环境/解释器就是最重要的一件事。PS:本文仅为笔记,为个人整理而成,如有不足之处请多多指正。目录第一步:下载1.官方网站2.镜像源网站第二步......