INFO - Developing and Using Flink UDFs

Applicable modules
Real-time computing, UDF Studio
Description
Develop a Flink UDF in a local IDE, package it, upload and register it in UDF Studio, then use it in Flink SQL.
Example
1. Create a Maven project and configure it
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-udf</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>


    <dependencies>

        <!--
            For plain UDF development, this is the only dependency you need.
            Notes: 1. Do not bundle Flink's core dependencies (flink-core, flink-common, etc.)
                      into your jar; set their scope to provided.
                   2. Dependencies already present on the Hadoop cluster (hadoop-common, hadoop-core, etc.)
                      as well as logging dependencies should likewise be kept out of the UDF jar you upload.
         -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.14.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <!-- Shade: packaging plugin. If you use third-party dependencies that must be bundled into your jar, this plugin does that for you -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.scala-lang:*</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>


            <!-- Scala Compiler: necessary when developing in Scala; without it, clicking package in IDEA will not compile your Scala code automatically -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <!-- Run scala compiler in the process-resources phase, so that dependencies on
                        scala classes can be resolved later in the (Java) compile phase -->
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>

                    <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
                         scala classes can be resolved later in the (Java) test-compile phase -->
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <jvmArgs>
                        <jvmArg>-Xms128m</jvmArg>
                        <jvmArg>-Xmx512m</jvmArg>
                    </jvmArgs>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
2. Write a UDF that computes the MD5 digest of an input string
As required by Flink, a scalar UDF must extend org.apache.flink.table.functions.ScalarFunction and implement an eval method.
package org.example;

import org.apache.flink.table.functions.ScalarFunction;

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Md5UDF
 *
 * @author xgSama
 * @date 2023/11/6 16:26:20
 */
public class Md5UDF extends ScalarFunction {

    /**
     * As required by Flink UDFs, the method name must be eval.
     *
     * @param input the input string
     * @return the MD5 digest as a lowercase hex string
     */
    public String eval(String input) {
        // Flink may pass NULL values; return NULL instead of throwing an NPE
        if (input == null) {
            return null;
        }
        try {
            // Get an MD5 MessageDigest instance
            MessageDigest md = MessageDigest.getInstance("MD5");
            // Hash the input bytes; use an explicit charset instead of the platform default
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));

            // Convert the byte array to a lowercase hex string
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 algorithm not found", e);
        }
    }
}
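The hashing logic above can be sanity-checked locally without a Flink runtime on the classpath. The sketch below inlines the same logic in a plain class (Md5Demo and its md5Hex method are hypothetical names used only for this illustration, not part of the project):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5Demo {

    // Same hashing logic as Md5UDF.eval, without extending ScalarFunction
    static String md5Hex(String input) {
        if (input == null) {
            return null;
        }
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 algorithm not found", e);
        }
    }

    public static void main(String[] args) {
        // MD5 of "hello" is a well-known test vector
        System.out.println(md5Hex("hello")); // 5d41402abcd4bb324ccd713f6d8b9b38
    }
}
```

Checking the output against a known MD5 test vector before uploading the jar saves a round trip through UDF Studio if the hex conversion is wrong.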
3. Package with Maven
If you develop in IDEA, after completing the configuration and coding above you can simply click package. Alternatively, run mvn clean package from the project root. When the build finishes, the jar whose name does not start with original- is the one needed in the next step.
4. Register the function in UDF Studio
1. Upload the jar (Figure 1)
2. Create the function (Figure 2)
3. Bring the function online: on the function management page, click online; the function can be used only after approval by the appropriate role
5. Test it in Flink SQL
CREATE TABLE mock (
    record STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.record.length' = '20'
);

CREATE TABLE sink (
  record STRING,
  record_md5 STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO sink 
SELECT 
    record, udfstudio_md5_test(record) AS record_md5
FROM mock;

(Figure 3)

View the run results (Figure 4)


Author: 程艺哲