首页 > 其他分享 >spark中的聚合操作和分组操作

spark中的聚合操作和分组操作

时间:2023-02-13 22:45:19浏览次数:49  
标签:count 聚合 df 分组 override 操作 spark def

聚合操作

注意:任何的聚合操作都有默认的分组,聚合是在分组的基础上进行的。比如,对整体进行求和,那么分组就是整体。所以,在做聚合操作之前,一定要明确是在哪个分组上进行聚合操作
注意:聚合操作,本质上是一个多对一(一对一是多对一的特殊情况)的操作。特别注意的是这个’一‘,可以是一个值(mean, sum等),同样也可以是一个对象(list, set等对象)

聚合函数

除了DataFrame的某些操作或者通过.stat访问方法,所有的聚合操作都是以函数的方式出现的。大多数聚合函数可以在org.apache.spark.sql.functions中找到

  • count函数
    使用的方向:
    • 对指定列进行计数
    • 使用count(*)或者count(1)对所有列进行计数
  • countDistinct(统计不同的值得数量)
  • approx_count_distinct
    对统计的精度要求不高使用它,注意:approx_count_distinct带了另一个参数,该参数指定可容忍的最大误差。本例中我们指定了一个相当大的误差率,因此得到的答案与正确值差距很大,但执行速度更快,比countDistinct函数执行耗时更少。档处理更大的数据集的时候,这种提升会更加明显。

聚合输出复杂类型

spark的聚合还可以将某列上的数值聚合到一个list中,或者将唯一值聚合到set集合中。
案例:将国家列直接生成list列和set列

    val path="/Volumes/Data/BigData_code/data/retail-data/all/*.csv"
    //读取数据
    val df = spark.read.format("csv").option("header", "true").option("inferSchema", "true")
      .load(path).coalesce(5)
    df.cache()
    df.createOrReplaceTempView("dfTable")
    df.show()
    //将Country聚合成set列和list列
    df.agg(collect_set("Country").as("CountrySet"), collect_list("Country").as("CountryList")).show()

分组操作

  • 使用表达式分组
  • 使用Map进行分组
      //使用表达式分组
      df.groupBy("InvoiceNo").agg(
        count("Quantity").as("quan"),     //使用函数方式
        expr("count(Quantity)")     //使用字符串表达式
      ).show()
      //使用Map进行分组
      df.groupBy("InvoiceNo").agg("Quantity"->"count", "Quantity"->"stddev_pop").show()
    
  • window函数
    window函数的使用,请看这篇博客:https://blog.csdn.net/weixin_38653290/article/details/83962789

分组集---(挖个坑)P133

用户自定义的聚合函数

使用UDAF来计算输入数据组(与单行相对)的自定义计算。
若要创建UDAF,必须继承UserDefinedAggregateFunction基类并实现以下方法:

  • inputSchema用于指定输入参数,输入参数类型为StructType
  • bufferSchema用于指定UDAF中间结果,中间结果类型为StructType。
  • dataType用于指定返回结果,返回结果的类型为DataType。
  • deterministic是一个布尔值,它指定此UDAF对于某个输入是否会返回相同的结果。
  • initialize初始化聚合缓冲区的初始值
  • update描述应如何根据给定行更新内部缓冲区。
  • merge描述应如何合并两个缓冲区
  • evaluate将生成聚合最终结果
    例子:实现自定义聚合函数BoolAnd,它将返回所有行是否为true
class BoolAnd extends UserDefinedAggregateFunction{
  //指定输入参数
  override def inputSchema: StructType = StructType(
    StructField("Value", BooleanType)::Nil
  )
  //用于指定UDAF中间结果,中间结果使用StructType
  override def bufferSchema: StructType = StructType(
    StructField("value", BooleanType)::Nil
  )
  //用于指定返回结果,返回结果为DataType
  override def dataType: DataType = BooleanType
  //此UDAF对某个输入是否会返回相同的结果
  override def deterministic: Boolean = true
  //初始化聚合缓冲区的初始值
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0)=true
  }
  //描述如何根据给定行更新内部缓冲区
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0)=buffer.getAs[Boolean](0)&&input.getAs[Boolean](0)
  }
  //描述如何聚合两个内部缓冲区
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0)=buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
  }
  //生成聚合的最终结果
  override def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

实例化BoolAnd类,并将其注册为一个函数:

    //准备数据
    val df = spark.range(1).selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
      .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
    df.show()
    //实例化类,注册为udaf
    val ba = new BoolAnd
    spark.udf.register("booland", ba)
    df.select(ba(col("t")), expr("booland(f)")).show()

标签:count,聚合,df,分组,override,操作,spark,def
From: https://www.cnblogs.com/ALINGMAOMAO/p/17118122.html

相关文章

  • spark读写文件
    valpath="/Volumes/Data/BigData_code/data/retail-data/by-day/2010-12-01.csv"spark.read.format("csv").option("header","true").option("inferSchema","t......
  • spark的DataFrame的schema模式:读时模式, 指定模式
    读时模式valpath="/Volumes/Data/BigData_code/data/"//读取json生成dataframevaldf=spark.read.format("json").load(path+"flight-data/json/2015-......
  • spark DataFrame聚合操作
    在聚合操作中,需要指定键或分组方式,以及指定如何转换一列或多列数据的聚合函数。s除了处理任意类型的值之外,Spark还可以创建以下分组类型:最简单的分组通过在select语句......
  • python pandas库总结-数据分析和操作工具
    参考:https://pandas.pydata.org/Input/output相关函数pandas.read_excel—将Excel文件读入pandas数据框支持读取xls,xlsx,xlsm,xlsb,odf,ods和odt文件扩展名,支持单......
  • STM32CubeMX操作
    1.更改库安装路径    2.下载库  这里下载F1 ......
  • 9.4 操作系统和高级编程语言使硬件抽象化
    通过使用操作系统提供的系统调用,程序员就没必要编写直接控制硬件的程序了。通过使用高级编程语言,有时甚至也无需考虑系统调用的存在。这是因为操作系统和高级编程语言能够......
  • 9.2 要意识到操作系统的存在
    制作应用的程序员们意识到一点:制作的不是硬件,而是利用操作系统功能的应用。代码清单9-1表示的是,在Windows操作系统下,用C语言制作一个具有表示当前时间功能的应用。time......
  • 9.1 操作系统功能的历史
    操作系统的原型:仅具有加载和运行功能的监控程序。通过事先启动监控程序,程序员就可以根据需要的各种程序加载到内存中运行。如下图: 在利用监控程序编写程序的过程中,发......
  • 9.5Windows操作系统的特征
    Windows操作系统的主要特征如下所示。   (1)32位操作系统(也有64位版本)    (2)通过API函数集来提供系统调用    (3)提供采用了图形用户界面的用户界面 ......
  • 9.4操作系统和高级编程语言使硬件抽象化
       通过使用操作系统提供的系统调用,程序员就没必要编写直接控 制硬件的程序了。而且,通过使用高级编程语言,有时甚至也无需考虑系统调用的存在。这是因为操作系统和高......