产品经理提需求我不慌了,Doris自定义函数三剑客一把梭!
数据工程师小K盯着屏幕发愁。一个看似简单的数据分析需求,却因为复杂的业务规则让他焦头烂额。
“标准SQL函数写不出来,难道要改需求?”
就在这时,他想起了导师曾经提到的Doris"自定义函数",好比打开了一扇魔法之门,让他发现数据分析的世界远比想象中更加精彩。
今天,就让我们一起走进这个神奇的领域,看看如何用自定义函数解锁数据分析的无限可能。
打开数据分析的无限可能
数据分析工程师小张最近接到一个棘手的数据需求 - 计算用户的消费等级。这个看似简单的需求背后,涉及复杂的业务规则:不同时期的消费金额要有不同的权重,还要考虑用户的活跃度、信用评分等多个维度。传统的SQL函数难以满足这种复杂的业务逻辑。
"要是能自己写个函数就好了。"小张嘀咕着。事实上,Doris早就为这种场景准备了完美的解决方案 - 自定义函数。
看完上面这个生动的思维导图,相信大家对Doris的自定义函数已经有了初步印象。Doris支持三类自定义函数:UDF(标量函数)、UDAF(聚合函数)和UDTF(表函数,Doris 3.0 版本开始支持)。它们就像是数据分析的瑞士军刀,能够满足各种复杂的业务场景:
-
Java UDF 是较为常见的自定义标量函数 (Scalar Function),即每输入一行数据,就会有一行对应的结果输出,较为常见的有 ABS,LENGTH 等。值得一提的是对于用户来讲,Hive UDF 是可以直接迁移至 Doris 的。
-
Java UDAF 即为自定义的聚合函数 (Aggregate Function),即在输入多行数据进行聚合后,仅输出一行对应的结果,较为常见的有 MIN,MAX,COUNT 等。
-
JAVA UDTF 即为自定义的表函数 (Table Function),即每输一行数据,可以产生一行或多行的结果,在 Doris 中需要结合 Lateral View 使用可以达到行转列的效果,较为常见的有 EXPLODE,EXPLODE_SPLIT 等。
如上小张画了一张图,展示了Doris UDF的执行过程。当SQL查询包含自定义函数时,Doris会在BE节点的JVM实例中创建相应的函数容器来执行自定义逻辑。这种设计既保证了性能,又提供了足够的灵活性。
Doris自定义函数三剑客
UDF适合处理行级别的数据转换,比如清洗数据、格式转换等。小张最开始的用户等级计算就是典型的UDF场景。一个简单的Java UDF实现如下:
public class UserLevelUDF extends UDF {
public String evaluate(Double amount, Integer activity, Integer credit) {
if (amount == null || activity == null || credit == null) {
return "UNKNOWN";
}
double score = amount * 0.5 + activity * 0.3 + credit * 0.2;
if (score >= 90) return "DIAMOND";
if (score >= 80) return "GOLD";
if (score >= 70) return "SILVER";
return "BRONZE";
}
}
UDAF则专注于数据聚合。假设我们要计算中位数,传统的SQL就很难实现。使用UDAF,这个需求就变得简单了:
package org.apache.doris.udf.demo;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.logging.Logger;
/*UDAF 计算中位数*/
public class MedianUDAF {
Logger log = Logger.getLogger("MedianUDAF");
//状态存储
public static class State {
//返回结果的精度
int scale = 0;
//是否是某一个 tablet 下的某个聚合条件下的数据第一次执行 add 方法
boolean isFirst = true;
//数据存储
public StringBuilder stringBuilder;
}
//状态初始化
public State create() {
State state = new State();
//根据每个 tablet 下的聚合条件需要聚合的数据量大小,预先初始化,增加性能
state.stringBuilder = new StringBuilder(1000);
return state;
}
//处理执行单位处理各自 tablet 下的各自聚合条件下的每个数据
public void add(State state, Double val, int scale) throws IOException {
if (val != null && state.isFirst) {
state.stringBuilder.append(scale).append(",").append(val).append(",");
state.isFirst = false;
} else if (val != null) {
state.stringBuilder.append(val).append(",");
}
}
//处理数据完需要输出等待聚合
public void serialize(State state, DataOutputStream out) throws IOException {
//目前暂时只提供 DataOutputStream,如果需要序列化对象可以考虑拼接字符串,转换 json,序列化成字节数组等方式
//如果要序列化 State 对象,可能需要自己将 State 内部类实现序列化接口
//最终都是要通过 DataOutputStream 传输
out.writeUTF(state.stringBuilder.toString());
}
//获取处理数据执行单位输出的数据
public void deserialize(State state, DataInputStream in) throws IOException {
String string = in.readUTF();
state.scale = Integer.parseInt(String.valueOf(string.charAt(0)));
StringBuilder stringBuilder = new StringBuilder(string.substring(2));
state.stringBuilder = stringBuilder;
}
//聚合执行单位按照聚合条件合并某一个键下数据的处理结果 ,每个键第一次合并时,state1 参数是初始化的实例
public void merge(State state1, State state2) throws IOException {
state1.scale = state2.scale;
state1.stringBuilder.append(state2.stringBuilder.toString());
}
//对每个键合并后的数据进行并输出最终结果
public Double getValue(State state) throws IOException {
String[] strings = state.stringBuilder.toString().split(",");
double[] doubles = new double[strings.length + 1];
doubles = Arrays.stream(strings).mapToDouble(Double::parseDouble).toArray();
Arrays.sort(doubles);
double n = doubles.length - 1;
double index = n * 0.5;
int low = (int) Math.floor(index);
int high = (int) Math.ceil(index);
double value = low == high ? (doubles[low] + doubles[high]) * 0.5 : doubles[high];
BigDecimal decimal = new BigDecimal(value);
return decimal.setScale(state.scale, BigDecimal.ROUND_HALF_UP).doubleValue();
}
//每个执行单位执行完都会执行
public void destroy(State state) {
}
}
UDTF即每输一行数据,可以产生一行或多行的结果。它和 UDF 函数一样,需要用户自主实现一个 evaluate 方法,但是 UDTF 函数的返回值必须是 Array 类型:
public class UDTFStringTest {
public ArrayList<String> evaluate(String value, String separator) {
if (value == null || separator == null) {
return null;
} else {
return new ArrayList<>(Arrays.asList(value.split(separator)));
}
}
}
性能优化与最佳实践
系统迁移是UDF的一大应用场景。许多企业在迁移到Doris时,会发现原系统中有些特殊的函数在Doris中并不存在。这时,我们可以创建同名的自定义函数,让业务代码无缝迁移。
数据分析场景则是UDF最闪耀的舞台。从简单的数据清洗到复杂的机器学习预测,UDF都能胜任。有了自定义函数,数据分析师就像拥有了自己的魔法工具箱,能够随心所欲地处理数据。
在UDF开发中,性能是个关键问题。通过在UDF中使用Static变量,我们可以有效减少资源消耗。比如加载一个大型字典文件:
public class DictUDF {
private static Map<String, String> dict = new HashMap<>();
static {
// 只加载一次字典文件
loadDict();
}
public String evaluate(String key) {
return dict.getOrDefault(key, "NOT_FOUND");
}
}
这种静态加载的方式,让每个查询实例都能共享同一份数据,大大提升了性能。
正如厨师需要各种厨具才能烹饪美味佳肴,数据分析师也需要强大的工具来应对各种数据处理需求。Doris的自定义函数就是这样一套"数据厨具",让我们能够灵活应对各种数据分析场景。
下期,我们将一起探讨其它更有趣有用有价值的内容,敬请期待!
标签:函数,自定义,state,UDF,public,Doris,三剑客 From: https://blog.csdn.net/yzData/article/details/144310559