首页 > 数据库 >在Kotlin中使用Spark SQL的UDF和UDAF函数

在Kotlin中使用Spark SQL的UDF和UDAF函数

时间:2024-08-25 12:25:37浏览次数:14  
标签:name Buffer Kotlin sql UDAF UDF sparkSession fun spark

1. 项目结构与依赖

1.1 项目依赖

使用gradle:

在项目的build.gradle.kts添加

dependencies {
    implementation("org.apache.spark:spark-sql_2.12:3.3.1")
}

使用maven:

在模块的pom.xml中添加

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.3.1</version>
        </dependency>

2. UDF的使用与实现

UDF,即用户自定义函数,允许用户在SQL查询中使用自定义的函数。下面案例做了一个简单的案例,将首字母变为大写。

2.1 数据源

准备数据源使用JSON数据作为数据格式,保存到项目的根路径下的`data/user.txt`文件。

{"name":"zhangsan","age":19,"gender":"boy"}
{"name":"lisi","age":20,"gender":"boy"}
{"name":"wangwu","age":21,"gender":"boy"}
{"name":"zhaoliu","age":22,"gender":"boy"}
{"name":"sunqi","age":23,"gender":"boy"}
{"name":"zhouba","age":24,"gender":"boy"}
{"name":"wujiu","age":25,"gender":"boy"}
{"name":"zhengshi","age":26,"gender":"boy"}

2.2 代码示例

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.DataTypes
import java.util.*

class SparkSQL_UDF {
    fun f1() {
        val sparkSession = SparkSession.builder()
            .master("local")
            .appName("Kotlin Spark UDF")
            .orCreate

        // 读取JSON数据并创建视图
        sparkSession.read().json("data/user.txt")
            .createOrReplaceTempView("user")
        
        // 注册UDF函数,将名字的首字母大写
        sparkSession.udf().register("nameHeaderUpper",
            UDF1 { name: String ->
                name.substring(0, 1).uppercase(Locale.getDefault()) + name.substring(1)
            },
            DataTypes.StringType)

        // 使用注册的UDF函数进行SQL查询
        sparkSession.sql("select nameHeaderUpper(name) as name from user").show()

        sparkSession.stop()
    }

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            SparkSQL_UDF().f1()
        }
    }
}

/*输出结果
+---------------------+
|nameHeaderUpper(name)|
+---------------------+
|             Zhangsan|
|                 Lisi|
|               Wangwu|
|              Zhaoliu|
|                Sunqi|
|               Zhouba|
|                Wujiu|
|             Zhengshi|
+---------------------+
*/

2.3 代码解析

1. 创建SparkSession:

使用local模式进行测试

   val sparkSession = SparkSession.builder()
       .master("local")
       .appName("Kotlin Spark UDF")
       .orCreate
2. 读取数据并创建视图:

创建名为user的视图

   sparkSession.read().json("data/user.txt")
       .createOrReplaceTempView("user")
 3. 注册UDF函数:

使用kotlin的lambda表达式来简化UDF函数的创建 要注意这里是Java中的UDF1{}而不是Scala中的Function1{}

   sparkSession.udf().register("nameHeaderUpper",
       UDF1 { name: String ->
           name.substring(0, 1).uppercase(Locale.getDefault()) + name.substring(1)
       },
       DataTypes.StringType)
4. 执行SQL查询:

用sparkSession对象调用sql方法进行查询 并将结果展示到控制台

  sparkSession.sql("select nameHeaderUpper(name) as name from user").show()
5. 停止SparkSession:

释放资源
调用close和stop方法都可以

   sparkSession.stop()

3. UDAF的使用与实现

UDAF,即用户自定义聚合函数,允许用户定义复杂的聚合逻辑,如求平均值、总和等。

下面是一个简单案例来实现求年龄的平均值

3.1 代码示例

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions
import java.io.Serializable

class SparkSQL_UDAF {
    fun f1() {
        val sparkSession = SparkSession.builder()
            .master("local")
            .appName("Kotlin Spark UDAF")
            .orCreate

        // 定义聚合器
        val agg = object : Aggregator<Long, Buffer, Long>(){
            override fun reduce(b: Buffer?, a: Long?): Buffer {
                val updatedBuffer = b ?: Buffer(0, 0)
                updatedBuffer.cnt++
                updatedBuffer.count += a!!
                return updatedBuffer
            }

            override fun outputEncoder(): Encoder<Long> {
                return Encoders.LONG()
            }

            override fun zero(): Buffer {
                return Buffer(0L, 0L)
            }

            override fun bufferEncoder(): Encoder<Buffer> {
                return Encoders.bean(Buffer::class.java)
            }

            override fun finish(reduction: Buffer?): Long {
                return reduction?.count?.div(reduction.cnt) ?: 0L
            }

            override fun merge(b1: Buffer?, b2: Buffer?): Buffer {
                return Buffer(
                    count = (b1?.count ?: 0) + (b2?.count ?: 0),
                    cnt = (b1?.cnt ?: 0) + (b2?.cnt ?: 0)
                )
            }
        }

        // 注册UDAF函数
        sparkSession.udf().register("avgAge", functions.udaf(agg, Encoders.LONG()))

        // 使用注册的UDAF函数进行SQL查询
        sparkSession.read().json("data/user.txt").createOrReplaceTempView("user")
        sparkSession.sql("select avgAge(age) as avg_age from user").show()

        sparkSession.stop()
    }

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            SparkSQL_UDAF().f1()
        }
    }
}

data class Buffer(var count: Long, var cnt: Long) : Serializable {
    constructor() : this(0, 0)
}

/*输出结果
+-----------+
|avgage(age)|
+-----------+
|         22|
+-----------+
*/

3.2 代码解析

1. Buffer数据类:

用来缓存聚合的中间过程的类 这里不能使用Scala中的二元组和kotlin中的Pair类 因为都不能修改其中的数据 而且需注意 需要有空参构造 (Kotlin中的data class 默认没有空参构造)
 

   data class Buffer(var count: Long, var cnt: Long) : Serializable {
       constructor() : this(0, 0)
   }
2. 定义UDAF函数:

为了看着清晰 创建了一个匿名内部类对象agg 继承自Aggregator 需要定义输入输出和缓存的泛型

实现其对应的方法

val agg = object : Aggregator<Long, Buffer, Long>(){
       override fun reduce(b: Buffer?, a: Long?): Buffer {
           val updatedBuffer = b ?: Buffer(0, 0)
           updatedBuffer.cnt++
           updatedBuffer.count += a!!
           return updatedBuffer
       }

       override fun outputEncoder(): Encoder<Long> {
           return Encoders.LONG()
       }

       override fun zero(): Buffer {
           return Buffer(0L, 0L)
       }

       override fun bufferEncoder(): Encoder<Buffer> {
           return Encoders.bean(Buffer::class.java)
       }

       override fun finish(reduction: Buffer?): Long {
           return reduction?.count?.div(reduction.cnt) ?: 0L
       }

       override fun merge(b1: Buffer?, b2: Buffer?): Buffer {
           return Buffer(
               count = (b1?.count ?: 0) + (b2?.count ?: 0),
               cnt = (b1?.cnt ?: 0) + (b2?.cnt ?: 0)
           )
       }
   }

其中 reduce是对输入的数据进行聚合(这里是累加)

outputEncoder和bufferEncoder是将输出和缓存的结果进行序列化

zero是对数据赋初值

merge是 多分区的数据进行合并时调用的方法 用于将缓存数据合并 (这里时Buffer类)

finish是输出最终结果

要十分注意在这些方法中的空安全处理(有时候kt的空安全挺烦人的) 不要轻易用非空断言

3. 注册UDAF函数:

使用functions.udaf()注册 要指明输出结果的类型

   sparkSession.udf().register("avgAge", functions.udaf(agg, Encoders.LONG()))
4. 执行SQL查询:

将读入的数据注册为视图 调用sparkSession的sql方法查询 并将结果在控制台打印输出

   sparkSession.read().json("data/user.txt").createOrReplaceTempView("user")
   sparkSession.sql("select avgAge(age) as avg_age from user").show()

5.释放资源:

调用stop过close方法释放资源

   sparkSession.stop()

4. 总结

在不适用spark kotlin api的情况下 用kotlin来写SparkSql基本是使用spark提供的Java api 因为kt与Scala不是完全兼容 所以要注意其中的一些高阶函数还有元组的使用和序列化等问题 

标签:name,Buffer,Kotlin,sql,UDAF,UDF,sparkSession,fun,spark
From: https://blog.csdn.net/qq_42531534/article/details/141526505

相关文章

  • 使用Cloudflare Worker搭建自己的AI绘画工具
    demo:https://aidraw.foxhank.top0.前言Cloudflare公司推出了workers-ai,可以免费在Cloudflare的全球网络上运行由无服务器GPU提供支持的机器学习模型。WorkersAI可以Cloudflare网络上使用自己的代码运行机器学习模型,也就是说,只要写一个js代码,就可以免费调用cloudfl......
  • Android开发语言Kotlin简介
    官方认可:自2017年Google正式宣布Kotlin成为Android开发的官方语言后,它在Android开发中的流行度就有了显著提升。与Java的兼容性:Kotlin在设计时就考虑到了与Java的互操作性,这让开发者能够在Android项目中轻松使用Kotlin,同时继续利用现有的Java代码和库。......
  • 记一次Kotlin Visibility Modifiers引发的问题
    概述测试环境爆出ERROR告警日志java.lang.IllegalStateException:Didn'tfindreportforspecifiedlanguage,登录测试环境ELK查到如下具体的报错堆栈日志:java.lang.IllegalStateException:Didn'tfindreportforspecifiedlanguageatcom.aba.report.service.biz.Assessme......
  • 免费域名注册教程:可托管cloudflare,无需双向解析、了解什么是域名
    前言在使用域名前,你需要知道域名的基本知识,如果你已经知道了基本的域名知识可以跳过这一段。什么是域名?通常情况下,你是通过IP地址加端口号来访问网站的。有了域名之后,你就可以通过域名来访问这个IP和端口,从而更加方便地访问你的网站。使用域名有什么好处?增强网站的SEO:一个......
  • 利用 Cloudflare workers 反代 github
    反代Github似乎会被认定为欺诈,严重的会封禁域名,不建议尝试首先绑定你的域名到cloudflare,然后创建一个Worker后写入以下代码并添加自定义域名//反代目标网站.constupstream='github.com';//反代目标网站的移动版.constupstream_mobile='github.com';//访问......
  • 利用 Cloudflare Pages 部署免费 Telegraph 图床
    TelegraphImage是一个可以作为Flickr和imgur替代品的免费图片托管服务。本教程将指导您如何通过CloudflarePages免费部署和使用TelegraphImage。项目地址https://github.com/cf-pages/Telegraph-Image部署步骤准备工作首先,确保您拥有一个Cloudflare账户,并已将......
  • 【待做】Mysql攻击之UDF提权
    一、前置知识1.1secure_file_privUDF是mysql的一个拓展接口,UDF(Userdefinedfunction)可翻译为用户自定义函数,这个是用来拓展Mysql的技术手段。这就意味着,我们可以通过udf为mysql添加任意功能,包括自定义sql函数,tcp开发,http请求,甚至直接调用系统命令;一、前置知识1.1......
  • Android笔试面试题AI答之Kotlin(6)
    文章目录24.以下代码执行的结果是什么?25.解释一下下述Kotlin代码有什么问题?26.如何在Kotlin中创建常量?示例注意事项总结27.Koltin可以互换使用IntArray和Kotlin中的Array吗?IntArrayArray<Int>互换使用从IntArray到Array<Int>从Array<Int>到IntArra......
  • kotlin 和 lombok编译冲突的解决办法
    kotlin和lombok编译冲突的解决办法源文档:kotlinlang.org/docs/lombok.html使用KotlinLombok编译插件解决<plugin><groupId>org.jetbrains.kotlin</groupId><artifactId>kotlin-maven-plugin</artifactId><version>${kotlin.version}......