SparkSQL&sparkDSL
1、SparkSQL
(1)、构建SparkSession spark2.x统一入口
如果要与hive进行交互,在建立spark入口时加上
.enableHiveSupport()
(1)首先添加依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.5</version>
</dependency>
(2)使用DSL之前首先要建立连接:
/**
* 构建SparkSession
spark2.x统一入口
*/
// 注意命名规范
val spark: SparkSession = SparkSession
.builder()
.appName("sparksession")
.master("local")
.getOrCreate()
// 如果需要获取sparkContext,可以直接获取
val sc: SparkContext = spark.sparkContext
// 通过sc构建RDD
val linesRDD: RDD[String] = sc.textFile("F:\\IdeaProjects\\Spark\\src\\main\\data\\students.txt")
// linesRDD.foreach(println)
// DataFrame类型 默认打印20条数据 1500100902,丰昊明,23,男,文科六班
val stuDF: DataFrame = spark
.read.format("csv")
.option("sep", ",")
.schema("id String,name String,age Int,gender String,clazz String")
.load("F:\\IdeaProjects\\Spark\\src\\main\\data\\students.txt")
//打印字段结构类型
stuDF.printSchema()
// 输出数据默认20条
stuDF.show()
stuDF.show(10,truncate = false)//打印10条
stuDF.show(10,truncate = 1,vertical = true)//完整显示数据结果
- DataFrame 同 Dataset[Row] 没有区别,是一样的
- Row对象:就是给每行数据增加列结构
- 在RDD基础上增加了列结构
2、SparkSQL和DSL区别:
sparkSQL:(类SQL)
1、SparkSQL在做关联时,默认将小表广播;
2、加载数据,加载后的数据可以进行使用DSL SQL使用,sparkSQL使用的情况下首先要进行生成一个临时视图表,例:stuDF.createOrReplaceTempView("stu_tb")
举例:
输入并输出
stuDF.createOrReplaceTempView("stu_tb")
spark.sql(
"""
|select *
|from emp
|""".stripMargin).show()
DSL(DSL:特定领域语言):
1.分组聚合操作会出产生shuffle,在sparkSQL中shuffle的默认分区数量是200,对于小数据来说分区太多了,可以进行配置,通过spark.sql.shuffle.partittions设置数量;
2.做两个表进行join操作的情况下,当关联的字段名相同时,使用列表达式会出现ambiguous错误;
3.DSL中的条件函数是when,没有if,if是关键字;
4.分数进行归一化处理,需要先转化为百分机制再处理
5.需要添加一行使用with column,修改别名使用with columnrenamed
DateFrame 1、查询可以使用字符串表达式 2、查询可以使用列表达式 (在使用之前需要列表达式导入隐式转换及函数,才可以使用其功能) 导入隐式转换及函数,为了使用sparkSQL或者DSL进行使用 import spark.implicits._ import org.apache.spark.sql.functions._
7.在DSL使用fliter过滤参数可以传列表达式,字符串表达式,以及函数
8.DSL使用having分组之后对聚合后的结果进行过滤
9.DSL中用于聚合操作使用agg()
10.limit 控制输出的数据条数 (在DSL中不太好实现分页)
11.DSL的union相当于SQL中的union all
12.如果需要额外增加新的一列数据可以直接使用withColumn
3、DSL与SQL中的广播
DSL中的手动广播:
stuDF.join(scoDF.hint("broadcast"), List("id"), "inner").show(6000)
SQL中的手动广播:
spark.sql(
"""
|
|select /*+broadcast(a) */ * from score as a join student as b on a.sId=b.id
|
""".stripMargin).show(1000000)
为什么在SQL中不能在分组聚合之后直接使用where,在DSL中可以?
因为在sql中where的执行顺序在from之后,在groupby之前,不能直接对分组后的结果进行过滤,可以使用子查询嵌套
在DSL中执行顺序就是代码的先后顺序,所以可以直接在分组聚合之后直接跟where
4、Spark-SQL 写代码方式:
-
1、在IDEA中将代码编写好打包上传到集群中运行(上线使用)
使用spark-submit提交
-
2、spark-shell (repl) 里面使用sqlContext 测试使用,简单任务使用
spark-shell --master yarn-client
不能使用yarn-cluster
-
3、spark-sql
spark-sql --master yarn-client
不能使用yarn-cluster
-
4、整合hive 使用hive的元数据
-
在hive的hive-site.xml修改一行配置
在使用之前都需要先启动元数据服务
<property> <name>hive.metastore.uris</name> <value>thrift://master:9083</value> </property>
-
将hive-site.xml 复制到spark conf目录下
cp /usr/local/soft/hive-1.2.1/conf/hive-site.xml /usr/local/soft/spark-2.4.5/conf/
-
启动hive元数据服务
hive --service metastore
(推荐 更方便查看日志)或
nohup hive --service metastore >> metastore.log 2>&1 &
-
将mysql 驱动包复制到spark jars目录下
cp /usr/local/soft/hive-1.2.1/lib/mysql-connector-java-5.1.49.jar /usr/local/soft/spark-2.4.5/jars/
-
整合好之后在spark-sql 里面就可以使用hive的表了
不能使用yarn-cluster模式
spark-sql --master yarn-client --conf spark.sql.shuffle.partitions=2
-
在spark-sql中设置运行参数
set spark.sql.shuffle.partitions=2;
-
-
5、RDDtoDF:
创建入口:导入隐式函数,创建样例类
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("Demo07RDDToDF")
.master("local")
.config("spark.sql.shuffle.partitions", "2")
.getOrCreate()
import spark.implicits._
val stuRDD: RDD[String] = spark.sparkContext.textFile("Spark/data/students.txt")
}
}
case class StuRDDToDF(id: String, name: String, age: Int, gender: String, clazz: String)
1、手动指定:toDF()
// RDD to DataFrame
// 1、手动指定列名
val stuRddToDF: DataFrame = stuRDD.map(line => {
val splits: Array[String] = line.split(",")
(splits(0), splits(1), splits(2).toInt, splits(3), splits(4))
}).toDF("id", "name", "age", "gender", "clazz")
stuRddToDF.show()
2、使用样列类
// 2、使用样例类
val stuRddToDF2: DataFrame = stuRDD.map(line => {
val splits: Array[String] = line.split(",")
StuRDDToDF(splits(0), splits(1), splits(2).toInt, splits(3), splits(4))
}).toDF()
stuRddToDF2.show()
3.DFtoRDD
// DF to RDD
// 直接调用.rdd方法即可得到一个 每一条数据都是Row对象的RDD
val rdd: RDD[Row] = stuRddToDF.rdd
标签:String,使用,hive,DSL,sparkSql,sql,spark
From: https://www.cnblogs.com/tanggf/p/16840531.html