3.4 Spark SQL 应用 3.4.1 创建 DataFrame/DataSet 方式一:读取本地文件 ① 在本地创建一个文件,有 id、name、age 三列,用空格分隔,然后上传到 hdfs 上。 vim /root/person.txt 1 2 内容如下: 1 zhangsan 20 2 lisi 29 3 wangwu 25 4 zhaoliu 30 5 tianqi 35 6 kobe 40 1 2 3 4 5 6 7 ② 打开 spark-shell spark/bin/spark-shell ##创建 RDD val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]] 1 2 3 4 5 6 ③ 定义 case class(相当于表的 schema) case class Person(id:Int, name:String, age:Int) 1 2 ④ 将 RDD 和 case class 关联 val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person] 1 2 ⑤ 将 RDD 转换成 DataFrame val personDF = personRDD.toDF //DataFrame 1 2 ⑥ 查看数据和 schema personDF.show 1 2 ⑦ 注册表 personDF.createOrReplaceTempView("t_person") 1 2 ⑧ 执行 SQL spark.sql("select id,name from t_person where id > 3").show 1 2 ⑨ 也可以通过 SparkSession 构建 DataFrame val dataFrame=spark.read.text("hdfs://node1:8020/person.txt") dataFrame.show //注意:直接读取的文本文件没有完整schema信息 dataFrame.printSchema 1 2 3 4 方式二:读取 json 文件 val jsonDF= spark.read.json("file:///resources/people.json") 1 2 接下来就可以使用 DataFrame 的函数操作 jsonDF.show 1 2 注意:直接读取 json 文件有schema 信息,因为json文件本身含有Schema信息,SparkSQL 可以自动解析。 方式三:读取 parquet 文件 val parquetDF=spark.read.parquet("file:///resources/users.parquet") 1 2 接下来就可以使用 DataFrame 的函数操作 parquetDF.show 1 2 注意:直接读取 parquet 文件有 schema 信息,因为 parquet 文件中保存了列的信息。 3.4.2 两种查询风格:DSL 和 SQL DSL风格示例: personDF.select(personDF.col("name")).show personDF.select(personDF("name")).show personDF.select(col("name")).show personDF.select("name").show 1 2 3 4 5 SQL 风格示例: spark.sql("select * from t_person").show 1 2 总结: DataFrame 和 DataSet 都可以通过RDD来进行创建; 也可以通过读取普通文本创建–注意:直接读取没有完整的约束,需要通过 RDD+Schema; 通过 josn/parquet 会有完整的约束; 不管是 DataFrame 还是 DataSet 都可以注册成表,之后就可以使用 SQL 进行查询了! 也可以使用 DSL! 3.4.3 Spark SQL 多数据源交互 读取 json 文件: spark.read.json("D:\\data\\output\\json").show() 1 2 读取 csv 文件: spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show() 1 2 读取 parquet 文件: spark.read.parquet("D:\\data\\output\\parquet").show() 1 2 读取 mysql 表: val prop = new Properties() prop.setProperty("user","root") prop.setProperty("password","root") spark.read.jdbc( "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show() 1 2 3 4 5 6 写入 json 文件: personDF.write.json("D:\\data\\output\\json") 1 2 写入 csv 文件: personDF.write.csv("D:\\data\\output\\csv") 1 2 写入 parquet 文件: personDF.write.parquet("D:\\data\\output\\parquet") 1 2 写入 mysql 表: val prop = new Properties() prop.setProperty("user","root") prop.setProperty("password","root") personDF.write.mode(SaveMode.Overwrite).jdbc( "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)
标签:2024.1,27,读取,show,日报,personDF,json,parquet,spark From: https://www.cnblogs.com/Arkiya/p/17991309