flink udaf demo
A reader left a comment a while back asking for an example of TableAggregateFunction, and how to call a user-defined function from SQL.
I submit Flink SQL jobs with my sqlSubmit tool, so I often overlook the flink sql client. Here is a simple udaf, demonstrated both in sqlSubmit and in the sql client.
udaf definition
A user-defined aggregate function (UDAGG) aggregates a table (one or more rows, each row with one or more columns) into a scalar value.
As an example of an aggregation, suppose you have a table about beverages with three columns, id, name and price, and 5 rows of data. To find the price of the most expensive beverage, you run a max() aggregation: you scan all 5 rows and the result is a single value.
A user-defined aggregate function is implemented by extending AggregateFunction, which works as follows. First it needs an accumulator, a data structure that holds the intermediate result of the aggregation; an empty accumulator is created by calling createAccumulator(). Then, for every input row, accumulate() is called to update the accumulator. Once all rows have been processed, getValue() is called to compute and return the final result.
- The above is taken from the official documentation on aggregate functions.
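To make that lifecycle concrete, here is a minimal sketch of the max() example above written as an AggregateFunction. The class and field names (MaxPrice, Acc) are hypothetical and only illustrate the createAccumulator() / accumulate() / getValue() contract that the median udaf below also follows.
import org.apache.flink.table.functions.AggregateFunction;

// hypothetical "most expensive beverage" aggregate, only to illustrate the lifecycle
public class MaxPrice extends AggregateFunction<Double, MaxPrice.Acc> {

    // accumulator: holds the running maximum
    public static class Acc {
        public Double max;
    }

    // 1. create an empty accumulator
    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // 2. called once per input row to update the accumulator
    public void accumulate(Acc acc, Double price) {
        if (price != null && (acc.max == null || price > acc.max)) {
            acc.max = price;
        }
    }

    // 3. called at the end to produce the final result
    @Override
    public Double getValue(Acc acc) {
        return acc.max;
    }
}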
udaf: computing the median
Here is a simple udaf that computes the median.
Define the accumulator
import java.util.ArrayList;
import java.util.List;

/**
 * accumulator: stores the intermediate result of the aggregation
 */
public class NumberAcc {
    public List<Double> list = new ArrayList<>();
}
Define the aggregate function
import org.apache.flink.table.functions.AggregateFunction;

import java.util.List;
import java.util.stream.Collectors;

/**
 * agg function: compute the median
 */
public class Median extends AggregateFunction<Double, NumberAcc> {

    // compute the final result from the accumulator
    @Override
    public Double getValue(NumberAcc acc) {
        // sort list
        List<Double> list = acc.list.stream().sorted().collect(Collectors.toList());
        // if list is empty, return null
        if (list.size() == 0) {
            return null;
        } else if (list.size() == 1) {
            // if list has only one element, return it
            return list.get(0);
        }
        double val;
        int size = list.size();
        int half = size / 2;
        if (size % 2 == 0) {
            // even number of elements: average the two middle values
            val = (list.get(half - 1) + list.get(half)) / 2;
        } else {
            // odd number of elements: take the middle value
            val = list.get(half);
        }
        return val;
    }

    // accumulate one input element
    public void accumulate(NumberAcc acc, Double d) {
        acc.list.add(d);
    }

    // create an empty accumulator
    @Override
    public NumberAcc createAccumulator() {
        return new NumberAcc();
    }

    // merge accumulators (needed for window aggregation)
    public void merge(NumberAcc acc, Iterable<NumberAcc> it) {
        for (NumberAcc a : it) {
            acc.list.addAll(a.list);
        }
    }
}
That completes the udaf definition.
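Before wiring it into a job, a quick hand-driven sanity check (a sketch; the MedianTest class is not part of the project) can call the methods directly, without Flink, using the same prices that appear in the kafka test data further down:
public class MedianTest {
    public static void main(String[] args) {
        Median median = new Median();
        NumberAcc acc = median.createAccumulator();
        for (double d : new double[]{1, 5, 3, 2}) {
            median.accumulate(acc, d);
        }
        // sorted: 1, 2, 3, 5 -> even count, so (2 + 3) / 2 = 2.5
        System.out.println(median.getValue(acc)); // prints 2.5
    }
}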
Test SQL
A simple aggregation example: read data from kafka, group by item_id, and compute the median of the price field. The SQL is as follows:
-- kafka source
drop table if exists user_log;
CREATE TABLE user_log (
user_id VARCHAR
,item_id VARCHAR
,category_id VARCHAR
,behavior VARCHAR
,price double
,proc_time as PROCTIME()
,ts TIMESTAMP(3)
,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka'
,'topic' = 'user_log'
,'properties.bootstrap.servers' = 'localhost:9092'
,'properties.group.id' = 'user_log'
,'scan.startup.mode' = 'latest-offset'
,'format' = 'json'
);
create table user_log_sink(
item_id string
,median_price double
)WITH(
'connector' = 'print'
);
insert into user_log_sink
select item_id, udaf_median(cast(price as double)) median_price
from user_log
group by item_id;
Using it in sqlSubmit
Register the udf
sqlSubmit is a flink sql submission tool I developed. Its RegisterUdf class registers the udfs; it simply calls StreamTableEnvironment.createTemporarySystemFunction for each one,
as follows:
object RegisterUdf {
  def registerUdf(tabEnv: StreamTableEnvironment, paraTool: ParameterTool) = {
    // udf
    tabEnv.createTemporarySystemFunction("udf_decode", new Decode)
    tabEnv.createTemporarySystemFunction("udf_date_add", new DateAdd)
    // udtf
    tabEnv.createTemporarySystemFunction("udf_split", new SplitFunction)
    tabEnv.createTemporarySystemFunction("udf_parse_json", new ParseJson)
    tabEnv.createTemporarySystemFunction("udf_timer", new UdtfTimer(1000))
    // udfs can also be generated dynamically from configuration
    // join hbase table, first qualifier is the join key
    tabEnv.createTemporarySystemFunction("udf_join_hbase_non_rowkey_no_cache", new JoinHbaseNonRowkeyNoCache("cf", "c1,c2,c3,c4,c5,c6,c7,c8,c9,c10"))
    tabEnv.createTemporarySystemFunction("udf_join_hbase_non_rowkey_cache", new JoinHbaseNonRowkeyCache("cf", "c1,c2,c3,c4,c5,c6,c7,c8,c9,c10", 600, 10000))
    // udaf
    tabEnv.createTemporarySystemFunction("udaf_uv_count", classOf[BloomFilter])
    tabEnv.createTemporarySystemFunction("udaf_redis_uv_count", new RedisUv(paraTool.get(Constant.REDIS_URL), "user_log_uv"))
    // env.createTemporarySystemFunction("udaf_redis_uv_count", new JedisRedisUv("localhost", 6379));
    tabEnv.createTemporarySystemFunction("udaf_median", classOf[Median])
  }
}
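If you are not using sqlSubmit, the same registration call works in a plain Java job. Here is a minimal sketch (the MedianJob class name is made up for illustration); after the registration, the DDL and INSERT statements from the Test SQL section can be run with executeSql:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class MedianJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // register the median udaf so SQL can call it as udaf_median(...)
        tableEnv.createTemporarySystemFunction("udaf_median", Median.class);
        // then run the DDL and INSERT statements from the "Test SQL" section, e.g.:
        // tableEnv.executeSql("CREATE TABLE user_log (...) WITH (...)");
        // tableEnv.executeSql("insert into user_log_sink select ...");
    }
}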
Use the udf
Run the sqlSubmit main class with the argument "--sql <sql file>" and the job will be submitted.
Write some test data to the topic:
{"user_id":"4653250000","item_id":"1010017","price":1,"ts":"2022-10-17 10:30:34.743"}
{"user_id":"4653250000","item_id":"1010017","price":5,"ts":"2022-10-17 10:30:34.743"}
{"user_id":"4653250000","item_id":"1010017","price":3,"ts":"2022-10-17 10:30:34.743"}
{"user_id":"4653250000","item_id":"1010017","price":2,"ts":"2022-10-17 10:30:34.743"}
{"user_id":"4653250000","item_id":"1010017","price":7,"ts":"2022-10-17 10:30:34.743"}
Output (the non-windowed group-by produces an update stream, so each new record first retracts the old result with -U and then emits the updated one with +U):
+I[1010017, 1.0] # elements: 1
-U[1010017, 1.0]
+U[1010017, 3.0] # elements: 1,5
-U[1010017, 3.0]
+U[1010017, 3.0] # elements: 1,5,3
-U[1010017, 3.0]
+U[1010017, 2.5] # elements: 1,5,3,2
-U[1010017, 2.5]
+U[1010017, 3.0] # elements: 1,5,3,2,7
Using it in the sql client
- flink 1.15.1
Register the udf:
Start a yarn session:
./yarn-session.sh -d -nm sql -jm 1g -tm 1g
Start the sql-client, create the udf, and run the SQL.
Start the sql client with the parameter "-j ~/git/sqlSubmit/target/original-sqlSubmit-5.0.jar" to point it at the jar that contains the udf:
venn@venn bin % sql-client.sh embedded -s application_1665975844329_0003 -j ~/git/sqlSubmit/target/original-sqlSubmit-5.0.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink-1.15.1/lib/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/hadoop-3.2.2/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2022-10-17 11:26:15,380 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /var/folders/yc/_5xfqjbx1zlbj7050lkhf2y80000gn/T/.yarn-properties-venn.
▒▓██▓██▒
▓████▒▒█▓▒▓███▓▒
▓███▓░░ ▒▒▒▓██▒ ▒
░██▒ ▒▒▓▓█▓▓▒░ ▒████
██▒ ░▒▓███▒ ▒█▒█▒
░▓█ ███ ▓░▒██
▓█ ▒▒▒▒▒▓██▓░▒░▓▓█
█░ █ ▒▒░ ███▓▓█ ▒█▒▒▒
████░ ▒▓█▓ ██▒▒▒ ▓███▒
░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░
▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒
███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒
░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒
███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░
██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓
▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒
▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒
▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█
██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █
▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓
█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓
██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓
▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒
██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒
▓█ ▒█▓ ░ █░ ▒█ █▓
█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░
█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█
██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓
▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓
░▓██▒ ▓░ ▒█▓█ ░░▒▒▒
▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Command history file path: /Users/venn/.flink-sql-history
Flink SQL> CREATE FUNCTION udaf_median as 'com.rookie.submit.udaf.math.Median';
>
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE user_log (
> user_id VARCHAR
> ,item_id VARCHAR
> ,category_id VARCHAR
> ,behavior VARCHAR
> ,price double
> ,proc_time as PROCTIME()
> ,ts TIMESTAMP(3)
> ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'kafka'
> ,'topic' = 'user_log'
> ,'properties.bootstrap.servers' = 'localhost:9092'
> ,'properties.group.id' = 'user_log'
> ,'scan.startup.mode' = 'latest-offset'
> ,'format' = 'json'
> );
[INFO] Execute statement succeed.
Flink SQL> create table user_log_sink(
> item_id string
> ,median_price double
> )WITH(
> 'connector' = 'print'
> );
[INFO] Execute statement succeed.
Flink SQL> insert into user_log_sink
> select item_id, udaf_median(cast(price as double)) median_price
> from user_log
> group by item_id;
[INFO] Submitting SQL update statement to the cluster...
2022-10-17 11:27:05,766 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/opt/flink-1.15.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2022-10-17 11:27:06,062 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at /0.0.0.0:8032
2022-10-17 11:27:06,263 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2022-10-17 11:27:06,268 WARN org.apache.flink.yarn.YarnClusterDescriptor [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2022-10-17 11:27:06,387 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface venn:62000 of application 'application_1665975844329_0003'.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7d3a1766b6f6d3b4059ffb76f2cc244e
Check the job on the web ui:
- udf creation statement: CREATE FUNCTION udaf_median as 'com.rookie.submit.udaf.math.Median';
Output
References
- sqlSubmit: https://github.com/springMoon/sqlSubmit
- Flink official documentation, Aggregate Functions: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/functions/udfs/#聚合函数
Feel free to follow the Flink 菜鸟 WeChat official account for occasional posts on Flink development.