首页 > 其他分享 >自定义聚合函数(统计每一个商品的四种行为出现次数)

自定义聚合函数(统计每一个商品的四种行为出现次数)

时间:2022-09-05 21:34:32浏览次数:70  
标签:map 聚合 自定义 val session 四种 override DataTypes def

要求:统计每一个商品的四种行为出现次数

案例

package SparkSQL.fun.project

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * 统计每一个商品的四种行为出现次数,
 *  效果:每种商品如果某个行为不存在,那么用0来表示,最后返回如下结果
 *
 *  自定义聚合函数完成--累加类型的聚合函数
 *    1、输入的参数是behavior
 *    2、输出的是一个Map
 *    goodsId,total_times
 *    |72     |[pv -> 2, buy -> 0, cart -> 0, fav -> 0] |
 *    |81     |[pv -> 13, buy -> 1, cart -> 1, fav -> 0]|
 */
object BehaviorCode1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("project01").setMaster("local[*]")
    val session = SparkSession.builder().config(sparkConf).getOrCreate()
    val map = Map("mode" -> "dropMalformed", "inferSchema" -> "true")

    val frame = session.read.options(map).csv("G:\\shixunworkspace\\sparkcode\\src\\main\\java\\SparkSQL\\fun\\project\\b.csv")
    val frame1 = frame.toDF("userId", "goodsId", "categoryId", "behavior", "time")
    frame1.show()

    import session.implicits._
    // 将DataFrame转换成Dataset,一般Dataset中类型是Bean类型
    val dataset: Dataset[UserBehaviorBean] = frame1.map((row) => {
      UserBehaviorBean(row.getAs[Int](0),
        row.getInt(1),
        row.getInt(2),
        row.getString(3),
        row.getInt(4)
      )
    })
    dataset.createTempView("temp")

    session.udf.register("time", new BehaviorTimesFun)
    // 当前sql语句的问题:如果某个商品没有某个行为的话,不会记录
    val frame2 = session.sql("select goodsId, time(behavior) count from temp group by goodsId")
    frame2.show(100, false)

    session.stop()
  }
}

class BehaviorTimesFun extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = {
    StructType(Array(
      StructField("input", DataTypes.StringType)
    ))
  }

  override def bufferSchema: StructType = {
    StructType(Array(
      StructField("sum",
        DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType))
    ))
  }

  override def dataType: DataType = {
    DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType)
  }

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map("pv"->0L, "buy"->0L, "cart"->0L, "fav"->0L)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val str: String = input.getString(0) // str为行为,如pv, fav, 等
    val map = buffer.getMap[String, Long](0)
    val map1 = map.updated(str, map.getOrElse(str, 0L) + 1L)
    buffer(0) = map1
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val map = buffer2.getMap[String, Long](0)
    for (elem <- map) {
      val map1 = buffer1.getMap[String, Long](0)
      val map2 = map1.updated(elem._1, map1.getOrElse(elem._1, 0L) + elem._2)
      buffer1(0) = map2
    }
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getMap[String, Long](0)
  }
}

标签:map,聚合,自定义,val,session,四种,override,DataTypes,def
From: https://www.cnblogs.com/jsqup/p/16659668.html

相关文章

  • 自定义聚合函数(统计每种行为的触发次数排名前三的商品id)
    packageSparkSQL.fun.projectimportorg.apache.spark.SparkConfimportorg.apache.spark.sql.expressions.{MutableAggregationBuffer,UserDefinedAggregateFunction......
  • 八、Spring Boot 实现一个自定义start
    http://springboot.javaboy.org/2019/0520/springboot-starterstarter的作用用来做依赖导入、自动配置Starter的核心就是条件注解@Conditional当classpath下存在......
  • 八、Spring Boot 中自定义 SpringMVC 配置
    转发:https://www.javaboy.org/2019/0816/spring-boot-springmvc.html先说结论,使用Java8的,自定义配置使用实现WebMvcConfigurer接口,Java8之前使用WebMvcConfigurerAdapte......
  • 【WPF】自定义用户控件 代有字数限制的输入框
    最终效果  用户控件WPF用户控件继承UserControl类,其行为与WPF窗口非常相似:有一个XAML文件和一个代码后置文件。在XAML文件中,您可以添加现有的WPF控件以创建所需的外......
  • 四种内存泄漏方法总结和比较
     关注:QStockView,获取股票智能分析报警软件目录1     简介...12     检测内存泄漏的方法...12.1    VS2015性能探测器...12.2    VLD嵌入......
  • Flask 学习-47.Flask-RESTX 自定义响应内容marshal_with
    前言Flask-RESTX提供了一种简单的方法来控制您在响应中实际呈现的数据或期望作为输入有效负载的数据。使用该fields模块,您可以在资源中使用所需的任何对象(ORM模型/自定......
  • springboot聚合项目搭建
    springboot聚合项目搭建1、简介1.1、什么是聚合项目?一个项目中包含多个子项目的项目。结构:|-父模块---|子模块1---|子模块2---|子模块31.2、聚合项目有什么......
  • 自定义反应钩子。它们是什么以及它们有什么帮助?
    自定义反应钩子。它们是什么以及它们有什么帮助?Photoby劳塔罗·安德里亚尼on不飞溅介绍ReactHooks是在React.jsv16.8中引入的。它们允许我们在功能组件中使......
  • Java自定义注解
    简介注解是一种能被添加到java源代码中的元数据,方法、类、参数和包都可以用注解来修饰。注解可以看作是一种特殊的标记,可以用在方法、类、参数和包上,程序在编译或者运行时......
  • 【WPF】自定义PassWordBox (可以绑定的) 、SecureString类型吗?
    目的自定义一个可以绑定的密码输入框知识点:自定义控件、  SecureString类型System.Security.SecureString(表示应保密的文本)保存非托管内存中,需要用指针逐个字......