首页 > 数据库 >SparkSQL 自定义聚合函数[弱类型]

SparkSQL 自定义聚合函数[弱类型]

时间:2024-01-14 15:44:21浏览次数:27  
标签:聚合 自定义 buffer getLong SparkSQL import return spark public

本文的前提条件: SparkSQL in Java

代码如下

1.自定义聚合函数

package cn.coreqi.udaf;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

/**
 * 自定义聚合函数类[弱类型],计算年龄的平均值
 * 1.集成UserDefinedAggregateFunction
 * 2.重写方法
 */
public class MyAvgUDAF1 extends UserDefinedAggregateFunction {

    /**
     * 定义输入数据的结构 - In
     * @return
     */
    @Override
    public StructType inputSchema() {
        return new StructType().add("age", DataTypes.LongType);   //年龄
    }

    /**
     * 缓存区数据的结构 - Buffer
     * 缓存区用于临时计算
     * @return
     */
    @Override
    public StructType bufferSchema() {
        return new StructType()
                .add("total",DataTypes.LongType)//累计的总年龄
                .add("count",DataTypes.LongType); //用户的数量
    }

    /**
     * 函数计算结果的数据类型 - Out
     * @return
     */
    @Override
    public DataType dataType() {
        return DataTypes.LongType;
    }

    /**
     * 函数的稳定性
     * 传入相同的参数结果是否相同
     * @return
     */
    @Override
    public boolean deterministic() {
        return true;
    }

    /**
     * 缓存区初始化
     * @param buffer
     */
    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0,0L);    //初始化 total 为 0
        buffer.update(1,0L);    // 初始化 count 为 0
    }

    /**
     * 数据加载到缓存区后如何更新缓存区中的值
     * @param buffer
     * @param input
     */
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        buffer.update(0,buffer.getLong(0) + input.getLong(0));  // 缓存区中的 total + 输入数据中的 age[输入结构中只有age一个参数,因此使用索引0即可取出age的值]
        buffer.update(1,buffer.getLong(1) + 1); // count++
    }

    /**
     * 缓存区数据合并
     * 分布式计算,会有多个缓存区,最终多个缓存区需要合并到一起
     * @param buffer1
     * @param buffer2
     */
    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        buffer1.update(0,buffer1.getLong(0) + buffer2.getLong(0));
        buffer1.update(1,buffer1.getLong(1) + buffer2.getLong(1));
    }

    /**
     * 计算逻辑,此处为计算平均值
     * @param buffer
     * @return
     */
    @Override
    public Object evaluate(Row buffer) {
        return buffer.getLong(0) / buffer.getLong(1);
    }
}

2.使用

package cn.coreqi;

import cn.coreqi.udaf.MyAvgUDAF1;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

public class Main {
    public static void main(String[] args) {
        // 创建SparkConf对象
        SparkConf sparkConf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("sparkSql");

        SparkSession spark = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        Dataset<Row> df = spark.read().json("datas/user.json");
        df.show();

        // DataFrames => SQL
        df.createOrReplaceTempView("user");

        spark.udf().register("ageAvg", new MyAvgUDAF1());
        spark.sql("select ageAvg(age) from user").show();

        // 关闭
        spark.close();
    }
}

标签:聚合,自定义,buffer,getLong,SparkSQL,import,return,spark,public
From: https://www.cnblogs.com/fanqisoft/p/17963774

相关文章

  • SparkSQL 自定义函数
    本文的前提条件:SparkSQLinJava参考地址:ScalarUserDefinedFunctions(UDFs)完整代码packagecn.coreqi;importstaticorg.apache.spark.sql.functions.udf;importorg.apache.spark.SparkConf;importorg.apache.spark.sql.*;importorg.apache.spark.sql.expres......
  • SparkSQL in Java
    参考地址:StartingPoint:SparkSession1.新建Maven项目,POM引入依赖<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.13</artifactId><version&......
  • Jmeter参数化-用户自定义变量
    一 首先我们先来了解下jmeter做参数化的目的:1通过参数化来集中管理配置和测试数据2通过参数化实现数据驱动测试 二线程组添加配置元件中的用户自定义变量 添加变量名称,变量值三使用变量:通过名称来进行引用 四用户定义变量的好处?jmeter中变量都是......
  • 基于VueCli自定义创建项目
    前面学习的一些router封装,相关文件夹的创建,现在可以通过脚手架自动创建,简化了很多步骤1,使用shell命令选择项目目录vuecreatexx-project步骤2,  步骤3  路由模式默认是hash模式,history模式需要服务器端相关配置支持,这里选n,后面有需要可以在配置文件改......
  • .NET中的加密算法总结(自定义加密Helper类续)
    .NET中的加密算法总结(自定义加密Helper类续) 1.1.1摘要       相信许多人都使用过.NET提供的加密算法,而且在使用的过程我们必须了解每种加密算法的特点(对称或非对称,密钥长度和初始化向量等等)。我也看到过很多人写过.NET中加密算法总结,但我发现个别存在一些问题,很......
  • 自定义jQuery插件Step by Step
    自定义jQuery插件StepbyStep 1.1.1摘要随着前端和后端技术的分离,各大互联网公司对于MobileFirst理念都是趋之若鹜的,为了解决网页在不同移动设备上的显示效果,其中一个解决方案就是ResponsiveDesign;但我们今天不是介绍它,正由于前端开发已经十分重要了,所以我们将介绍如何......
  • 智能电网中的安全数据聚合方案
    基于秘密分享实现参考:基于秘密共享和同态加密的隐私数据融合方案-陈信系统模型三层架构:电力供应商(PS)基站(BS)智能电表(SM)第三方聚合器(TPA)可信第三方机构(TA):生成和分发随机数控制中心(CC)敌手模型可信:可信第三方机构(TA)、控制中心(CC)、电力供应商(PS)半诚实:基站(BS)、智能电......
  • 【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、
    文章目录Flink系列文章一、maven依赖二、示例:表的聚合操作1、示例代码公共部分2、groupby3、GroupByWindowAggregation4、OverWindowAggregation5、DistinctAggregation6、Distinct本文给出了关于表数据的聚合操作示例,比如groupby、distinct、以及groupby、over、distin......
  • 【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以
    文章目录Flink系列文章一、maven依赖二、示例:表的join操作(内联接、外联接以及联接自定义函数等)本文介绍了表的join主要操作,比如内联接、外联接以及联接自定义函数等。本文除了maven依赖外,没有其他依赖。一、maven依赖本文maven依赖参考文章:【flink番外篇】9、FlinkTableAPI支......
  • Qt/C++编写视频监控系统83-自定义悬浮条信息
    一、前言一般视频控件上会给出个悬浮条,这个悬浮条用于显示分辨率或者一些用户期望看到的信息,一般常用的信息除了分辨率以外,还有帧率、封装格式、视频解码器名称、音频解码器名称、实时码率等,由于实际的场景不一样,用户希望能过自定义勾选开启哪些信息,开启的就显示,不开启的则可以不......