TDengine 3.0.4.0 发布了一个重要特性:支持用 Python 语言编写的自定义函数(UDF)。这个特性极大节省了 UDF 开发的时间成本。作为时序大数据处理平台,不支持 Python UDF 显然是不完整的。UDF 在实现自己业务中特有的逻辑时非常有用,比如量化交易场景计算自研的交易信号。本文内容由浅入深包括 4 个示例程序:
1
定义一个只接收一个整数的标量函数:输入 n, 输出 ln(n^2 + 1)。
2
定义一个接收 n 个整数的标量函数, 输入 (x1, x2, ..., xn), 输出每个值和它们的序号的乘积的和:x1 + 2 * x2 + ... + n * xn。
3
定义一个标量函数,输入一个时间戳,输出距离这个时间最近的下一个周日。完成这个函数要用到第三方库 moment。我们在这个示例中讲解使用第三方库的注意事项。
4
定义一个聚合函数,计算某一列最大值和最小值的差, 也就是实现 TDengien 内置的 spread 函数。
同时也包含大量实用的 debug 技巧。
本文假设你用的是 Linux 系统,且已安装好了 TDengine 3.0.4.0+ 和 Python 3.x。
示例一
最简单的 UDF
编写一个只接收一个整数的 UDF 函数:输入 n, 输出 ln(n^2 + 1)。
首先编写一个 Python 文件,存在系统某个目录,比如 /root/udf/myfun.py 内容如下:
from math import log
def init():
pass
def destroy():
pass
def process(block):
rows, _ = block.shape()
return [log(block.data(i, 0) ** 2 + 1) for i in range(rows)]
这个文件包含 3 个函数, init 和 destroy 都是空函数,它们是 UDF 的生命周期函数,即使什么都不做也要定义。最关键的是 process 函数, 它接受一个数据块,这个数据块对象有两个方法:
- shape() 返回数据块的行数和列数
- data(i, j) 返回 i 行 j 列的数据
标量函数的 process 方法传入的数据块有多少行,就需要返回多少个数据。上述代码中我们忽略的列数,因为我们只想对每行的第一个数做计算。
接下来我们在时序数据库(Time Series Database) TDengine 中创建对应的 UDF 函数,执行下面语句:
create function myfun as '/root/udf/myfun.py' outputtype double language 'Python'
taos> create function myfun as '/root/udf/myfun.py' outputtype double language 'Python';
Create OK, 0 row(s) affected (0.005202s)
看起来很顺利,接下来 show 一下系统中所有的自定义函数,确认创建成功:
taos> show functions;
name |
=================================
myfun |
Query OK, 1 row(s) in set (0.005767s)
接下来就来测试一下这个函数,测试之前先执行下面的 SQL 命令,制造些测试数据:
create database test;
create table t(ts timestamp, v1 int, v2 int, v3 int);
insert into t values('2023-05-01 12:13:14', 1, 2, 3);
insert into t values('2023-05-03 08:09:10', 2, 3, 4);
insert into t values('2023-05-10 07:06:05', 3, 4, 5);
测试 myfun 函数:
taos> select myfun(v1, v2) from t;
DB error: udf function execution failure (0.011088s)
不幸的是执行失败了,什么原因呢?
查看 udfd 进程的日志: /var/log/taos/udfd.log 发现以下错误信息:
05/24 22:46:28.733545 01665799 UDF ERROR can not load library libtaospyudf.so. error: operation not permitted
05/24 22:46:28.733561 01665799 UDF ERROR can not load python plugin. lib path libtaospyudf.so
错误很明确:没有加载到 Python 插件 libtaospyudf.so, 看官方文档原来是要先安装 taospyudf 这个 Python 包。于是:
pip3 install taospyudf
安装过程会编译 C++ 源码,因此系统上要有 cmake 和 gcc。编译生成的 libtaospyudf.so 文件自动会被复制到 /usr/local/lib/ 目录,因此如果是非 root 用户,安装时需加 sudo。安装完可以检查这个目录是否有了这个文件:
root@slave11 ~/udf $ ls -l /usr/local/lib/libtaos*
-rw-r--r-- 1 root root 671344 May 24 22:54 /usr/local/lib/libtaospyudf.so
这时再去执行 SQL 测试 UDF,会发现报同样的错误,原因是新安装的共享库还未生效,还需执行命令:
ldconfig
此时再去测试 UDF,终于成功了:
taos> select myfun(v1) from t;
myfun(v1) |
============================
0.693147181 |
1.609437912 |
2.302585093 |
至此,我们完成了第一个 UDF
标签:4.0,TDengine,05,Python,number,UDF,log,def,函数 From: https://blog.51cto.com/tdengine/6412785