在SQL任务中使用UDTF

UDTF函数是flink 提供的自定义函数中的一类表值函数,和标量函数不同的是,它可以返回任意多行,返回的行也可以包含1到多列。 要定义一个表值函数,需要扩展org.apache.flink.table.functions 包下的TableFunction,可以实现多个名为eval 的方法对求值进行重载。 详情可查看UDTF

函数开发

以解析原始日志为例,编写UDTF 从原始日志中提取appid,uid,cid等信息。原始日志如下:

        2021-11-20 12:34:25,346 48135186 [app-2-4] INFO  - {"appid":3,"uid":207051,"consid":"d6eaf66c-f2eb-4fhc-b9f0-c238b295496b","ctype":"AOS","sdkVer":62,"protVer":1,"sid":6,"cid":11,"spendtime":0,"retcode":200,"props":{"antispamVer":"0","taskDeltaTime":0},"begintime":1637382865000,"packsize":104,"cluster":"defaultGroup","traceid":"b5006527c5184712bab255a14b16bee7"}
        2020-11-20 12:35:00,346 48135186 [app-2-4] INFO  - {"appid":7,"uid":207351,"consid":"d6eaf66c-f2eb-4fbc-b9f0-c298b295496b","ctype":"AOS","sdkVer":62,"protVer":1,"sid":6,"cid":17,"spendtime":0,"retcode":200,"props":{"antispamVer":"0","taskDeltaTime":0},"begintime":1637382900000,"packsize":104,"cluster":"defaultGroup","traceid":"b5006527c5184712bab255a14b16bee7"}
        2021-11-20 12:35:49,346 48135186 [app-2-4] INFO  - {"appid":1,"uid":207451,"consid":"d4eaf66c-f2eb-4fbc-b9f0-c438b295496b","ctype":"AOS","sdkVer":62,"protVer":1,"sid":6,"cid":18,"spendtime":0,"retcode":200,"props":{"antispamVer":"0","taskDeltaTime":0},"begintime":1637382949000,"packsize":104,"cluster":"defaultGroup","traceid":"b5006527c5184712bab255a14b16bee7"}

pom.xml

 <properties>
        <!-- Versions shared by all Flink artifacts below -->
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope> <!-- compile-time only: not packaged, supplied by the Flink cluster at runtime -->
    </dependency>

        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <!-- NOTE(review): gson is marked `provided`, so it is NOT bundled into the UDF jar.
             Confirm the target platform ships gson on the classpath; otherwise the UDTF
             will fail with ClassNotFoundException at runtime and this scope should be
             changed to the default (compile). -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
            <scope>provided</scope>
        </dependency>

        <!-- Provides TableFunction and the rest of the Table API base classes -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

程序代码:

package com.netease;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableFunction;

/**
 * @author ZhiYuan
 * @date 2021/12/7 10:38 上午
 **/
/**
 * Table-valued function (UDTF) that extracts (uid, sid, cid) from one raw
 * application log line. The line is expected to carry a JSON payload after
 * the first "- " separator, e.g.
 *   2021-11-20 12:34:25,346 ... INFO  - {"appid":3,"uid":207051,...}
 * Malformed lines (no separator, null input) are silently dropped so a single
 * bad record cannot fail the streaming job.
 */
public class UDTFLogSplit extends TableFunction <Tuple3<Long, Integer, Integer>> {

    /**
     * Parses one raw log line and emits a single (uid, sid, cid) row.
     *
     * @param message raw log line; the JSON object follows the first "- "
     */
    public void eval(String message) {
        if (message == null) {
            return; // nothing to parse
        }

        // Limit the split to 2 parts so a "- " occurring INSIDE the JSON payload
        // does not truncate it (an unlimited split would corrupt such records).
        String[] parts = message.split("- ", 2);
        if (parts.length < 2) {
            return; // no JSON payload on this line: emit nothing
        }

        JsonObject jsonObject = new JsonParser().parse(parts[1]).getAsJsonObject();
        long uid = jsonObject.get("uid").getAsLong();
        int sid = jsonObject.get("sid").getAsInt();
        int cid = jsonObject.get("cid").getAsInt();

        collect(Tuple3.of(uid, sid, cid));
    }
}

打包上传

jdk版本使用1.8
使用maven 工具,单击package 对当前项目进行打包即可。注意flink 集群依赖不参与打包 mavenpackage

上传函数包到平台,为了分类管理不同业务的函数,可以创建文件夹,并将函数上传到指定的目录位置。 uploadFunction

函数创建&注册

在“UDF管理“ 中新增函数 createFunction

在任务开发中添加新增的函数到函数依赖中

functionrelyon

通过create function语句声明使用

-- register the packaged UDTF class under the SQL function name splitAppLog
create function splitAppLog as 'com.netease.UDTFLogSplit';

SQL 写法

使用lateral table 语句:

-- cross join each appLog row with the rows the UDTF emits for it;
-- T(uid,sid,cid) names the Tuple3 columns positionally, so the order here
-- must match the order the UDTF collects them in
insert into sink_print 
select 
    T.uid,
    T.sid,
    T.cid
 from appLog, lateral table(splitAppLog(log)) as T(uid,sid,cid);

使用 left join on true 配合lateral table

-- left join ... on true keeps every appLog row even when the UDTF emits
-- nothing for it (T's columns are then null); the comma/cross-join form
-- would drop such rows instead
insert into sink_print 
select 
    T.uid,
    T.sid,
    T.cid
 from appLog
 left join lateral table(splitAppLog(log)) as T(uid,sid,cid) on true;

需要注意的是在程序中如果未指定与字段对应的变量名称,默认按照顺序输出字段。如果出现字段值对应错误问题,请检查字段顺序与程序解析字段顺序。

完整sql:

--Comment: 请输入业务注释信息
--********************************************************************--

-- source table: one raw application log line per Kafka record
create table appLog (
    log string
) with (
    'connector' = 'kafka',
    'topic' = 'app_log',
    'properties.bootstrap.servers' = 'xxx:9092',
    'properties.group.id' = 'logs_moudle',
    'scan.startup.mode' = 'latest-offset',
    'parallelism' = '1',
    'format' = 'raw'
);

-- sink table: prints each extracted (uid, sid, cid) row to stdout
create table sink_print (
    uid bigint,
    sid int,
    cid int
) with (
    'connector' = 'print'
);

-- register the UDTF under the SQL function name splitAppLog
create function splitAppLog as 'com.netease.UDTFLogSplit';

-- keep every source row; T's columns are null when the UDTF emits nothing
insert into sink_print
select
    T.uid,
    T.sid,
    T.cid
from appLog
left join lateral table(splitAppLog(log)) as T(uid, sid, cid) on true;


 输出结果:
+I(207451,6,18)
+I(207451,6,18)
+I(207451,6,18)
+I(207451,3,11)
+I(207451,6,18)
+I(207451,6,18)