首页 > 数据库 >sparkSql

sparkSql

时间:2022-10-30 09:58:18浏览次数:47  
标签:String 使用 hive DSL sparkSql sql spark

SparkSQL&sparkDSL

1、SparkSQL

(1)、构建SparkSession spark2.x统一入口

如果要与hive进行交互,在建立spark入口时加上

.enableHiveSupport()
(1)首先添加依赖:
		<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        
        
(2)使用DSL之前首先要建立连接:
/**

   * 构建SparkSession
      spark2.x统一入口
          */
           //      注意命名规范
        val spark: SparkSession = SparkSession
          .builder()
          .appName("sparksession")
          .master("local")
          .getOrCreate()

//    如果需要获取sparkContext,可以直接获取
            val sc: SparkContext = spark.sparkContext
            //    通过sc构建RDD
            val linesRDD: RDD[String] = sc.textFile("F:\\IdeaProjects\\Spark\\src\\main\\data\\students.txt")
            //    linesRDD.foreach(println)
            //  DataFrame类型 默认打印20条数据                                                    1500100902,丰昊明,23,男,文科六班
            val stuDF: DataFrame = spark
            .read.format("csv")
            .option("sep", ",")
            .schema("id String,name String,age Int,gender String,clazz String")
            .load("F:\\IdeaProjects\\Spark\\src\\main\\data\\students.txt")
            //打印字段结构类型
            stuDF.printSchema()
            //    输出数据默认20条
            stuDF.show()
            stuDF.show(10,truncate = false)//打印10条
            stuDF.show(10,truncate = 1,vertical = true)//完整显示数据结果


  • DataFrame 同 Dataset[Row] 没有区别,是一样的
  • Row对象:就是给每行数据增加列结构
  • 在RDD基础上增加了列结构

2、SparkSQL和DSL区别:

sparkSQL:(类SQL)

1、SparkSQL在做关联时,默认将小表广播;

2、加载数据,加载后的数据可以进行使用DSL SQL使用,sparkSQL使用的情况下首先要进行生成一个临时视图表,例:stuDF.createOrReplaceTempView("stu_tb")

举例:
输入并输出

stuDF.createOrReplaceTempView("stu_tb")

spark.sql(
  """
    |select *
    |from emp
    |""".stripMargin).show()
DSL(DSL:特定领域语言):

1.分组聚合操作会出产生shuffle,在sparkSQL中shuffle的默认分区数量是200,对于小数据来说分区太多了,可以进行配置,通过spark.sql.shuffle.partittions设置数量;

2.做两个表进行join操作的情况下,当关联的字段名相同时,使用列表达式会出现ambiguous错误;

3.DSL中的条件函数是when,没有if,if是关键字;

4.分数进行归一化处理,需要先转化为百分机制再处理

5.需要添加一行使用with column,修改别名使用with columnrenamed

DateFrame
1、查询可以使用字符串表达式
2、查询可以使用列表达式
(在使用之前需要列表达式导入隐式转换及函数,才可以使用其功能)
 导入隐式转换及函数,为了使用sparkSQL或者DSL进行使用
import spark.implicits._
import org.apache.spark.sql.functions._

7.在DSL使用fliter过滤参数可以传列表达式,字符串表达式,以及函数

8.DSL使用having分组之后对聚合后的结果进行过滤

9.DSL中用于聚合操作使用agg()

10.limit 控制输出的数据条数 (在DSL中不太好实现分页)

11.DSL的union相当于SQL中的union all

12.如果需要额外增加新的一列数据可以直接使用withColumn

3、DSL与SQL中的广播

DSL中的手动广播: 
    stuDF.join(scoDF.hint("broadcast"), List("id"), "inner").show(6000)
    
SQL中的手动广播:
    
      spark.sql(
       """
       |
       |select /*+broadcast(a)  */ * from score as a join student as b on a.sId=b.id
       |
       """.stripMargin).show(1000000)
     
为什么在SQL中不能在分组聚合之后直接使用where,在DSL中可以?
因为在sql中where的执行顺序在from之后,在groupby之前,不能直接对分组后的结果进行过滤,可以使用子查询嵌套
在DSL中执行顺序就是代码的先后顺序,所以可以直接在分组聚合之后直接跟where

4、Spark-SQL 写代码方式:

  • 1、在IDEA中将代码编写好打包上传到集群中运行(上线使用)

    ​ 使用spark-submit提交

  • 2、spark-shell (repl) 里面使用sqlContext 测试使用,简单任务使用

    spark-shell --master yarn-client

    不能使用yarn-cluster

  • 3、spark-sql

    spark-sql --master yarn-client

    不能使用yarn-cluster

  • 4、整合hive 使用hive的元数据

    • 在hive的hive-site.xml修改一行配置

      在使用之前都需要先启动元数据服务

      <property>
        <name>hive.metastore.uris</name>
        <value>thrift://master:9083</value>
      </property>
      
    • 将hive-site.xml 复制到spark conf目录下

      cp /usr/local/soft/hive-1.2.1/conf/hive-site.xml /usr/local/soft/spark-2.4.5/conf/

    • 启动hive元数据服务

      hive --service metastore(推荐 更方便查看日志)

      nohup hive --service metastore >> metastore.log 2>&1 &

    • 将mysql 驱动包复制到spark jars目录下

      cp /usr/local/soft/hive-1.2.1/lib/mysql-connector-java-5.1.49.jar /usr/local/soft/spark-2.4.5/jars/

    • 整合好之后在spark-sql 里面就可以使用hive的表了

      不能使用yarn-cluster模式

      spark-sql --master yarn-client --conf spark.sql.shuffle.partitions=2

      • 在spark-sql中设置运行参数

        set spark.sql.shuffle.partitions=2;

5、RDDtoDF:

创建入口:导入隐式函数,创建样例类

def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder()
      .appName("Demo07RDDToDF")
      .master("local")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()

    import spark.implicits._

    val stuRDD: RDD[String] = spark.sparkContext.textFile("Spark/data/students.txt")

  }
}
case class StuRDDToDF(id: String, name: String, age: Int, gender: String, clazz: String)
1、手动指定:toDF()
// RDD to DataFrame
    // 1、手动指定列名
    val stuRddToDF: DataFrame = stuRDD.map(line => {
      val splits: Array[String] = line.split(",")
      (splits(0), splits(1), splits(2).toInt, splits(3), splits(4))
    }).toDF("id", "name", "age", "gender", "clazz")

    stuRddToDF.show()
2、使用样列类
 // 2、使用样例类
    val stuRddToDF2: DataFrame = stuRDD.map(line => {
      val splits: Array[String] = line.split(",")
      StuRDDToDF(splits(0), splits(1), splits(2).toInt, splits(3), splits(4))
    }).toDF()

    stuRddToDF2.show()
3.DFtoRDD
 // DF to RDD
    // 直接调用.rdd方法即可得到一个 每一条数据都是Row对象的RDD
    val rdd: RDD[Row] = stuRddToDF.rdd

标签:String,使用,hive,DSL,sparkSql,sql,spark
From: https://www.cnblogs.com/tanggf/p/16840531.html

相关文章

  • SparkSQL(二)
    【理解】SparkSQL执行流程接收到查询,既可以是SQL语句,也可以是DSL语法,以一个SQL语句为例:1、Parser,第三方类库Antlr实现。将sql字符串切分成Token,根据语义规则......
  • SparkSQL
    DataFrame创建DataFrame1.转换为DataFrame方式1将RDD[元组或列表]转换为DataFrame定义RDD,每个元素是Row类型将上面的RDD[Row]转换成DataFrame,df=spark.createDat......
  • SparkSQL
    DataFrameDataFrame是一种以RDD为基础的分布式数据集,类似于二维表格。与RDD的区别在于,前者带有schema元信息,即DataFrame。DataFrame也是懒执行的,但性能上比......
  • SparkSQL参数
    SparkSQL参数<1>表分区类参数--是否允许动态生成分区sethive.exec.dynamic.partition=true;--是否容忍指定分区全部动态生成sethive.exec.dynamic.partition.mode=......
  • SparkSQL on K8s 在网易传媒的落地实践
    随着云原生技术的发展和成熟,大数据基础设施积极拥抱云原生是业内发展的一大趋势。网易传媒在2021年成功将SparkSQL部署到了K8s集群,并实现与部分在线业务的混合部署,......
  • (4)SparkSQL中如何定义UDF和使用UDF
    SparkSQL中用户自定义函数,用法和SparkSQL中的内置函数类似;是saprkSQL中内置函数无法满足要求,用户根据业务需求自定义的函数。首先定义一个UDF函数:packagecom.udf;import......
  • 关于sparksql调优的一些操作
    1、查看执行计划 1、直接sql查看:explainselect...from... 2、ds.explain()2、执行计划的处理流程 sql代码->未决断的逻辑执行计划->根据元数据生成已决断的逻辑......
  • sparksql 优化
    最近把spark文档里面配置那一页看了一下,在这记录一些可用的配置,免得后续再去查文档地址:https://spark.apache.org/docs/3.0.1/configuration.htmlSpark文档运行环境......
  • sparksql 函数大全
    数学函数函数简介用法acosh反双曲余弦值SELECTacosh(0.5);0.9624236501192069SELECTacosh(3.5);1.9248473002384139asinh反双曲正弦SELECTasinh(1.45);......
  • sparksql概念补充
    Spark-sql概念补充基本概念        SparkSQL是基于RDD的,可以通过Schema信息来访问其中某个字段        RDD处理的不是结构化数据,所以不能进行类似HIve......