快速开始
创建环境
Spark程序主要分为三个阶段:
- 创建环境
- 数据操作
- 关闭环境(在Streaming程序中为执行环境)
下面是批处理的Spark SQL API的创建环境的类:SparkSession,其目的是为了创建基本的Spark SQL的环境。
from pyspark.sql import SparkSession
spark=SparkSession.builder\
.appName("Spark SQL basic test")\
.getOrCreate()
注意:一定要有“\”
其中还可以指定操作,比如连接Mongodb的操作,支持Hive的操作,具体的写法类似于如下:
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark=SparkSession.builder\
.appName("Spark SQL basic test")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()
数据操作
创建DataFrames
创建DataFrames其实和python中的Datarame类似,也是一种表的表达方式。在Saprk中创建DataFrame的来源有很多,很多中API都是通过RDD来创建的,或者读取本地文件;但是在实际的任务中最常见的还是从数据库(Hive,Mongodb,MySQL)中创建DataFrame。由于这里是快速上手所以,该例子只使用read.json()
文件内容为:
{"name":"Kone"}
{"name":"Alices", "age":30}
{"name":"Bob", "age":19}
代码:
from pyspark.sql import SparkSession
if __name__ == '__main__':
spark=SparkSession.builder\
.appName("Spark SQL basic test")\
.config("spark.some.config.option", "some-value")\
.getOrCreate()
df=spark.read.json("your json file path")
df.show()
输出为:
+----+------+
| age| name|
+----+------+
|null| Kone|
| 30|Alices|
| 19| Bob|
+----+------+
在DataFrames上的操作
上文讲过DataFrame相当于一个表,则对表有很多结构化的操作,那么其实MySQL的相关操作都可在Spark DataFrame上实现。下面是几种常见的操作:
- 访问数据
在访问数据中存在两种访问:属性(df.age)和索引(df["age"])。在Spark中更加建议后一种访问方式:
df=spark.read.json(jsonPath)
df.select(df.age).show() # 使用属性来访问数据
df.select(df["age"]).show() # 使用索引来访问数据
df.select("name").show() # 使用select来访问数据
输出:
+----+
| age|
+----+
|null|
| 30|
| 19|
+----+
+----+
| age|
+----+
|null|
| 30|
| 19|
+----+
+------+
| name|
+------+
| Kone|
|Alices|
| Bob|
+------+
- 输出DataFrame的结构
df.printSchema()
输出:
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
- 对DataFrame的一列进行操作
df.select(df["name"], df["age"]+1).show()
输出:
+------+---------+
| name|(age + 1)|
+------+---------+
| Kone| null|
|Alices| 31|
| Bob| 20|
+------+---------+
- 对表进行筛选
下面筛选出name的以“K”开头的字符串。
df.filter(df["name"].startswith("K")).show()
输出:
+----+----+
| age|name|
+----+----+
|null|Kone|
+----+----+
- 聚集操作
原数据Json文件为:
{"name":"Kone"}
{"name":"Alices", "age":30}
{"name":"Bob", "age":19}
{"name":"Kven", "age":19}
{"name":"Dven", "age":18}
代码为:
df.groupBy(df["age"]).count().show()
输出:
+----+-----+
| age|count|
+----+-----+
| 19| 2|
|null| 1|
| 18| 1|
| 30| 1|
+----+-----+
使用反射推断模式
Spark SQL可以将Row对象的RDD转化为DataFrame,从而推断数据类型。行是通过将键值对列表作为kwargs传递给Row类来构造。
标签:指南,__,name,show,Python,age,df,SparkSQL,Spark From: https://www.cnblogs.com/ALINGMAOMAO/p/17118157.html