首页 > 数据库 >sparkSQL原理和使用——一般在生产中,基本都是使用hive做数据仓库存储数据,然后用spark从hive读取数据进行处理

sparkSQL原理和使用——一般在生产中,基本都是使用hive做数据仓库存储数据,然后用spark从hive读取数据进行处理

时间:2023-06-01 12:34:03浏览次数:50  
标签:读取数据 val 1981 SparkSession sal 数据仓库 hive Spark spark

一、spark SQL概述

1.1 什么是spark SQL

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。类似于hive的作用。

1.2 spark SQL的特点

1、容易集成:安装Spark的时候,已经集成好了。不需要单独安装。
2、统一的数据访问方式:JDBC、JSON、Hive、parquet文件(一种列式存储文件,是SparkSQL默认的数据源,hive中也支持)
3、完全兼容Hive。可以将Hive中的数据,直接读取到Spark SQL中处理。
一般在生产中,基本都是使用hive做数据仓库存储数据,然后用spark从hive读取数据进行处理。
4、支持标准的数据连接:JDBC、ODBC
5、计算效率比基于mr的hive高,而且hive2.x版本中,hive建议使用spark作为执行引擎

二、spark SQL基本原理

2.1 DataFrame和DataSet基本概念

2.1.1 DataFrame

DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,里面有表的结构以及数据,但在底层具有更丰富的优化。DataFrames可以从各种来源构建,
例如:
结构化数据文件
hive中的表
外部数据库或现有RDDs
DataFrame API支持的语言有Scala,Java,Python和R。

比起RDD,DataFrame多了数据的结构信息,即schema。RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

2.1.2 DataSet

Dataset是一个分布式的数据收集器。这是在Spark1.6之后新加的一个接口,兼顾了RDD的优点(强类型,可以使用功能强大的lambda)以及Spark SQL的执行器高效性的优点。所以可以把DataFrames看成是一种特殊的Datasets,即:Dataset(Row)

2.2 创建DataFrame的方式

2.2.1 SparkSession对象

Apache Spark 2.0引入了SparkSession,其为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。
在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext。然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。
要注意一点,在我用的这个spark版本中,直接使用new SQLContext() 来创建SQLContext对象,会显示该方式已经被弃用了(IDEA中会显示已弃用),建议使用SparkSession来获取SQLContext对象。

2.2.2 通过case class样本类

这种方式在scala中比较常用,因为case class是scala的特色

/**
表 t_stu 的结构为:
id name age
*/

object CreateDF {
  def main(args: Array[String]): Unit = {
    //这是最新的获取SQLContext对象的方式
    //2、创建SparkSession对象,设置master,appname
    val spark = SparkSession.builder().master("local").appName("createDF case class").getOrCreate()
    //3、通过spark获取sparkContext对象,读取数据
    val lines = spark.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))

    //4、将数据映射到case class中,也就是数据映射到表的对应字段中
    val tb = lines.map(t=>emp(t(0).toInt,t(1),t(2).toInt))
    //这里必须要加上隐式转换,否则无法调用 toDF 函数
    import spark.sqlContext.implicits._

    //5、生成df
    val df2 = tb.toDF()

    //相当于select name from t_stu
    df1.select($"name").show()

    //关闭spark对象
    spark.stop()
  }
}

/*1、定义case class,每个属性对应表中的字段名以及类型
     一般生产中为了方便,会全部定义为string类型,然后有需要的时候
     才根据实际情况将string转为需要的类型
   这一步相当于定义表的结构
*/
case class emp(id:Int,name:String,age:Int)

总结步骤为:

1、定义case class,用来表结构
2、创建sparkSession对象,用来读取数据
3、将rdd中的数据和case class映射
4、调用 toDF 函数将rdd转为 DataFrame

2.2.3 通过StructType类

这种方式java比较常用

package SparkSQLExer

import org.apache
import org.apache.spark
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

/**
  * 创建dataschema方式2:
  * 通过spark session对象创建,表结构通过StructType创建
  */
object CreateDF02 {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().master("local").appName("create schema").getOrCreate()

    //1、通过StructType创建表结构schema,里面表的每个字段使用 StructField定义
    val tbSchema = StructType(List(
        StructField("id",DataTypes.IntegerType),
        StructField("name",DataTypes.StringType),
        StructField("age",DataTypes.IntegerType)
      ))

    //2、读取数据
    var lines = sparkS.sparkContext.textFile("G:\\test\\t_stu.txt").map(_.split(","))

    //3、将数据映射为ROW对象
    val rdd1 = lines.map(t=>Row(t(0).toInt,t(1),t(2).toInt))

    //4、创建表结构和表数据映射,返回的就是df
    val df2 = sparkS.createDataFrame(rdd1, tbSchema)

    //打印表结构
    df2.printSchema()

    sparkS.stop()

  }

}

总结步骤为:

1、通过StructType创建表结构schema,里面表的每个字段使用 StructField定义
2、通过sparkSession.sparkContext读取数据
3、将数据映射格式为Row对象
4、将StructType和数据Row对象映射,返回df

2.2.4 使用json等有表格式的文件类型

package SparkSQLExer

import org.apache.spark.sql.SparkSession

/**
  * 创建df方式3:通过有格式的文件直接导入数据以及表结构,比如json格式的文件
  * 返回的直接就是一个DF
  */
object CreateDF03 {
  def main(args: Array[String]): Unit = {
    val sparkS = SparkSession.builder().master("local").appName("create df through json").getOrCreate()

    //读取json方式1:
    val jsonrdd1= sparkS.read.json("path")

    //读取json方式2:
    val jsonrdd1= sparkS.read.format("json").load("path")

    sparkS.stop()
  }
}

这种方式比较简单,就是直接读取json文件而已
sparkS.read.xxxx读取任意文件时,返回的都是DF对象

2.3 操作DataFrame

2.3.1 DSL语句

DSL语句其实就是将sql语句的一些操作转为类似函数的方式去调用,比如:

df1.select("name").show

例子:

为了方便,直接在spark-shell里操作了,
spark-shell --master spark://bigdata121:7077

1、打印表结构
scala> df1.printSchema
root
|-- empno: integer (nullable = true)
|-- ename: string (nullable = true)
|-- job: string (nullable = true)
|-- mgr: integer (nullable = true)
|-- hiredate: string (nullable = true)
|-- sal: integer (nullable = true)
|-- comm: integer (nullable = true)
|-- deptno: integer (nullable = true)

2、显示当前df的表数据或者查询结果的数据
scala> df1.show
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980/12/17| 800|   0|    20|
| 7499| ALLEN| SALESMAN|7698| 1981/2/20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698| 1981/2/22|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|  1981/4/2|2975|   0|    20|
| 7654|MARTIN| SALESMAN|7698| 1981/9/28|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|    30|
| 7782| CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7788| SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|    20|
| 7839|  KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7844|TURNER| SALESMAN|7698|  1981/9/8|1500|   0|    30|
| 7876| ADAMS|    CLERK|7788| 1987/5/23|1100|   0|    20|
| 7900| JAMES|    CLERK|7698| 1981/12/3| 950|   0|    30|
| 7902|  FORD|  ANALYST|7566| 1981/12/3|3000|   0|    20|
| 7934|MILLER|    CLERK|7782| 1982/1/23|1300|   0|    10|
+-----+------+---------+----+----------+----+----+------+

3、执行select, 相当于select xxx form  xxx where xxx
scala> df1.select("ename","sal").where("sal>2000").show
+------+----+
| ename| sal|
+------+----+
| SMITH| 800|
| ALLEN|1600|
|  WARD|1250|
| JONES|2975|
|MARTIN|1250|
| BLAKE|2850|
| CLARK|2450|
| SCOTT|3000|
|  KING|5000|
|TURNER|1500|
| ADAMS|1100|
| JAMES| 950|
|  FORD|3000|
|MILLER|1300|
+------+----+

4、对某些列进行操作
对某个指定进行操作时,需要加上$符号,然后后面才能操作
$代表 取出来以后,再做一些操作。
注意:这个 $ 的用法在ideal中无法正常使用,解决方法下面说
scala> df1.select($"ename",$"sal",$"sal"+100).show
+------+----+-----------+
| ename| sal|(sal + 100)|
+------+----+-----------+
| SMITH| 800|        900|
| ALLEN|1600|       1700|
|  WARD|1250|       1350|
| JONES|2975|       3075|
|MARTIN|1250|       1350|
| BLAKE|2850|       2950|
| CLARK|2450|       2550|
| SCOTT|3000|       3100|
|  KING|5000|       5100|
|TURNER|1500|       1600|
| ADAMS|1100|       1200|
| JAMES| 950|       1050|
|  FORD|3000|       3100|
|MILLER|1300|       1400|
+------+----+-----------+

5、过滤行
scala> df1.filter($"sal">2000).show
+-----+-----+---------+----+----------+----+----+------+
|empno|ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+---------+----+----------+----+----+------+
| 7566|JONES|  MANAGER|7839|  1981/4/2|2975|   0|    20|
| 7698|BLAKE|  MANAGER|7839|  1981/5/1|2850|   0|    30|
| 7782|CLARK|  MANAGER|7839|  1981/6/9|2450|   0|    10|
| 7788|SCOTT|  ANALYST|7566| 1987/4/19|3000|   0|    20|
| 7839| KING|PRESIDENT|7839|1981/11/17|5000|   0|    10|
| 7902| FORD|  ANALYST|7566| 1981/12/3|3000|   0|    20|
+-----+-----+---------+----+----------+----+----+------+

6、分组以及计数
scala> df1.groupBy($"deptno").count.show
+------+-----+                                                                  
|deptno|count|
+------+-----+
|    20|


标签:读取数据,val,1981,SparkSession,sal,数据仓库,hive,Spark,spark
From: https://blog.51cto.com/u_11908275/6393215

相关文章

  • Hive扩展内容
    一个SQL语句的分析SELECTa.Key,SUM(a.Cnt)ASCntFROM(SELECTKey,COUNT(*)ASCntFROMTableNameGROUPBYKey,CASEWHENKey='KEY001'THENHash(Random())%50ELSE0END)aGROUPBYa.Key;这个SQL其实是一个解决数据倾......
  • Hive高级函数实战
    函数的基本操作和mysql一样的,hive也是一个主要做统计的工具,所以为了满足各种各样的统计需要,它也内置了相当多的函数showfunctions;#查看所有内置函数descfunctionfunctionName;#查看指定函数的描述信息descfunctionextendedfunctionName;#显示函数的扩展内容Hiv......
  • 【博学谷学习记录】超强总结,用心分享 | hive分区与分桶的区别
    【博学谷IT技术支持】公众号:积雷山摩云洞,欢迎关注!!!概念分区表:将数据分散到多个子目录中,在执行查询是,可以根据条件加快查询效率分桶表:是相对分区更细的颗粒度划分,分桶表是将表查分到不同的文件中,根据数据表某列的hash值进行分区,对某列数据分区就是对该列属性值的hash值取模,......
  • tornado 分页读取数据库 实时下载csv
    class downloadHandler(RequestHandler):deffetdata(self,inde):    withMogoContext()asmongo:      res=list(mongo.db['datas'].find().limit(10).skip(inde*10))      fordinres:        yieldddefget(self):  ......
  • Hive核心实战
    Hive中数据库的操作showdatabases;#查看数据库列表usedefault;#选择数据库createdatabasemydb1;#创建数据库createdatabasemydb2location'/user/hive/mydb2';#指定hdfs目录的位置dropdatabasemydb1;#删除数据库default是默认数据库,默认就在这个库里面......
  • Hive中的表类型
    在Mysql中没有表类型这个概念,因为它就只有一种表。但是Hive中是有多种表类型的,我们可以分为四种,内部表、外部表、分区表、桶表下面来一个一个学习一下这些类型的表内部表内部表也可以称为受控表,它是Hive中的默认表类型,表数据默认存储在warehouse目录中。在加载数据的过程中,......
  • Hive基础使用
    Hive的使用方式可以在Shell命令行下操作Hive,或者使用JDBC代码的方式操作命令行方式针对命令行这种方式,其实还有两种使用第一个是使用bin目录下的hive命令,这个是从hive一开始就支持的使用方式后来又出现一个beeline命令,它是通过HiveServer2服务连接hive,是一个轻量级的客户端......
  • Hive - 言出法随
            --显示当前数据库selectcurrent_database();--设置hive属性在命令行显示当前数据库sethive.cli.print.current.db=true; ......
  • Hive的用户自定义函数实现步骤与流程
    参考答案:1、如何构建UDF?用户创建的UDF使用过程如下:第一步:继承UDF或者UDAF或者UDTF,实现特定的方法;第二步:将写好的类打包为jar,如hivefirst.jar;第三步:进入到Hive外壳环境中,利用addjar/home/hadoop/hivefirst.jar注册该jar文件;第四步:为该类起一个别名,createtemporaryfunctionmylen......
  • Hive的分区、分桶
    Hive的分区表、分桶表一、 Hive库的分区表1.1概述Hive中的表对应为HDFS上的指定目录,在查询数据时候,默认会对全表进行扫描,这样时间和性能的消耗都非常大。分区为HDFS上表目录的子目录,数据按照分区存储在子目录中。如果查询的where字句的中包含分区条件,则直接从该分区......