首页 > 数据库 >人人都懂Spark-SQL基础操作(Scala版)

人人都懂Spark-SQL基础操作(Scala版)

时间:2023-02-21 17:10:05浏览次数:33  
标签:SchemaRDD Scala hive hiveCtx SQL Spark row


Spark SQL

简单的说Spark SQL是spark用来操作结构化和半结构化数据的接口。本文来讲述一下它的一些基本操作。

Spark SQL的特性

  1. 无缝地将SQL查询和spark程序混合,与常规的Python/Java/scala代码高度整合,包含了连接RDD与SQL表、公开的自定义SQL函数接口等。
  2. 可以从各种结构化数据源中读取数据,如(JSON、HIVE等)
  3. 可以通过JDBC或者ODBC连接,Spark SQL包括有行业标准的JDBC和ODBC连接的服务器模式。

为了实现这些功能Spark SQL提供了一种特殊的RDD叫做SchemaRDD,这是一个存放row对象的RDD,每个RDD代表一行记录。

Spark SQL编译时可以包含hive支持(Apache Hive 是 Hadoop上的SQL引擎),也可以不包含,包含Hive支持的Spark SQL可以支持hive表访问、UDF(用户自定义函数)、SerDe(序列化格式和反序列化格式)、以及Hive查询语言(HiveQL/HQL)。

下面来说一下Spark SQL的操作。

初始化Spark SQL

Scala中SQL的import声明:

//导入 Spark SQL
import org.apache.spark.sql.hive.HiveContext
//如果不能使用hive依赖的话
import org.apahe.spark.sql.SQLContext

需要导入的隐式转换支持

//创建Spark SQL的HiveContext
val hiveCtx = ...
//如果不能使用hive依赖的话
import hiveCtx._

添加好import声明之后,需要创建出一个HiveContext对象,如果无法一如Hive依赖,就创建出一个SQLContext对象作为SQL的上下文环境,这两个类都要传入一个SparkContext对象作为运行的基础

创建SQL上下文环境

val sc = new SparkContext(...)
val hiveCtx = new HiveContext(sc)

有了HiveContext或SQLContext后我们就可以准备读取数据并进行查询了。

基本查询示例

读取并查询推文

想要在一张数据表上进行查询,需要调用HiveContext或者SQLContext中的sql()方法,要做的第一件事就是告诉Spark SQL要查询的数据是什么。

val input = hiveCtx.jsonFile(inputFile)
//注册输入的SchemaRDD
input.registerTempTable("tweets")
//依据retweetCount(转发计数)选出推文
val topTweets = hiveCtx.sql("SELECT text,retweetCount FROM
tweet ORDER BY retweetCount LIMIT 10")

SchemaRDD

我们在读取和执行查询的时候都会返回SchemaRDD,SchemaRDD和传统的数据库中的表的概念类似。SchemaRDD是一个由ROW对象组成的RDD,附带包含每列数据类型的结构信息。

row对象

row对象表示SchemaRDD中的记录,其本质就是一个定长的字段数组,在Scala中Row对象有一系列Getter方法,可以通过下标来获取每个字段的值。

例:在Scala中访问topTweet这个SchemaRDD中的text列(第一列)

val topTweetText = topTweets.map(row => row.getString(0))

读取和存储数据

Spark SQL支持很多种结构化数据源,可以让你跳过复杂的读取过程,轻松从各种数据源中读取到Row对象,这些数据源包括Hive表、JSON和Parquet文件。

Hive

从hive中读取数据时,Spark SQL支持任何Hive支持的存储格式(SerDe),包括文本文件,RCFiles,ORC,Parquet,Avro以及Protocol Buffer。

例:如何查询一张hive表。

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)
val rows = hiveCtx.sql("SELECT key,value Frow mytable")
val keys = row.map(row => row.getInt(0))

Parquet

Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录,Parquet格式经常在Hadoop生态圈中被使用,他也支持Spark SQL的全部数据类型。

JSON

要读取JSON数据,只要调用hiveCtx中的jsonFile()方法即可,如果想获得从数据中推断出来的结构信息,可以在生成的SchemaRDD上调用printSchema方法。

在Scala中使用Spark SQL读取json数据。

val input = hiveCtx.jsonFile(inputFile)

基于RDD创建SchemaRDD

case class Happyperson(handle:String,favouriteBeverage:String)
...
//创建了一个人的对象,并且把它转换成SchemaRDD
val HappypersonRDD = sc.parallelize(List(Happyperson("holden","coffee")))
happyPeopleRDD.registerTempTable("happy_people")

小结

本文讲解了Spark利用Spark SQL进行结构化和半结构化数据处理的方式,除了上述的一些方法,操作RDD的方法也同样适用于SchemaRDD。

标签:SchemaRDD,Scala,hive,hiveCtx,SQL,Spark,row
From: https://blog.51cto.com/u_15969421/6076763

相关文章

  • 本地计算机上的MySQL服务启动后停止。某些服务在未由其他服务或程序使用时将自动
    遇到问题:解决方案1:按网友说的:删除服务后推重新初始化。mysqld--removemysqlmysqld--installmysql--defaults-file=C:\ProgramFiles\MySQL\my.ini后果是会数据会丢失。......
  • mysql char,mysql to_char
    场景:mysql相仿to_char()to_date()函数mysql类似to_char()to_date()函数mysql日期和字符相互转换方法date_format(date,'%Y-%m-%d')  -------------->oracle中的......
  • Mysql 获取当前日期前一天(DATE_SUB()函数和DATE_ADD()的用法)
    Mysql获取当前日期前一天DATE_SUB()函数的用法DATE_ADD()函数一、用途:函数调用形式:简单应用业务需要:  这是关于一个报名活动的需求,因为报名开始时间和活动结束时间......
  • PostgreSQL update set from 两表联合更新
    updatet_businesstbsetsystem_id=ir.application_idfromt_business_irregularirwheretb.affected_business=ir.application_nameupdatet_businesstbsetsyst......
  • MySQL安装及配置
    Mysql本质上是一个软件  8.x,5.x  本人采用5.7.31版本(既支持win又支持mac)下载地址:https://downloads.mysql.com/archives/community/本人经常使用的为:链接:https......
  • SQL Server数据库日常检查
    1.1代码检查从昨天到现在,SQL代理Job有没有运行失败的,会把运行失败的Job名字,步骤,运行时间,错误等级,错误原因罗列出来,方便查看。----1.1  Check Job Fail List From L......
  • Excel文件 利用MySQL/Python 实现自动处理数据的功能
    目录问题描述:解决方案:一、SQL查询二、SQL、python处理三、python处理四、优化python处理1.手动执行代码2.开机自动执行代码对比四种方案:总结:问题描述:在没有服务器存储数......
  • windows 安装 mysql-8.0.32 压缩包方式
    下载地址https://dev.mysql.com/downloads/mysql/5.0.html解压把解压好的文件夹放到D盘(注意所有路径中都不要包含中文路径)D:\mysql-8.0.32-winx64创建一个空文......
  • Centos7 安装MySQL8
    1.删除之前安装的MySQL包[root@localhost~]#rpm-qa|grepmariadbmariadb-libs-5.5.60-1.el7_5.x86_64[root@localhost~]#yumerase-ymariadb-libs-5.5.60-1.el......
  • idea中配置mybatis 映射文件模版及 mybatis plus 自定义sql
    本文为博主原创,未经允许不得转载:mybatisplus 使用过程中已经很大程度提升了我们开发的效率,因为它内部已经对单表的操作进行了完美的封装,但是关联表操作时,这时就需要自......