通用的加载和保存方式
- 这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL默认读取和保存的文件格式为parque
1. 加载数据
spark.read.load是加载数据的通用方法,支持的数据源格式:
scala> spark.read.
csv jdbc load options parquet table textFile
format json option orc schema text
- 如果读取不同格式的数据,可以对不同的数据格式进行设定
scala> spark.read.format("…")[.option("…")].load("…")
- format("…"):指定加载的数据类型,包括csv、jdbc、json、orc、parquet和textFile
- load("…"):在csv、jdbc、json、orc、parquet和textFile格式下需要传入加载数据的路径
- option("…"):在jdbc格式下需要传入JDBC相应参数,url、user、password和dbtable
直接在文件上进行查询
文件格式.'文件路径'
scala>spark.sql("select * from json.`hdfs://hadoop102:8020/datas/user.json`").show
2. 保存数据
df.write.save是保存数据的通用方法
scala> df.write.
bucketBy format jdbc mode options parquet save sortBy
csv insertInto json option orc partitionBy saveAsTable text
- 如果保存不同格式的数据,可以对不同的数据格式进行设定
scala>df.write.format("…")[.option("…")].save("…")
- format("…"):指定保存的数据类型,包括csv、jdbc、json、orc、parquet和textFile
- save ("…"):在csv、orc、parquet和textFile格式下需要传入保存数据的路径
- option("…"):在jdbc格式下需要传入JDBC相应参数,url、user、password和dbtable
保存操作可以使用SaveMode, 用来指明如何处理数据,使用mode()方法来设置
df.write.mode("append").json("/opt/module/data/output")
Parquet
- SparkSQL的默认数据源为Parquet格式,Parquet是一种能够有效存储嵌套数据的列式存储格式
- 数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format
- 修改配置项spark.sql.sources.default,可修改默认数据源格式
1. 加载数据
scala> val df = spark.read.load("examples/src/main/resources/users.parquet")
scala> df.show
2. 保存数据
scala> var df = spark.read.json("/opt/module/data/input/people.json")
//保存为 parquet 格式
scala> df.write.mode("append").save("/opt/module/data/output")
JSON
- SparkSQL能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]
- Spark读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串
- 导入隐式转换
import spark.implicits._
{"name":"Michael"}
{"name":"Andy", "age":30}
[{"name":"Justin", "age":19},{"name":"Justin", "age":19}]
1. 加载JSON文件
val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path)
2. 创建临时表
peopleDF.createOrReplaceTempView("people")
3. 数据查询
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13
AND 19")
teenagerNamesDF.show()
+------+
| name|
+------+
|Justin|
+------+
CSV
1. 读取csv文件
spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()
2. 写入csv文件
personDF.write.csv("D:\\data\\output\\csv")
MySQL
1. 读取MySQL表
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()
2. 写入MySQL表
val prop = new Properties()
prop.setProperty("user","root")
prop.setProperty("password","root")
personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)
标签:jdbc,scala,json,SparkSQL,格式,spark,csv,Spark,加载
From: https://www.cnblogs.com/shihongpin/p/18429645