首页 > 编程语言 >Spark UDF/UDAF(JAVA)

Spark UDF/UDAF(JAVA)

时间:2022-12-24 16:47:16浏览次数:41  
标签:JAVA buffer apache UDF org spark import Spark public

UDF(User-Defined-Function)

UDF是用于处理一行数据的,接受一行输入产生一个输出,类似与map()算子,

UDAF(User- Defined Aggregation Funcation)

UDAF用于接收一组输入数据然后产生一个输出结果。
UDAF需要使用继承UserDefinedAggregateFunction的自定义类来实现功能,UserDefinedAggregateFunction中提供了8个抽象方法来帮助我们实现UDAF的构建。

public StructType inputSchema()
用于指定UDAF所输入数据的schmema的,也就是需要在这个方法类定义UDAF输入数据的字段的名称合字段的类型。

StructType bufferSchema()
因为UDAF是将数据进行聚合的,因此会使用到中间的临时变量进行数据存储,这个方法是用于定义这些中间的临时变量的Schema的。

DataType dataType()
这个方法是用于定义UDAF的返回结果的数据结构的。

boolean deterministic()
这个方法用于返回聚合函数是否是幂等的,即相同输入是否总是能得到相同输出。
为什么会有这个方法呢?这源于spark的推测执行(spark.speculation=true推测执行开启):推测执行是指对于Spark程序里面少部分运行慢的Task,会在其他节点的Executor上再次启动这个task,如果其中一个Task实例运行成功则将这个最先完成的Task的计算结果作为最终结果,同时会干掉其他Executor上运行的实例,从而加快运行速度。但是推测执行只有在函数是幂等的情况下才会这样运作,如果不是幂等的函数只会一直等待该Task执行。

void initialize(MutableAggregationBuffer buffer)
该方法用于初始化缓冲区的字段。

void update(MutableAggregationBuffer buffer, Row row)
该方法用于处理相同的executor间的数据合并,当有新的输入数据时,update用户更新缓存变量。

"void merge(MutableAggregationBuffer buffer, Row row)":
该方法用于不同excutor间已经进行初步聚合的数据进行合并。

"Object evaluate(Row row)":
通过前面的缓冲区完成聚合后,在这个方法里对聚合的字段进行最终的运算。

实例:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;


public class MyUDAF extends UserDefinedAggregateFunction {
    private StructType inputSchema;
    private StructType bufferSchema;

    public MyUDAF() {
        List<StructField> inputFields = new ArrayList<>();
        inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.DoubleType, true));
        inputSchema = DataTypes.createStructType(inputFields);

        List<StructField> bufferFields = new ArrayList<>();
        bufferFields.add(DataTypes.createStructField("sum", DataTypes.DoubleType, true));
        bufferFields.add(DataTypes.createStructField("count", DataTypes.DoubleType, true));
        bufferSchema = DataTypes.createStructType(bufferFields);
    }

    //1、该聚合函数的输入参数的数据类型
    public StructType inputSchema() {
        return inputSchema;
    }

    //2、聚合缓冲区中的数据类型.(有序性)
    public StructType bufferSchema() {
        return bufferSchema;
    }

    //3、返回值的数据类型
    public DataType dataType() {
        return DataTypes.DoubleType;
    }

    //4、这个函数是否总是在相同的输入上返回相同的输出,一般为true
    public boolean deterministic() {
        return true;
    }

    //5、初始化给定的聚合缓冲区,在索引值为0的sum=0;索引值为1的count=1;
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0, 0D);
        buffer.update(1, 0D);
    }

    //6、更新
    public void update(MutableAggregationBuffer buffer, Row input) {
        //如果input的索引值为0的值不为0
        if (!input.isNullAt(0)) {
            double updateSum = buffer.getDouble(0) + input.getDouble(0);
            double updateCount = buffer.getDouble(1) + 1;
            buffer.update(0, updateSum);
            buffer.update(1, updateCount);
        }
    }

    //7、合并两个聚合缓冲区,并将更新后的缓冲区值存储回“buffer1”
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        double mergeSum = buffer1.getDouble(0) + buffer2.getDouble(0);
        double mergeCount = buffer1.getDouble(1) + buffer2.getDouble(1);
        buffer1.update(0, mergeSum);
        buffer1.update(1, mergeCount);
    }

    //8、计算出最终结果
    public Double evaluate(Row buffer) {
        return buffer.getDouble(0) / buffer.getDouble(1);
    }
}

main函数:

import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

import java.math.BigDecimal;

public class UDAFJAVA {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("RunMyUDAF")
                .master("local")
                .getOrCreate();
        SparkContext sc = spark.sparkContext();
        sc.setLogLevel("ERROR");

        // Register the function to access it
        spark.udf().register("myAverage", new MyUDAF());

        Dataset<Row> df = spark.read().json("D:\\02Code\\0901\\sd_demo\\src\\data\\udaf.json");
        df.createOrReplaceTempView("employees");
        df.show();

        //保留两位小数,四舍五入
        spark.udf().register("twoDecimal", new UDF1<Double, Double>() {
            @Override
            public Double call(Double in) throws Exception {
                BigDecimal b = new BigDecimal(in);
                double res = b.setScale(2, BigDecimal.ROUND_HALF_DOWN).doubleValue();
                return res;
            }
        }, DataTypes.DoubleType);

        Dataset<Row> result = spark
                .sql("SELECT name,twoDecimal(myAverage(salary)) as avg_salary FROM employees group by name");
        result.show();
        spark.stop();
    }
}

udaf.json:

{"name":"Michael","salary":0}
{"name":"Andy","salary":4537}
{"name":"Justin","salary":3500.0}
{"name":"Berta","salary":0}
{"name":"Michael","salary":3000.0}
{"name":"Andy","salary":4500.0}
{"name":"Justin","salary":3500.0}
{"name":"Berta","salary":4000.0}
{"name":"Andy","salary":4500.0}

标签:JAVA,buffer,apache,UDF,org,spark,import,Spark,public
From: https://www.cnblogs.com/liuyechang/p/17002996.html

相关文章

  • 项目实战:JavaFX + Netty 仿微信聊天程序(注册、登录、查好友、聊天)
    记录一下使用JavaFX+Netty开发仿微信聊天程序---米虫IM。功能需求米虫IM已经完成的功能有:用户注册功能用户登录功能搜索好友功能添加好友功能文本聊天功能离......
  • Java 初步
    1972年C诞生:指针和内存管理令人头疼1982年C++诞生1995年Java诞生:没有指针、没有内存管理、可移植性(JVM),刚开始叫C++——,后来叫oak,后来正式命名为javaJava2标准版(J2SE......
  • java美食论坛系统发帖子系统美食论坛网站美食分享论坛源码
    ssm开发的美食论坛系统,用户注册之后可以发布关于美食的帖子,其他人可以回帖,评论,点赞回复和评论,分为楼主,第一楼,第二楼等。可以再个人中心查看我对别人的回复,以及别人对我的回......
  • java论坛贴子网站ssm论坛项目发帖子网站论坛系统论坛源码
    ssm开发的论坛系统,用户注册后可以发布帖子,其他人可以评论回复点赞评论和点赞回复,用户可以在个人中心管理自己的帖子,以及查看自己对他人的回复,和他人对自己的回复。演示视......
  • java基于ssh的旅游系统
    本项目主要发西安各个旅游景点和附近酒店信息的网站,用户可以根据旅游团一起旅游,可以也可以自驾游,还可以发布旅游活动等。演示视频https://www.bilibili.com/video/BV1wv4......
  • Java接口
    什么是接口?接口是一种约定,提供给需要实现功能类的一种方式,实现接口必须实现接口的抽象方法如何创建接口?通过interface关键字创建接口例如:publicinterface接口名{......
  • Java异常
    什么是异常?指程序在运行过程中发生的不正常事件,他会中断程序运行打印错误信息语句:System.err.println("");系统强制退出:System.exit(1);异常的关键字:try执行可能......
  • JAVA面向对象
    JAVA面向对象面向对象编程(OOP,Object-OrientedProgramming)以类的方式组织代码,以对象的形式封装数据三大特性封装继承多态类与对象的关系类是一种抽象的数据类......
  • JavaScript:类(class)
    在JS中,类是后来才出的概念,早期创造对象的方式是newFunction()调用构造函数创建函数对象;而现在,可以使用newclassName()构造方法来创建类对象了;所以在很多方面,类的使用方......
  • 初识java
    配置完环境之后,让我们来写出第一个Java程序吧!Hello,world!创建一个后缀名为.java的文件,并使用下载好的文本编辑器打开创建一个Java基本类publicclassHello{//基本......