首页 > 数据库 >(4)SparkSQL中如何定义UDF和使用UDF

(4)SparkSQL中如何定义UDF和使用UDF

时间:2022-10-03 22:34:35浏览次数:56  
标签:java 定义 api UDF SparkSQL org apache import spark

Spark SQL中用户自定义函数,用法和Spark SQL中的内置函数类似;是saprk SQL中内置函数无法满足要求,用户根据业务需求自定义的函数。

首先定义一个UDF函数:

package com.udf;

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import scala.collection.mutable.WrappedArray;


/**
* Created by lj on 2022-07-25.
*/
public class TestUDF implements UDF1<String, String> {
@Override
public String call(String s) throws Exception {
return s+"_udf";
}
}

使用UDF函数:

package com.examples;

import com.pojo.WaterSensor;
import com.udf.TestUDF;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
* Created by lj on 2022-07-25.
*/
public class SparkSql_Socket_UDF {
private static String appName = "spark.streaming.demo";
private static String master = "local[*]";
private static String host = "localhost";
private static int port = 9999;

public static void main(String[] args) {
//初始化sparkConf
SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

//获得JavaStreamingContext
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(3));

/**
* 设置日志的级别: 避免日志重复
*/
ssc.sparkContext().setLogLevel("ERROR");

//从socket源获取数据
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);

JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
private static final long serialVersionUID = 1L;

public WaterSensor call(String s) throws Exception {
String[] cols = s.split(",");
WaterSensor waterSensor = new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
return waterSensor;
}
}).window(Durations.minutes(6), Durations.minutes(9)); //指定窗口大小 和 滑动频率 必须是批处理时间的整数倍

mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
@Override
public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());

spark.udf().register("TestUDF", new TestUDF(), DataTypes.StringType);

Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
// 创建临时表
dataFrame.createOrReplaceTempView("log");
Dataset<Row> result = spark.sql("select *,TestUDF(id) as udftest from log");
System.out.println("========= " + time + "=========");
//输出前20条数据
result.show();
}
});


//开始作业
ssc.start();
try {
ssc.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
} finally {
ssc.close();
}
}
}

代码说明:

(4)SparkSQL中如何定义UDF和使用UDF_大数据

应用效果展示:

(4)SparkSQL中如何定义UDF和使用UDF_大数据_02

 



标签:java,定义,api,UDF,SparkSQL,org,apache,import,spark
From: https://blog.51cto.com/u_14465598/5730552

相关文章

  • 源码角度了解Skywalking之AbstractClassEnhancePluginDefine插件增强定义
    源码角度了解Skywalking之AbstractClassEnhancePluginDefine插件增强定义AbstractClassEnhancePluginDefine是所有插件的抽象类,我们在分析Skywalking初始化流程的时候见到......
  • java---回顾方法的定义和调用
    方法的回顾和调用packagecom.oop.demo;​importjava.io.IOError;importjava.io.IOException;​//return代表方法结束,返回一个结果//下方就是一个类publicclassDemo01......
  • 自定义异常
    自定义异常使用Java内置的异常类可以描述在编程时出现的大部分异常情况。除此之外,用户还可以自定义异常。用户自定义异常类,只需要继承Exception类即可。在程序中使用自......
  • Vue2 自定义属性
    概述vue中不不仅仅有官方提供的指令,用户还可以根据自己的需要进行自定义指令。比如当我们需要一个常用的操作将文字改为蓝色,如果我们需要修改大量标签时,就可以使用自定......
  • #yyds干货盘点# 软件测试的定义
    1973年,BillHetzel给出了软件测试的第一个定义:“软件测试就是为了程序能够按预期设想运行而建立足够的信心”。这个定义强调的是证实程序按预期运行,当软件测试这种技术手段......
  • spring boot 自定义线程池与使用
    一、进行线程池创建importcn.hutool.core.thread.ThreadFactoryBuilder;importlombok.extern.slf4j.Slf4j;importorg.springframework.aop.interceptor.AsyncUncaug......
  • Jmeter组件:参数化之用户定义的变量
    1、UserDefinedVariables:用户定义的变量,可以将请求路径设置为变量或者将参数值设置为变量等2、添加一个变量存储http请求的路径3、通过${变量名}取值......
  • 关于 js 函数定义方式
    函数声明式function(a,b){returna+b}特点:此种方式可定义命名的函数变量,而无需给变量赋值,这是一种独立的结构,不能嵌套在非功能模块中。函数名在自身作用域和父作用域内是......
  • 自定义注解
    转载:http://www.qizhentao.cn/article/15#menu_7https://www.bilibili.com/video/BV1Ma411u7SD?p=1&vd_source=46d50b5d646b50dcb2a208d3946b1598......
  • 写过自定义指令吗,原理是什么?
    背景看了一些自定义指令的文章,但是探究其原理的文章却不多见,所以我决定水一篇。如何自定义指令?其实关于这个问题官方文档上已经有了很好的示例的,我们先来温故一下。除......