首页 > 其他分享 >spark如何自定义函数

spark如何自定义函数

时间:2024-12-15 10:29:52浏览次数:11  
标签:__ 函数 自定义 environ sql msg spark os

UDF:一对一的函数【User Defined Functions】

  • substr、split、concat、instr、length、from_unixtime

UDAF:多对一的函数【User Defined Aggregation Functions】 聚合函数

  • count、sum、max、min、avg、collect_set/list

UDTF:一对多的函数【User Defined Tabular Functions】

  • explode、json_tuple【解析JSON格式】、parse_url_tuple【解析URL函数

Spark中支持UDF和UDAF两种,支持直接使用Hive中的UDF、UDAF、UDTF. pyspark中自定义函数的三种写法:

register 

 

import os

from pyspark.sql import SparkSession


def func(line):
    list1 = line.split("/")
    return str(list1[0]) + "斤/" + str(list1[1]) + "cm"


if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'E:/java-configuration/jdk-8'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'E:/applications/bigdata_config/hadoop-3.3.1/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/35741/miniconda3/python.exe'

    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("第一个sparksql案例") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()
    df = spark.read.format("csv").option("sep", "\t").option("header", True) \
        .load("../../resources/input/a.txt")
        
    df.createOrReplaceTempView("a")
    spark.sql("""
    select id,name,concat(split(msg,"/")[0],"斤/",split(msg,"/")[1],"cm") msg from a
    """).show()
    
    # 将编写好的python方法变成spark的sql方法 
    spark.udf.register("sqlfunc", func)

    spark.sql("""
        select id,name,sqlfunc(msg) msg from a
        """).show()

    spark.stop()

udf注册方式定义UDF函数

不常用,只能用于DSL开发中

定义:

UDF变量名 = F.udf(函数的处理逻辑, 返回值类型)
import os

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

"""
------------------------------------------
  Description : TODO:
  SourceFile : _04自定义函数
  Author  : 老闫
  Date  : 2024/11/6 星期三
-------------------------------------------
"""
if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    spark = SparkSession.builder.master("local[2]").appName("").config(
        "spark.sql.shuffle.partitions", 2).getOrCreate()

    df = spark.read.format("csv").option("sep","\t").option("header","true").load("../../datas/function/a.txt")
    df.createOrReplaceTempView("a")

    # 第二种方案:使用自定义函数
    def myfunc(msg):
        return msg.split("/")[0]+"斤/"+msg.split("/")[1]+"cm"


    myfunc2= F.udf(myfunc, returnType=StringType())

    # 假如自定义函数使用了register 的方式,不仅sql可以使用,DSL语法也可以使用
    df.select(F.col("id"),F.col("name"),myfunc2(F.col("msg")).alias("msgg")).show()

    spark.stop()

 

标签:__,函数,自定义,environ,sql,msg,spark,os
From: https://blog.csdn.net/weixin_52642840/article/details/144478772

相关文章

  • spark将数据输出到hive或mysql中
    hive启动以下服务:start-dfs.shstart-yarn.shmapred--daemonstarthistoryserver/opt/installs/spark/sbin/start-history-server.shhive-server-manager.shstartmetastoreimportosfrompyspark.sqlimportSparkSession"""-----------------------......
  • 【Linux】poll函数
    poll和select的区别不大,主要是poll没有连接数限制,因为它用的链表实现#include<poll.h>intpoll(structpollfd*fds,nfds_tnfds,inttimeout);structpollfd{intfd;//要监控的文件描述符,如果fd为-1,表示内核不再监控shortevents;//......
  • 在Less中有哪些常用的函数?
    在Less中,存在许多实用的函数来帮助开发者更高效地编写和维护CSS代码。以下是一些常用的Less函数:字符串函数escape(@string):通过URL-encoding编码字符串。e(@string):对字符串进行转义处理。%(@string,values...):格式化字符串。replace('content','要进行替换的值',替换值):替......
  • 实现一个批量请求函数 multiRequest(urls, maxNum)
    在前端开发中,处理多个异步请求的一种常见需求是批量请求,并限制并发请求的数量以避免对服务器造成过大压力或浏览器资源耗尽。你可以使用Promise.all、Array.prototype.map和Array.prototype.reduce等方法来实现一个批量请求函数multiRequest,该函数接受一个URL数组和一个最......
  • 在非函数内写return语句,会有什么问题?
    在前端开发或任何编程语言中,return语句主要用于从函数中返回一个值或提前退出函数。如果你在非函数内(例如在全局作用域或代码块中)使用return语句,会导致语法错误或逻辑问题。以下是一些关键点:语法错误:在大多数编程语言中,包括JavaScript,return语句只能在函数体内使用。如果......
  • 考研数学二 2011-2024年 真题积累总结【多元函数与微分方程篇】_多元函数二阶导数_非
    文章目录多元函数1.多元函数二阶导数问题:f^''^~xy~(0,0)与f^''^~yx~(0,0)的计算(是否存在)2.多元函数非条件极值问题3.多元函数基础经典题已知对x的偏导数和对y的偏导数,求f(x,y)微分方程1.利用已知条件,构造微分方程,求y(x)的表达式2.给出关于f(x)的两个微分方程,求这个f......
  • 写一个方法记录函数运行的时间
    在前端开发中,记录一个函数运行的时间是一个常见的需求,通常用于性能调优和调试。你可以使用JavaScript提供的Date对象或者performanceAPI来实现这一功能。下面是两种方法的示例:方法一:使用Date对象Date对象可以获取当前的时间戳,通过计算函数执行前后的时间差,可以得到函......
  • 使用任务队列TaskQueue和线程池ThreadPool技术实现自定义定时任务框架详解
    前言在桌面软件开发中,定时任务是一个常见的需求,比如定时清理日志、发送提醒邮件或执行数据备份等操作。在C#中有一个非常著名的定时任务处理库Hangfire,不过在我们深入了解Hangfire之前,我们可以手动开发一个定时任务案例,用以帮助我们理解Hangfire的核心原理。我们可以利用......
  • QT 定义全局变量、通过函数初始化变量
    1.头文件中定义全局变量#ifndefZ3_GVARS_H#defineZ3_GVARS_H#include<QString> classZ3_GVARS{ public: staticQStringJSON_FILE_NAME; staticQStringSERVER_IP; staticintSERVER_PORT; staticvoidinitConfig();};#endif//!Z3_GVARS_H 2.在cpp......
  • 【条件随机场的学习算法】模型的对数似然函数 公式解析
    本文是将文章【条件随机场的学习算法】中的公式单独拿出来做一个详细的解析,便于初学者更好的理解。公式:L(w)......