要求:统计每一个商品的四种行为出现次数
案例
package SparkSQL.fun.project
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/**
* 统计每一个商品的四种行为出现次数,
* 效果:每种商品如果某个行为不存在,那么用0来表示,最后返回如下结果
*
* 自定义聚合函数完成--累加类型的聚合函数
* 1、输入的参数是behavior
* 2、输出的是一个Map
* goodsId,total_times
* |72 |[pv -> 2, buy -> 0, cart -> 0, fav -> 0] |
* |81 |[pv -> 13, buy -> 1, cart -> 1, fav -> 0]|
*/
object BehaviorCode1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("project01").setMaster("local[*]")
val session = SparkSession.builder().config(sparkConf).getOrCreate()
val map = Map("mode" -> "dropMalformed", "inferSchema" -> "true")
val frame = session.read.options(map).csv("G:\\shixunworkspace\\sparkcode\\src\\main\\java\\SparkSQL\\fun\\project\\b.csv")
val frame1 = frame.toDF("userId", "goodsId", "categoryId", "behavior", "time")
frame1.show()
import session.implicits._
// 将DataFrame转换成Dataset,一般Dataset中类型是Bean类型
val dataset: Dataset[UserBehaviorBean] = frame1.map((row) => {
UserBehaviorBean(row.getAs[Int](0),
row.getInt(1),
row.getInt(2),
row.getString(3),
row.getInt(4)
)
})
dataset.createTempView("temp")
session.udf.register("time", new BehaviorTimesFun)
// 当前sql语句的问题:如果某个商品没有某个行为的话,不会记录
val frame2 = session.sql("select goodsId, time(behavior) count from temp group by goodsId")
frame2.show(100, false)
session.stop()
}
}
class BehaviorTimesFun extends UserDefinedAggregateFunction {
override def inputSchema: StructType = {
StructType(Array(
StructField("input", DataTypes.StringType)
))
}
override def bufferSchema: StructType = {
StructType(Array(
StructField("sum",
DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType))
))
}
override def dataType: DataType = {
DataTypes.createMapType(DataTypes.StringType, DataTypes.LongType)
}
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = Map("pv"->0L, "buy"->0L, "cart"->0L, "fav"->0L)
}
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
val str: String = input.getString(0) // str为行为,如pv, fav, 等
val map = buffer.getMap[String, Long](0)
val map1 = map.updated(str, map.getOrElse(str, 0L) + 1L)
buffer(0) = map1
}
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val map = buffer2.getMap[String, Long](0)
for (elem <- map) {
val map1 = buffer1.getMap[String, Long](0)
val map2 = map1.updated(elem._1, map1.getOrElse(elem._1, 0L) + elem._2)
buffer1(0) = map2
}
}
override def evaluate(buffer: Row): Any = {
buffer.getMap[String, Long](0)
}
}
标签:map,聚合,自定义,val,session,四种,override,DataTypes,def
From: https://www.cnblogs.com/jsqup/p/16659668.html