自定义函数(UDF)开发
Contents
自定义函数(UDF)开发#
背景#
虽然OpenMLDB内置了上百个函数,以供数据科学家作数据分析和特征抽取。但是在某些场景下还是不能很好的满足要求,为了便于用户快速灵活实现特定的特征计算需求,我们支持了基于 C++ 的用户自定义函数(UDF)开发,以及动态用户自定义函数库的加载。
See also
用户也可以使用内置函数开发的方式扩展 OpenMLDB 的计算函数库。但是内置函数开发需要修改源代码和重新编译。如果用户希望贡献扩展函数到 OpenMLDB 代码库,那么可以参考内置函数的开发文档。
开发步骤#
开发自定义函数#
C++函数名规范#
C++内置函数名统一使用snake_case风格
要求函数名能清晰表达函数功能
函数不能重名。函数名不能和内置函数及其他自定义函数重名。所有内置函数的列表参考这里
C++类型与SQL类型对应关系#
内置C++函数的参数类型限定为:BOOL类型,数值类型,时间戳日期类型和字符串类型。C++类型SQL类型对应关系如下:
SQL类型 |
C/C++ 类型 |
---|---|
BOOL |
|
SMALLINT |
|
INT |
|
BIGINT |
|
FLOAT |
|
DOUBLE |
|
STRING |
|
TIMESTAMP |
|
DATE |
|
函数参数和返回值#
返回值:
如果udf输出类型是基本类型,并且
return_nullable
设置为false, 则通过函数返回值返回如果udf输出类型是基本类型,并且
return_nullable
设置为true, 则通过函数参数返回如果udf输出类型是string, timestamp, date, 通过函数最后一个参数返回
参数:
如果输入字段是基本类型,通过值传递
如果输入字段是string, timestamp, date, 通过指针传递
函数的第一个参数必须是UDFContext* ctx,不能更改。UDFContext的定义如下:
struct UDFContext { ByteMemoryPool* pool; // 用来分配内存 void* ptr; // 开发聚合函数时用来存储临时变量 };
如果参数声明为nullable的,那么所有参数都是nullable的,每一个输入参数其后需要添加bool参数(通常命名为is_null),其顺序为
arg1, arg1_is_null, arg2, arg2_is_null, ...
。不可以随意调整参数顺序。如果返回值声明为nullable的,那么通过参数来返回,并且添加bool参数(通常命名为is_null)来表示返回值是否为null
例如,函数sum有俩个参数,如果参数和返回值设置为nullable的话,单行函数原型如下:
extern "C"
void sum(::openmldb::base::UDFContext* ctx, int64_t input1, bool input1_is_null, int64_t input2, bool input2_is_null, int64_t* output, bool* is_null) {
函数声明:
函数必须用extern “C”来声明
内存管理#
在单行函数中,不允许使用
new
和malloc
给输入和输出参数开辟空间。函数内部可以使用new
和malloc
申请临时空间, 申请的空间在函数返回前需要释放掉。在聚合函数中,在init函数中可以使用
new
/malloc
开辟空间,但是必须在output函数中释放。最后的返回值如果是string需要保存在mempool开辟的空间中若需要动态开辟空间,可以使用OpenMLDB提供的内存管理接口。函数执行完OpenMLDB会自动释放内存.
char *buffer = ctx->pool->Alloc(size);
一次分配空间的最大长度不能超过2M字节
单行函数开发#
单行函数(scalar function)对单行数据进行处理,返回单个值,比如 abs
, sin
, cos
, date
, year
等。
具体开发步骤为:
包含头文件udf/openmldb_udf.h
实现自定义函数逻辑
#include "udf/openmldb_udf.h" // 必须包含此头文件
// 实现一个udf,截取字符串的前两个字符
extern "C"
void cut2(::openmldb::base::UDFContext* ctx, ::openmldb::base::StringRef* input, ::openmldb::base::StringRef* output) {
if (input == nullptr || output == nullptr) {
return;
}
uint32_t size = input->size_ <= 2 ? input->size_ : 2;
// 在udf函数中申请内存空间需要用ctx->pool
char *buffer = ctx->pool->Alloc(size);
memcpy(buffer, input->data_, size);
output->size_ = size;
output->data_ = buffer;
}
因为返回值是string类型,所以此处需要通过函数最后一个参数返回。如果返回值是基本类型,通过函数返回值返回,可参考test_udf.cc中的strlength。
聚合函数开发#
聚合函数(aggregate function)对一个数据集(比如一列数据)执行计算,返回单个值,比如 sum
, avg
, max
, min
, count
等。
具体开发步骤为:
包含头文件udf/openmldb_udf.h
实现自定义函数逻辑
开发一个聚合函数需要实现如下三个C++方法:
init函数。 在init函数中做一些初始化工作,如开辟中间变量的空间等。函数命名格式为:“聚合函数名_init”。
update函数。对每一行响应字段的处理逻辑在update函数中。函数命名格式为:“聚合函数名_update”。
output函数。处理最后的聚合值,并返回结果。函数命名格式为:“聚合函数名_output”。
注:在init函数和update函数中需要把UDFContext*
作为返回值返回
#include "udf/openmldb_udf.h" // 必须包含此头文件
// 实现名称为special_sum的聚合函数
extern "C"
::openmldb::base::UDFContext* special_sum_init(::openmldb::base::UDFContext* ctx) {
// 开辟中间变量空间,并赋值給UDFContext中的ptr
ctx->ptr = ctx->pool->Alloc(sizeof(int64_t));
// 初始化中间变量的值
*(reinterpret_cast<int64_t*>(ctx->ptr)) = 10;
// 返回UDFContext指针,不能省略
return ctx;
}
extern "C"
::openmldb::base::UDFContext* special_sum_update(::openmldb::base::UDFContext* ctx, int64_t input) {
// 从UDFContext中的ptr取出中间变量并更新
int64_t cur = *(reinterpret_cast<int64_t*>(ctx->ptr));
cur += input;
*(reinterpret_cast<int*>(ctx->ptr)) = cur;
// 返回UDFContext指针,不能省略
return ctx;
}
// 在output函数中处理聚合计算结果,并返回
extern "C"
int64_t special_sum_output(::openmldb::base::UDFContext* ctx) {
return *(reinterpret_cast<int64_t*>(ctx->ptr)) + 5;
}
// Get the third non-null value of all values
extern "C"
::openmldb::base::UDFContext* third_init(::openmldb::base::UDFContext* ctx) {
ctx->ptr = reinterpret_cast<void*>(new std::vector<int64_t>());
return ctx;
}
extern "C"
::openmldb::base::UDFContext* third_update(::openmldb::base::UDFContext* ctx, int64_t input, bool is_null) {
auto vec = reinterpret_cast<std::vector<int64_t>*>(ctx->ptr);
if (!is_null && vec->size() < 3) {
vec->push_back(input);
}
return ctx;
}
extern "C"
void third_output(::openmldb::base::UDFContext* ctx, int64_t* output, bool* is_null) {
auto vec = reinterpret_cast<std::vector<int64_t>*>(ctx->ptr);
if (vec->size() != 3) {
*is_null = true;
} else {
*is_null = false;
*output = vec->at(2);
}
// free the memory allocated in init function with new/malloc
delete vec;
}
// Get the first non-null value >= threshold
extern "C"
::openmldb::base::UDFContext* first_ge_init(::openmldb::base::UDFContext* ctx) {
// threshold init in update
// threshold, thresh_flag, first_ge, first_ge_flag
ctx->ptr = reinterpret_cast<void*>(new std::vector<int64_t>(4, 0));
return ctx;
}
extern "C"
::openmldb::base::UDFContext* first_ge_update(::openmldb::base::UDFContext* ctx, int64_t input, bool is_null, int64_t threshold, bool threshold_is_null) {
auto pair = reinterpret_cast<std::vector<int64_t>*>(ctx->ptr);
if (!threshold_is_null && pair->at(1) == 0) {
pair->at(0) = threshold;
pair->at(1) = 1;
}
if (!is_null && pair->at(3) == 0 && input >= pair->at(0)) {
pair->at(2) = input;
pair->at(3) = 1;
}
return ctx;
}
extern "C"
void first_ge_output(::openmldb::base::UDFContext* ctx, int64_t* output, bool* is_null) {
auto pair = reinterpret_cast<std::vector<int64_t>*>(ctx->ptr);
// threshold is null or no value >= threshold
if (pair->at(1) == 0 || pair->at(3) == 0) {
*is_null = true;
} else {
*is_null = false;
*output = pair->at(2);
}
// *is_null = true;
// free the memory allocated in init function with new/malloc
delete pair;
}
如上所示,聚合函数init函数仅单参数,无论是几个参数的聚合函数,init中都只有一个参数UDFContext。update函数参数个数和类型,与聚合函数的参数个数和类型一致。同样的,如果想要聚合函数nullable,每个参数都需要添加一个bool参数,表示该参数是否为null。output函数只会有一个输出参数或返回值,nullable同理。更多udf/udaf实现参考这里。
编译动态库#
拷贝include目录
https://github.com/4paradigm/OpenMLDB/tree/main/include
到某个路径下,下一步编译会用到。如/work/OpenMLDB/执行编译命令,其中 -I 指定inlcude目录的路径 -o 指定产出动态库的名称
g++ -shared -o libtest_udf.so examples/test_udf.cc -I /work/OpenMLDB/include -std=c++17 -fPIC
拷贝动态库#
编译过的动态库需要被拷贝到 TaskManager 和 tablets中。如果 TaskManager 和 tablets中不存在udf
目录,请先创建并重启这些进程(保证环境变量生效)。
tablet的UDF目录是
path_to_tablet/udf
。TaskManager的UDF目录是
path_to_taskmanager/taskmanager/bin/udf
。
例如,假设tablet和TaskManager的部署路径都是 /work/openmldb
,其目录结构将如下所示:
/work/openmldb/
├── bin
├── conf
├── taskmanager
│ ├── bin
│ │ ├── taskmanager.sh
│ │ └── udf
│ │ └── libtest_udf.so
│ ├── conf
│ └── lib
├── tools
└── udf
└── libtest_udf.so
Note
对于有多个tablet场景,每个tablet都需要拷贝动态库。
在执行’ DROP FUNCTION ‘之前请勿删除动态库。
注册、删除和查看函数#
注册函数使用CREATE FUNCTION
注册单行函数,cut2函数将字符串的前两个字符返回:
CREATE FUNCTION cut2(x STRING) RETURNS STRING OPTIONS (FILE='libtest_udf.so');
注册聚合函数,special_sum函数初始为10,再将输入的值累加,并且最后加上5返回(演示函数,无特殊意义):
CREATE AGGREGATE FUNCTION special_sum(x BIGINT) RETURNS BIGINT OPTIONS (FILE='libtest_udf.so');
注册聚合函数,并且输入参数和返回值都支持null,third函数返回第三个非null的值,如果非null的值不足3个,返回null:
CREATE AGGREGATE FUNCTION third(x BIGINT) RETURNS BIGINT OPTIONS (FILE='libtest_udf.so', ARG_NULLABLE=true, RETURN_NULLABLE=true);
注:
参数类型和返回值类型必须和代码的实现保持一致
一个udf函数只能对一种类型起作用。如果想用于多种类型,需要创建多个函数
FILE
指定动态库的文件名,不需要包含路径
成功注册后就可在 OpenMLDB 中使用该自定义函数:SELECT cut2(c1) FROM t1
可以通过 SHOW FUNCTIONS
查看已注册的 UDF
SHOW FUNCTIONS;
通过 DROP FUNCTION
删除已注册的 UDF
DROP FUNCTION cut2;
Warning
同一个udf so如果注册了多个函数,只删除一个函数时,该so不会从Tablet Server内存中删除。此时替换so文件是无用的,并且如果此时增删udf,有一定危险影响Tablet Server运行。
建议:删除所有udf后,替换udf so文件。