首页 > 数据库 >SparkSQL

SparkSQL

时间:2024-11-07 20:18:07浏览次数:6  
标签:DataFrame 查询 RDD SparkSQL SQL Spark os

一、概述

1.1、什么是sparksql

   Spark SQL 是 Spark 中用于处理结构化数据的模块,它提供了两个主要的编程抽象:DataFrame 和 DataSet,并且还可以作为分布式 SQL 查询引擎使用。Spark SQL 的设计目的是简化结构化数据的处理和查询,同时提高执行效率。

   传统的 Hive SQL 通过将 SQL 查询转换为 MapReduce 任务来执行,虽然简化了开发者的编程工作,但由于 MapReduce 的计算模型较为低效,导致执行速度较慢。为了解决这个问题,Spark SQL 应运而生。Spark SQL 将 SQL 查询转换为 RDD(弹性分布式数据集)操作,然后提交到集群执行。由于 RDD 的设计和优化,Spark SQL 的执行效率远高于传统的 MapReduce 模型,从而大大提升了数据处理和查询的速度。

1.2、sparksql的特点

1.2.1高性能:
        Spark SQL 利用了列式存储和压缩技术,结合查询计划的优化和代码生成等技术,能够在处理大规模数据时提供高效的查询和分析能力。
1.2.2多数据源支持:
        Spark SQL 支持多种数据源,包括关系型数据库、Hive、Parquet、Avro、JSON 等。它提供了一致的 API 和查询语言,使用户能够以统一的方式处理不同类型的 数据。
1.2.3SQL 支持:
        Spark SQL 提供了全面的 SQL 支持,用户可以使用标准的 SQL 语法进行查询、过滤和聚合等操作。这使得用户可以利用已有的 SQL 知识和经验来处理数据。
1.2.4DataFrame 和 DataSet 抽象:
        Spark SQL 引入了 DataFrame 和 DataSet 两个抽象概念,用于表示结构化数据。DataFrame 类似于关系型数据库中的表,支持查询、过滤和聚合操作。而 DataSet 是 DataFrame 的类型安全版本,通过编译时检查来避免常见的错误。
1.2.5内置函数和 UDF 支持:
        Spark SQL 提供了丰富的内置函数,用于数据转换、日期处理、字符串操作等。同时,它还支持用户自定义函数(UDF)和用户自定义聚合函数(UDAF),使用户能够根据自己的需求扩展功能。
1.2.6扩展性:
        Spark SQL 提供了许多可扩展的接口和机制,使用户能够根据自己的需求来扩展和定制功能。用户可以自定义数据源、自定义优化规则、自定义函数等。
1.2.7统一的编程模型:
        Spark SQL 与 Spark 的其他模块(如 Spark Streaming 和 MLlib)紧密集成,使用户能够使用统一的编程模型来处理不同类型的数据,从而简化开发和维护工作。
        总之,Spark SQL 具有高性能、多数据源支持、SQL 和 DataFrame/DataSet 抽象、可扩展性和统一的编程模型等特点,使用户能够以统一的方式高效处理大规模的结构化数据。

1.3、RDD

概念:

RDD 是 Spark 最基础的数据抽象,代表一个不可变的、分区的、容错的并行数据集合。RDD 可以从外部存储系统(如 HDFS、HBase、本地文件系统等)加载,也可以从其他 RDD 转换而来。

特点:

低级 API:RDD 提供了丰富的低级操作,如 map、filter、reduce、join 等。
强类型:RDD 是强类型的,但需要手动进行类型转换。
优化有限:RDD 没有内置的查询优化器,需要手动进行优化。
灵活:RDD 提供了最大的灵活性,适合复杂的自定义操作

1.4、DataFrame

概念:

DataFrame 是一种分布式的数据集,类似于关系型数据库中的表,带有结构化的信息。DataFrame 提供了丰富的 API,支持各种数据操作,如 select、filter、groupBy、join 等。

特点:

高级 API:DataFrame 提供了高级的 API,支持标准的 SQL 查询。
优化器:DataFrame 内置了 Catalyst 查询优化器,可以自动优化查询计划。
弱类型:DataFrame 是弱类型的,适用于处理结构化数据。
内存管理:DataFrame 利用了 Spark 的内存管理机制,可以在内存中高效地处理数据

1.5、DataSet

概念:

Dataset 是 DataFrame 的类型安全版本,提供了编译时类型检查。Dataset 结合了 RDD 的强类型和 DataFrame 的优化器。

特点:

强类型:Dataset 是强类型的,可以在编译时进行类型检查,避免运行时的类型错误。
优化器:Dataset 内置了 Catalyst 查询优化器,可以自动优化查询计划。
高级 API:Dataset 提供了高级的 API,支持标准的 SQL 查询和 DSL(领域特定语言)操作。
内存管理:Dataset 利用了 Spark 的内存管理机制,可以在内存中高效地处理数据。

二、sparksql的使用

使用pycharm书写sparksql代码,需要用到pyspark,若没有这个库,则需要下载

pip install pyspark==3.1.2 -i https://pypi.tuna.tsinghua.edu.cn/simple/

2.1、单词统计

但凡路径都要使用自己的,我这里使用的pyspark依托于Anaconda,所以路径会有所不同。

import os
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # todo:0-设置系统环境变量
    os.environ['JAVA_HOME'] = 'D:/Java/jdk'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    spark = SparkSession.builder.master("local[2]").appName("单词统计").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    df = spark.read.text("../../data/wordcount/word.txt")

    df.createOrReplaceTempView("words")  #给读出来的临时数据表一个名字

    spark.sql("""
        with t as (  
        select word from words lateral view explode(split(value," ")) as word where trim(word) != '    ')
        select word,count(1) num from t group by word order by count(1) desc
    """).show()  #  .show()是为了让结果呈现在控制台,不加就看不到结果

    spark.stop()

 2.2、电影评分统计

数据如下:

通过百度网盘分享的文件:电影数据
链接:https://pan.baidu.com/s/1KxjtjE_ylwRTD3ccqeGpdw?pwd=1234 
提取码:1234

需求:统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数。

import os
from pyspark.sql import SparkSession

"""
------------------------------------------
  Description : TODO:
  SourceFile : 03-关于电影评分的案例
  Author  : 懒大王
  Date  : 2024/11/4
-------------------------------------------
"""
if __name__ == '__main__':
    # todo:0-设置系统环境变量
    os.environ['JAVA_HOME'] = 'D:/Java/jdk'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    with SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
            "spark.sql.shuffle.partitions", 2).getOrCreate() as spark:

        sc = spark.sparkContext
        df = sc.textFile("../../data/movies.dat").map(lambda line:(line.split("::")[0],line.split("::")[1],line.split("::")[2]))\
            .toDF(["movie_id","movie_name","movie_type"])
        df.createOrReplaceTempView("movies")

        df2 = sc.textFile("../../data/ratings.dat").map(lambda line:(line.split("::")[0],line.split("::")[1],line.split("::")[2],line.split("::")[3]))\
            .toDF(["user_id","movie_id","mark","mark_time"])
        df2.createOrReplaceTempView("ratings")

        spark.sql("""
        select m.movie_name movie_name,avg(mark) avgMark,count(1) marknum from movies m join ratings r on m.movie_id =r.movie_id 
        group by m.movie_name having marknum >2000 order by marknum desc
        """).show()

标签:DataFrame,查询,RDD,SparkSQL,SQL,Spark,os
From: https://blog.csdn.net/xieyichun_/article/details/143605872

相关文章

  • 《SparkSQL--通过ThriftServer连接DataGrip》
    ThriftServer功能:类似于HiveServer2,负责解析客户端提交的SQL语句,转换成Spark的任务进行执行本质:Spark中的一个特殊的程序,利用程序的资源运行所有SQL,该程序除非手动关闭,否则一直运行 启动服务,该服务不会停止,一直在后台启动,假如启动不了,记得查看日志。/opt/installs/spark/sb......
  • SparkSql读取数据的方式
    一、读取普通文件 方式一:给定读取数据源的类型和地址spark.read.format("json").load(path)spark.read.format("csv").load(path)spark.read.format("parquet").load(path)方式二:直接调用对应数据源类型的方法spark.read.json(path)spark.read.csv(path)spark.read.pa......
  • JSON日志处理 | 基于SparkSql实现
    目录0 主要JSON处理函数1JSON处理函数使用2案例分析3小结0 主要JSON处理函数get_json_object:提取单个JSON字段json_tuple:同时提取多个JSON字段from_json:JSON字符串转结构化数据to_json:结构化数据转JSON字符串schema_of_json:推断JSONs......
  • SparkSQL与Hive查询不一致问题
    文章目录1.Hive版本2.问题背景3.问题现象4.原因分析1).分析原因可能是缓存2).发现文件存储特点3).子文件夹出现原因5.解决方式1).方法1修改配置2).方法2修改脚本6.总结1.Hive版本Hive1.2.1000.2.6.5.0-2922.问题背景交付项目上基本所有的脚本任务,都是使用h......
  • Spark(十一)SparkSQL 数据的加载和保存
    通用的加载和保存方式这里的通用指的是使用相同的API,根据不同的参数读取和保存不同格式的数据,SparkSQL默认读取和保存的文件格式为parque1.加载数据spark.read.load是加载数据的通用方法,支持的数据源格式:scala>spark.read.csvjdbcloadoptionsparquet......
  • Spark(十)SparkSQL DataSet
    DataSetDataSet是具有强类型的数据集合,需要提供对应的类型信息1.创建DataSet使用样例类序列创建DataSetscala>caseclassperson(id:Int,name:String,age:Int)definedclasspersonscala>valcaseClassDS=Seq(person(1,"zhangsan",23)).toDS()caseClassDS:org.apa......
  • Spark(九)SparkSQL DataFrame
    DataFrameSparkSQL的DataFrameAPI允许我们使用DataFrame而不用必须去注册临时表或者生成SQL表达式,DataFrameAPI既有transformation操作也有action操作1.创建DataFrame从Spark数据源进行创建启动SparkShell[user@hadoop102spark-yarn]$bin/spark-shell查看Spark......
  • SparkSQL练习:对学生选课成绩进行分析计算
    题目内容:对学生选课成绩进行分析计算题目要求:(1)该系总共有多少学生;(2)该系共开设来多少门课程;(3)每个学生的总成绩多少;(4)每门课程选修的同学人数;(5)每位同学选修的课程门数;(6)该系DataBase课程共有多少人选修;(7)每位同学平均成绩;数据预览:每行数据包括以下三部分内容:学生姓名,所学......
  • SparkSQL日期时间模式详解
    datatime使用场景CSV/JSON数据源使用模式字符串来解析和格式化日期时间内容。日期时间函数用于转换StringType类型到DateType或TimestampType类型,反之亦然。例如,unix_timestamp,date_format,to_unix_timestamp,from_unixtime,to_date,to_timestamp,from_utc_timestam......
  • SparkSQL数值模式详解
    简介函数如to_number和to_char确实支持在字符串类型和十进制(数值)类型之间进行转换。这些函数接受格式字符串作为参数,这些格式字符串指示了如何在这两种类型之间映射。to_number:这个函数通常用于将字符串转换成数值类型。你需要提供一个格式字符串来指定如何解释字符串......