说明:
本章主要分享Spark自定义函数的使用,catalyst以及sparksql与hive的联动
自定义函数
分类
UDF:一对一关系,输出一行数据得到一行结果,可以自定义
UDAF:聚合函数,多对一关系,输入多行数据经过函数以后输出一行计算结果,通常与groupBy联合使用
UDTF:一对多的关系,输入一行数据经过函数以后输出多行数据(一行变为多行)
使用
自定义函数需要在Spark中注册后使用
两种注册方式:
普通注册: ss.udf.register(注册到spark的函数名, 自定义udf函数名, 自定义函数返回值类型)
-
既可以在DSL方法中使用, 也可以在sql语句中使用
# UDF(user defined function) 自定义函数
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import re
ss = SparkSession.builder.getOrCreate()
# 读取学生数据
df_csv = ss.read.csv('hdfs://node1:8020/data/stu.csv', sep=',',
schema='name string,age int,gender string,phone string,email string,city string,address string')
df_csv.show()
def func(email):
# 接受参数可以定义多个,根据需要处理的字段进行定义
# 每一次接收一行中的一个指定字段内容
print(email)
# 正则表达式匹配, 三个分组, 通过分组获取需要的信息
r = re.match('(\w+)@(\w+)[.](\w+)', email)
# 获取第一组数据, name
name = r.group(1)
# 获取第二组数据, company
company = r.group(2)
# 返回name和company的列表
return [name, company]
# 自定义udf函数注册到spark
# 第一个参数 注册到spark的函数名
# 第二个参数 自定义的函数名
# 第三个参数 指定函数返回值的类型, 默认字符串类型
# ArrayType(StringType()): 在spark中列表类型定义成数组类型, 数组中元素值类型也需要指定
em_func = ss.udf.register('em_func', func, returnType=ArrayType(StringType()))
df_res = df_csv.select(em_func('email')[0].alias('name'),em_func('email')[1].alias('company'))
df_res.show()
df_csv.createTempView('stu')
df_res2 = ss.sql('select em_func(email)[0] as name,em_func(email)[1] as company from stu')
df_res2.show()
装饰器注册: @udf(返回值类型)
-
装饰器函数可以没有参数, 默认返回值类型是 StringType() @udf
-
只能在DSL方法中使用
# UDF(user define function) 自定义函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *
import re
ss = SparkSession.builder.getOrCreate()
# 读取学生数据
df_csv = ss.read.csv('hdfs://node1:8020/data/stu.csv', sep=',',
schema='name string,age int,gender string,phone string,email string,city string,address string')
df_csv.show()
@F.udf(returnType=ArrayType(StringType()))
def func2(address):
res = address.split(' ')
return res
df_res = df_csv.select(func2('address')[0].alias('detail_address'), func2('address')[1].alias('code'))
df_res.show()
Spark&Hive的交互操作
明确
python代码方式操作的是Dataframe数据(row和schema)
SQL工具操作 row数据保存在hdfs上 schema 通过metastore保存在mysql上
使用纯sql进行数据的计算 spark on hive(把hive的metastore服务拿过来给spark做元数据管理)
SparkSQL使用方式
交互式开发
流程:先启动hive的metastore服务,在启动sparksql
nohup hive --service metastore &
spark-sql
spark-sql --conf spark.sql.warehouse.dir=hdfs://node1:8020/user/hive/warehouse
spark-sql --master yarn --name spark_on_hive
脚本式开发
from pyspark.sql import SparkSession
# 创建SparkSession对象
ss = SparkSession.builder. \
config('spark.sql.warehouse.dir', 'hdfs://192.168.88.100:8020/user/hive/warehouse'). \
enableHiveSupport(). \
getOrCreate()
# 编写sql语句
df = ss.sql('show databases;')
df.show()
# 创建数据库
ss.sql('create database if not exists itheima;')
df.show()
# 创建数据表
ss.sql('create table if not exists itheima.tb1(id int,name string,gender string,age int);')
SparkSQL引擎-catalyst
概念
catalyst是spark sql的调度核心,遵循传统数据库查询解析步骤,对sql进行解析,转换为逻辑查询计划,物理查询计划,最终转化为Spark的DAG后再执行
spark sql语句通过catalyst转化成最终执行的rdd代码进行执行
组件介绍
catalyst-Parser
将SQL语句解析为语法树(AST),也就是未解析的逻辑查询计划。Parser简单来说是将SQL字符串切分成一个一个Token,再根据一定语义规则解析为一棵语法树。(ANTLR实现)
catalyst-Analyzer
对逻辑查询计划进行属性和关系关联检验,也就是通过定义的一系列规则将未解析的逻辑查询计划借助catalog去解析,如将之前提到的未解析的逻辑查询计划转换成解析后的逻辑查询计划。(再次遍历整个语法树,对树上的每个节点进行数据类型绑定以及函数绑定)
catalyst-Optimizer
通过逻辑查询优化将逻辑查询计划转化为优化的逻辑查询计划,优化器是整个Catalyst的核心。
catalyst-QueryPlanner
将逻辑查询计划转换为物理查询计划,调整数据分布,最后将物理查询计划转换为执行计划进入Spark执行任务
标签:知识点,06,string,df,ss,sql,Spark,csv,spark From: https://blog.csdn.net/a666b777/article/details/140253497