一、概述
1.1、什么是sparksql
Spark SQL 是 Spark 中用于处理结构化数据的模块,它提供了两个主要的编程抽象:DataFrame 和 DataSet,并且还可以作为分布式 SQL 查询引擎使用。Spark SQL 的设计目的是简化结构化数据的处理和查询,同时提高执行效率。
传统的 Hive SQL 通过将 SQL 查询转换为 MapReduce 任务来执行,虽然简化了开发者的编程工作,但由于 MapReduce 的计算模型较为低效,导致执行速度较慢。为了解决这个问题,Spark SQL 应运而生。Spark SQL 将 SQL 查询转换为 RDD(弹性分布式数据集)操作,然后提交到集群执行。由于 RDD 的设计和优化,Spark SQL 的执行效率远高于传统的 MapReduce 模型,从而大大提升了数据处理和查询的速度。
1.2、sparksql的特点
1.2.1高性能:
Spark SQL 利用了列式存储和压缩技术,结合查询计划的优化和代码生成等技术,能够在处理大规模数据时提供高效的查询和分析能力。
1.2.2多数据源支持:
Spark SQL 支持多种数据源,包括关系型数据库、Hive、Parquet、Avro、JSON 等。它提供了一致的 API 和查询语言,使用户能够以统一的方式处理不同类型的 数据。
1.2.3SQL 支持:
Spark SQL 提供了全面的 SQL 支持,用户可以使用标准的 SQL 语法进行查询、过滤和聚合等操作。这使得用户可以利用已有的 SQL 知识和经验来处理数据。
1.2.4DataFrame 和 DataSet 抽象:
Spark SQL 引入了 DataFrame 和 DataSet 两个抽象概念,用于表示结构化数据。DataFrame 类似于关系型数据库中的表,支持查询、过滤和聚合操作。而 DataSet 是 DataFrame 的类型安全版本,通过编译时检查来避免常见的错误。
1.2.5内置函数和 UDF 支持:
Spark SQL 提供了丰富的内置函数,用于数据转换、日期处理、字符串操作等。同时,它还支持用户自定义函数(UDF)和用户自定义聚合函数(UDAF),使用户能够根据自己的需求扩展功能。
1.2.6扩展性:
Spark SQL 提供了许多可扩展的接口和机制,使用户能够根据自己的需求来扩展和定制功能。用户可以自定义数据源、自定义优化规则、自定义函数等。
1.2.7统一的编程模型:
Spark SQL 与 Spark 的其他模块(如 Spark Streaming 和 MLlib)紧密集成,使用户能够使用统一的编程模型来处理不同类型的数据,从而简化开发和维护工作。
总之,Spark SQL 具有高性能、多数据源支持、SQL 和 DataFrame/DataSet 抽象、可扩展性和统一的编程模型等特点,使用户能够以统一的方式高效处理大规模的结构化数据。
1.3、RDD
概念:
RDD 是 Spark 最基础的数据抽象,代表一个不可变的、分区的、容错的并行数据集合。RDD 可以从外部存储系统(如 HDFS、HBase、本地文件系统等)加载,也可以从其他 RDD 转换而来。
特点:
低级 API:RDD 提供了丰富的低级操作,如 map、filter、reduce、join 等。
强类型:RDD 是强类型的,但需要手动进行类型转换。
优化有限:RDD 没有内置的查询优化器,需要手动进行优化。
灵活:RDD 提供了最大的灵活性,适合复杂的自定义操作
1.4、DataFrame
概念:
DataFrame 是一种分布式的数据集,类似于关系型数据库中的表,带有结构化的信息。DataFrame 提供了丰富的 API,支持各种数据操作,如 select、filter、groupBy、join 等。
特点:
高级 API:DataFrame 提供了高级的 API,支持标准的 SQL 查询。
优化器:DataFrame 内置了 Catalyst 查询优化器,可以自动优化查询计划。
弱类型:DataFrame 是弱类型的,适用于处理结构化数据。
内存管理:DataFrame 利用了 Spark 的内存管理机制,可以在内存中高效地处理数据
1.5、DataSet
概念:
Dataset 是 DataFrame 的类型安全版本,提供了编译时类型检查。Dataset 结合了 RDD 的强类型和 DataFrame 的优化器。
特点:
强类型:Dataset 是强类型的,可以在编译时进行类型检查,避免运行时的类型错误。
优化器:Dataset 内置了 Catalyst 查询优化器,可以自动优化查询计划。
高级 API:Dataset 提供了高级的 API,支持标准的 SQL 查询和 DSL(领域特定语言)操作。
内存管理:Dataset 利用了 Spark 的内存管理机制,可以在内存中高效地处理数据。
二、sparksql的使用
使用pycharm书写sparksql代码,需要用到pyspark,若没有这个库,则需要下载
pip install pyspark==3.1.2 -i https://pypi.tuna.tsinghua.edu.cn/simple/
2.1、单词统计
但凡路径都要使用自己的,我这里使用的pyspark依托于Anaconda,所以路径会有所不同。
import os from pyspark.sql import SparkSession if __name__ == '__main__': # todo:0-设置系统环境变量 os.environ['JAVA_HOME'] = 'D:/Java/jdk' # 配置Hadoop的路径,就是前面解压的那个路径 os.environ['HADOOP_HOME'] = 'D:/hadoop/hadoop-3.3.1' # 配置base环境Python解析器的路径 os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径 os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' spark = SparkSession.builder.master("local[2]").appName("单词统计").config( "spark.sql.shuffle.partitions", 2).getOrCreate() df = spark.read.text("../../data/wordcount/word.txt") df.createOrReplaceTempView("words") #给读出来的临时数据表一个名字 spark.sql(""" with t as ( select word from words lateral view explode(split(value," ")) as word where trim(word) != ' ') select word,count(1) num from t group by word order by count(1) desc """).show() # .show()是为了让结果呈现在控制台,不加就看不到结果 spark.stop()
2.2、电影评分统计
数据如下:
通过百度网盘分享的文件:电影数据
链接:https://pan.baidu.com/s/1KxjtjE_ylwRTD3ccqeGpdw?pwd=1234
提取码:1234
需求:统计评分次数大于2000次的所有电影中平均评分最高的Top10,结果显示电影名称、电影平均评分、电影评分次数。
标签:DataFrame,查询,RDD,SparkSQL,SQL,Spark,os From: https://blog.csdn.net/xieyichun_/article/details/143605872import os from pyspark.sql import SparkSession """ ------------------------------------------ Description : TODO: SourceFile : 03-关于电影评分的案例 Author : 懒大王 Date : 2024/11/4 ------------------------------------------- """ if __name__ == '__main__': # todo:0-设置系统环境变量 os.environ['JAVA_HOME'] = 'D:/Java/jdk' # 配置Hadoop的路径,就是前面解压的那个路径 os.environ['HADOOP_HOME'] = 'D:/hadoop/hadoop-3.3.1' # 配置base环境Python解析器的路径 os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径 os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' with SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config( "spark.sql.shuffle.partitions", 2).getOrCreate() as spark: sc = spark.sparkContext df = sc.textFile("../../data/movies.dat").map(lambda line:(line.split("::")[0],line.split("::")[1],line.split("::")[2]))\ .toDF(["movie_id","movie_name","movie_type"]) df.createOrReplaceTempView("movies") df2 = sc.textFile("../../data/ratings.dat").map(lambda line:(line.split("::")[0],line.split("::")[1],line.split("::")[2],line.split("::")[3]))\ .toDF(["user_id","movie_id","mark","mark_time"]) df2.createOrReplaceTempView("ratings") spark.sql(""" select m.movie_name movie_name,avg(mark) avgMark,count(1) marknum from movies m join ratings r on m.movie_id =r.movie_id group by m.movie_name having marknum >2000 order by marknum desc """).show()