首页 > 数据库 >sparksql概念补充

sparksql概念补充

时间:2022-08-26 18:56:54浏览次数:52  
标签:shuffle name 补充 age 概念 sparksql sql spark 内存

Spark-sql概念补充

基本概念
        SparkSQL是基于RDD的,可以通过Schema信息来访问其中某个字段
        RDD处理的不是结构化数据,所以不能进行类似HIve逻辑优化器的优化操作(条件传播)
        SparkSQL默认读取的类型都是 DataFrame 
Catalyst优化器
        1.解析SQL,并解析成AST(抽象语法树)
        2.在树里面加入元数据信息
            scan(peolple) scan(score)  =>  join  =>  filter  =>  peoject  => Aggregate(sum(v))
            id#1#l  名称 列下标 数据类型
        3.优化器 优化(hive 优化)   最后经过成本优化
            val qe = data.queryExecution
            println(qe)
            == Parsed Logical Plan == //解析逻辑执行计划
            'Project ['gaid, unresolvedalias('from_unixtime('submit_at, yyyyMMdd), None)]
            +- 'UnresolvedRelation `data`

            == Analyzed Logical Plan ==//分析逻辑计划
            gaid: string, from_unixtime(CAST(submit_at AS BIGINT), yyyyMMdd): string
            Project [gaid#10, from_unixtime(cast(submit_at#11 as bigint), yyyyMMdd, Some(Asia/Shanghai)) AS from_unixtime(CAST(submit_at AS BIGINT), yyyyMMdd)#14]
            +- SubqueryAlias `data`
            +- Relation[gaid#10,submit_at#11] csv

            == Optimized Logical Plan ==//优化逻辑计划
            Project [gaid#10, from_unixtime(cast(submit_at#11 as bigint), yyyyMMdd, Some(Asia/Shanghai)) AS from_unixtime(CAST(submit_at AS BIGINT), yyyyMMdd)#14]
            +- Relation[gaid#10,submit_at#11] csv

            data.explain()
            == Physical Plan ==//物理执行计划
            *(1) Project [gaid#10, from_unixtime(cast(dt#11 as bigint), yyyyMMdd, Some(Asia/Shanghai)) AS from_unixtime(CAST(dt AS BIGINT), yyyyMMdd)#27]
            +- *(1) FileScan csv [gaid#10,dt#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/data/aa/data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<gaid:string,dt:string>
DataSet(结构化数据 自定义的数据类) 转成DataSet:.toDS()
        特点:
            强类型API  自定义数据类  
            若类型API  可以直接访问类的属性 当做字符串
            可以直接在类中使用SQL( 类的属性 < 10) 当做一列
        DataSet[] 支持优化器优化
            执行计划中都是RDD[InternalRow] 数据结构  
            将数据结构保存下来,在集群中运行
            RDD[InternalRow] = dataset.queryExecution.tordd  //获取的是已经分析和解析过的 dataset 的执行计划中拿到的
            RDD[Persion] = dataset.rdd //将底层的RDD[InternalRow] 通过Decoder转成了和 dataset 一样的RDD
DataFrame(固定类型的列值,类似关系型数据库,行和列 Row +Schema )
        创建方式:1.toDF() 底层DatasetHolder  2.createDataFrame  3.read
DataSet DataFrame区别
        1.DataFrame就是DataSet      DataFrame = Dataset[ROW]   DataSet可以是任意的类型(自定义类)
        2.DataFrame弱类型(ROW)   DataSet强类型 
            df.map((row:Row)=>Row(row.get(0),row.get(1)))(RowEncoder.apply(df.schema)) //df输入是ROW输出也是ROW
            ds.map((persion:Persion)=>Persion(name,age)) //是具体的类型
        3.DataFrame:编译时候不安全    DataSet:编译时候安全的
    Row对象的操作
        1.Row必须配合Schema对象才有列名  val row = Row("aa",1)  row.getString(1)  支持样例类
DataFrameReader DataFrameWriter
        DataFrameReader  
            option:delimiter 分隔符,默认为逗号|nullValue 指定一个字符串代表 null 值|quote 引号字符,默认为双引号|
            header 第一行不作为数据内容,作为标题|inferSchema 自动推测字段类型|gnoreLeadingWhiteSpace 裁剪前面的空格|
            ignoreTrailingWhiteSpace 裁剪后面的空格|nullValue 空值设置,如果不想用任何符号作为空值,可以赋值null即可|multiline  运行多列,超过62 columns时使用|
            encoding   指定編码,如:gbk  / utf-8  Unicode  GB2312|
        DataFrameWriter
            mode "error":目标存在报错  "append":添加  "overwrite":覆盖  "ignore":存在就不报错
            partitionBy分区字段   bucketBy桶表字段  sortBy排序子段
        json toJSON 将字符串转成json   
Spark 数据操作
        有类型转换算子
            flatMap() map() mapPartitions(iter=>{  }) 
            transform(dataset => dataset.withColumn("double","abcd")) //将一个dataset转换成另一个dataset
            as[Student]  //DataFrame 转换成 DataSet 
            filter(col("a")>10).show(10,false);//过滤条件
            groupByKey(perison=>perison.name) //需要把一列转换成key  转换以后才能进行agg cogroup reduce count
            randomSplit()  sample()
            ds.orderBy('false'.desc desc_nulls_first desc_nulls_last )   sd.sort('age'.asc)//排序
            distinct()//去除所有重复列  ds.dropDuplicates('')//按照某一列去重
            except() //除了   intersect//交集   union//并集
        无类型转换算子
            select //查询结果  可以在任何位置   selectExpr(sum(age)) //表达式格式
                import org.apache.spark.sql.functions._ select(expr("sum(age)"))
            withColumn("double","abcd")  withColumn("double","abcd"===NULL)//新建列    withColumnRenamed('name','name_new')//重命名列
            drop("")//删除某列    
            groupBy() //groupByKey生成的算子是有类型的   生成的算子是无类型的
        Column对象
            as ds.select('name' as 'new_name')//别名  as ds.select('name'.as[Int])//类型转换
            ds.withColumn('double','age'*(2))   //新增列    ds.where('aa' like "zhang%")//模糊查询
            ds.sort("age".desc).show()//排序   ds.where("name" isin("zhangsan","lisi") )//枚举
        缺失值
            类型:1.null、NaN等特殊类型的值  2."Null","NA"," "等字符串缺失值
            处理框架:DataFrameNaFuncions   .na.drop     .na.fill(0,List("name","age"))
            处理:执行schema类型处理空置,将缺失值转换对对应对象下的NaN
                select(when("PM" === "NA",Double.Nan).otherwise('PM' cast DoubleType) ).as("PM")
                .na.replace("NA",Map("NA" -> "NaN"))//原始类型必须和处理后的类型一致
        聚合操作 (返回的类型都是 RelationalGroupedDataset)
            ds.groupBy("name","age").agg(avg("pm") as "pmavg")
            ds.avg("pm").select(col("avg(pm)") as "pm_avg")
            rollup("name","age").agg(sum("amount") as "amount") //第一列必须存在group by name,age/name/null 
            cube("name","age").agg(sum("amount") as "amount")   //全排列  group by name,age/name/age/null

            grouping sets sql格式:"group by name,age grouping sets ((name,age),name,age,())"
            cube sql格式:"group by cube(name,age)"
            rollup sql格式:"group by rollup(name,age)"
            RelationalGroupedDataset
        连接操作
            data.join(data1,data.col("id")===data1.col("id"), )
            joinType:"inner,cross,outer,full,full_outer,left,left_outer,right,right_outer,left_semi,left_anti"
            cross:笛卡尔积  inner:交叉连接  full full_outer:全外连接  left_semi:只显示左边连接上的   left_anti:只显示左侧关联不上的
        udf
            val toStrUdf = udf(aa _)   .select(toStrUdf("name"))
        窗口函数
            val window = Window.partitionBy("name").orderBy("age")
            .select(dense_rank() over window as "rank").where("rank"<2)
            sql格式:("dense_rank() over(partation by name order by age) as 'rank' ")
            ROWS between 2000 preceding and 1000 following //前一天数据和有一条数据
            rank() 相同排名 名次相同,后面+2
            dense_rank() 相同排名 名次相同,后面+1
            row_number()  first_value() last_value()  lag()前多少条  lead()后多少条
数据倾斜处理
        1.etl处理  2.并行度  
        3.过滤倾斜key(需要null,聚合需求(随机分区预聚合,最终hash散列聚合) 
            拆分(单独拿出来处理,最终union起来)) 
        4.reduce join 转mapjoin  5.两阶段聚合  
        6.无前缀聚合(随机前缀:大表加随机前缀 小表*3)自定义Partation
Spark Shuffle 介绍
        mapreduceShuffle当中:
            1.如果有reduce阶段就一定会进行shuffle
            2.如果有shuffle,那就一定会按照key排序
        spark优化:
            1.不用排序的shuffle   2.需要排序的shuffle 

        hashShuffleManager:生成为解决文件数=executor*hash
        优化的hashShuffleManager:
            Consolidate机制: 复用 ShuffleBlockFile 文件通过索引标记偏移量  1.NIO方式,通过Socket读取数据 2.Netty去获取数据
        SortShuffleManager:(map端聚合就排序,map端不聚合尽量不排序)
            map(聚合操作) array(join算子)      内存排序分批10000条写入磁盘   会产生多个数据文件
                最终将多个文件进行合并  一个数据文件  一个索引文件
        SortShuffleManager-bypass:
            上同  但是不会进行排序操作 会进行文件的合并 触发机制:spark.shuffle.sort.bypassMergeThreshold 不是聚合类算子
Spark性能调优非参数
        尽可能用一个,缓存,避免重复(基于内存的文件系统tachyon)
        存储内存、执行内存
        避免使用shuffle算子  (map join)
        map预聚合算子(reduceByKey/aggregateByKey替代groupByKey)  rdd

        mapPartitions替代普通map
        foreachPartitions替代foreach
        filter之后进行coalesce
        repartitionAndSortWithinPartitions替代repartition与sort
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //需要用户自定义序列化函数 默认不开启
        优化数据结构:尽量使用简单类型。使用fastutil  堆外内存(负责类型数据)
Spark性能调优参数
        yarn Container内存分配:
            yarn.nodemanager.resource.memory-mb=64G    //nodemanager分配给Container的资源内存大小
            yarn.nodemanager.resource.cpu-vcores=16    //nodemanager分配给Container的资源cpu核数
            yarn.scheduler.maximum-allocation-mb=64G   //yarn Container能够使用的最大资源
            yarn.scheduler.minimum-allocation-mb=512M  //yarn Container能够使用的最小资源

        spark 静态资源内存分配:
            spark.shuffle.memoryFraction    分配给shuffle read task进行聚合操作的内存比例,默认是20%
            spark.executor.memory(Exexution&Storage 计算存储)   spark.executor.memoryOverhead(JVM的额外开销)
            spark.executor.memory(10) : spark.executor.memoryOverhead(1)

        spark 动态资源内存分配:
            spark.dynamicAllocation.enabled=true   //动态分配资源
            spark.shuffle.service.enabled=true     //Executors完成以后资源就关闭了 需要这个服务保存文件
            spark.dynamicAllocation.initialExecutors=true     //初始化Executors 数量
            spark.dynamicAllocation.minExecutors=true     //最小 Executors 数量
            spark.dynamicAllocation.maxExecutors=true     //最大 Executors 数量
            spark.dynamicAllocation.executorIdleTimeout=60s //Task执行完成等待这个时间就关掉
            spark.dynamicAllocation.schedulerBacklogTimeout  1s task等待时长

        数据拉取 缓冲区:
            spark.shuffle.file.buffer 32K  shuffle write 缓冲区大小
            spark.reducer.maxSizeInFlight  shuffle read 拉取数据大小
            spark.shuffle.io.maxRetries     拉取最大次数
            spark.shuffle.io.retryWait      拉取间隔时间
            spark.shuffle.memoryFraction    分配给shuffle read task进行聚合操作的内存比例
            //read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数 不需要map预聚合  自动开启bypass
            spark.shuffle.sort.bypassMergeThreshold   如果shuffle read task 小于200 则不会进行排序
            spark.shuffle.consolidateFiles            开启优化后的hashShuffleManager
            PROCESS_LOCAL(数据本地)、NODE_LOCAL(节点)、RACK_LOCAL(机架)、ANY
            spark.locality.wait(6,10)
            spark.locality.wait.peocess(60)
            spark.locality.wait.node(30)
            spark.locality.wait.rack(20)

        广播大变量(containner内存大小)
        执行计划:
            explain 物理执行计划 | explain(mode=extended) 逻辑和物理执行计划 | 
            explain (mode=codegen) 可执行java代码  |   explain (mode=formatted) 格式化物理执行计划

        内存计算:
            spark.mamory.fraction = storage+Execution/(storage+Execution+Other)
            spark.mamory.storageFraction = storage/storage+Execution
            storage堆内内存=(spark.executor.memory-300M)*spark.mamory.fraction*spark.mamory.storageFraction
            Execution内存 = spark.executor.memory-300M - storage堆内内存

        分区个数:
            spark.default.parallelism=200  rdd使用的
            spark.sql.shuffle.partitions=200 sparksql使用

        CBO 执行成本:数据本身的特点,操作的特点
            ANALYZE TABLE data COMPUTE STATISTICS /desc table data;//表信息统计 
            ANALYZE TABLE data COMPUTE STATISTICS FOR COLUMNS a,b,c / desc table data a;//列信息统计
            spark.sql.statistics.rawDataSize 解压到内存可能需要的空间  spark.sql.statistics.totalSize HDFS文件大小
            spark.sql.statistics.numRows 行数       //按照某一列进行存储
            spark.sql.cbo.enabled=true 
            spark.sql.cbo.joinReorder.enabled=true     //自动调整 inner join顺序
            spark.sql.cbo.joinReorder.dp.threshold=12   //多少表才进行调整,超过不调整

        桶表相关:
            sore merge bucket:两表分桶,桶的个数相同。join列=排序列=分桶列

        输出文件合并:
            spark.sql.files.maxParttionBytes=128M    //
            spark.files.openCostinByte=4M 文件开销不能太大  
Spark 3.X 新特性
        AQE:基于动态的统计信息,动态修改执行计划。
            1.Shuffle分区数量  2.join策略  3.动态优化倾斜join
            spark.sql.adaptive.enabled=true AQE是否开启
            spark.sql.adaptive.coalescePartitions.enabled=true 动态分区是否开启
            spark.sql.adaptive.coalescePartitions.initialPartitionNum=100 初始化并行度
            spark.sql.adaptive.coalescePartitions.minPartitionNum=10    //最小分区数
            spark.sql.adaptive.advisoryPartitionSizeInBytes=256M 合并后分区大小

            spark.sql.autoBroadcastJoinThreshold=10M broadcast的数据大小
            spark.sql.adaptive.skewJoin.enabled=true 是否开启数据倾斜

            spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 数据倾斜判断因子
            spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256M 数据倾斜判断大小 (>中位数大小 * 因子 & >当前值)

        动态分区裁剪:先扫描小表,在进行大表的分区裁剪
            spark.sql.optimizer.dynamicPartitionPruning.enabled=true 动态分区裁剪
        加速器感知调度(GPU 加速)  PostgreSQL(数据库)  

标签:shuffle,name,补充,age,概念,sparksql,sql,spark,内存
From: https://www.cnblogs.com/wuxiaolong4/p/16628841.html

相关文章

  • Spark-core概念补充
    Spark-core概念补充Spark相对于hadoop的优势        1.减少IO(如非必要不落盘)        2.并行度(多线程)        3.中间结果持久化(多次使用)......
  • 创建SparkSQL的项目
    创建项目方式和前面一样pom依赖不一样无需导入spark_core包,因为spark_sql中包含了spark_corepom.xml文件<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="h......
  • SparkSQL支持的数据源
    1.SparkSQL支持的数据源HiveScala内存中数据--集合支持从RDD读取数据作SQL操作支持从外部存储文件读取数据json,csv,普通结构文本文件支持从关系型数据库读取数据处理......
  • 第一章 模式识别的相关概念学习笔记
    1  相关概念1.1 什么是模式?可以被区分是否相似,存在于时间和空间中可观察的物体之中的信息。(模式不是事务本体,是从事物中获取的信息)1.2 模式的直观特性可观察性......
  • JavaScript基础回顾知识点记录7-事件补充说明2
    js中鼠标滚轮事件offsetWidth/offsetHeight-对象的可见宽度/高度clientWidth/clientHeight-内容的可见宽度/高度scrollWidth/scrollHeight......
  • 反射案例以及注解的概念
    反射案例需求:写一个"框架",可以帮我们创建任意类的对象,并且执行其中的任意方法实现:实现文件反射,步骤:将需要创建对象的全类名和需要......
  • K8s - Kubernetes重要概念介绍(Cluster、Master、Node、Pod、Controller、Service、Nam
    Kubernetes 是目前发展最快、市场占有率最高的容器编排引擎产品,并且还在快速地开发和迭代之中。我们在学习 Kubernetes 之前,需要理解它的几个重要概念,它们是组成 Kuber......
  • vue3 基础-应用app和组件基本概念
    这篇主要对vue最基础的应用程序Application和组件Components进行一个简要和直观的认知,具体要分析的代码如下:<!DOCTYPEhtml><htmllang="en"><head><metac......
  • 有关整型数据类型的基本概念梳理
    我们都知道C语言中拥有非常丰富的整型数据类型,如字符型char,短整型short,整型int......。它们每个所能表示的整数大小是不一样的,而其中也有一些复杂的转换关系,那么今天这......
  • maven依赖管理的概念、仓库的种类和彼此关系
    maven依赖管理的概念图解: 仓库的种类和彼此关系 ......