INFO - Flink UDF Development and Usage
Last updated: 2024-03-11 02:44:35
Applicable modules
Real-time Computing, UDF Studio
Description
Develop a Flink UDF in a local IDE, package and upload it, register it in UDF Studio, and use it in Flink SQL, with a worked example.
1. Create and configure a Maven project
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-udf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!--
For plain UDF development, this is the only dependency you need.
Notes:
1. Do not bundle Flink's core dependencies (e.g. flink-core, flink-common) into your jar;
set the scope of these dependencies to provided.
2. Dependencies already present on the Hadoop cluster (e.g. hadoop-common, hadoop-core),
as well as logging dependencies, should likewise not be packaged into the UDF jar you upload.
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.14.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Shade: packaging plugin. If you use third-party dependencies that must be bundled into your jar, this plugin handles that for you. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.scala-lang:*</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Scala compiler: necessary when developing in Scala; without it, clicking package in IDEA will not compile your Scala sources automatically. -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<jvmArgs>
<jvmArg>-Xms128m</jvmArg>
<jvmArg>-Xmx512m</jvmArg>
</jvmArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2. Write a UDF that MD5-hashes the input string
A scalar UDF must extend org.apache.flink.table.functions.ScalarFunction and implement an eval method:
package org.example;
import org.apache.flink.table.functions.ScalarFunction;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
/**
* Md5UDF
*
* @author xgSama
* @date 2023/11/6 16:26:20
*/
public class Md5UDF extends ScalarFunction {
/**
* As required by Flink UDFs, this method must be named eval.
*
* @param input the input string
* @return the MD5 hex digest of the input
*/
public String eval(String input) {
try {
// Obtain an MD5 MessageDigest instance
MessageDigest md = MessageDigest.getInstance("MD5");
// Hash the input string
md.update(input.getBytes());
byte[] digest = md.digest();
// Convert the digest bytes to a hex string
StringBuilder sb = new StringBuilder();
for (byte b : digest) {
sb.append(String.format("%02x", b & 0xff));
}
return sb.toString();
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 algorithm not found", e);
}
}
}
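Before uploading the jar, the hashing logic can be sanity-checked on a plain JVM. The sketch below copies the eval logic into a standalone class (Md5UdfCheck is a hypothetical name; it drops the ScalarFunction parent so no Flink dependency is needed):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class Md5UdfCheck {
    // Same logic as Md5UDF.eval, minus the ScalarFunction parent class.
    static String eval(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            md.update(input.getBytes());
            byte[] digest = md.digest();
            StringBuilder sb = new StringBuilder();
            for (byte b : digest) {
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 algorithm not found", e);
        }
    }

    public static void main(String[] args) {
        // RFC 1321 test vector: MD5("abc")
        System.out.println(eval("abc")); // prints 900150983cd24fb0d6963f7d28e17f72
    }
}
```

If the digest matches the known MD5 test vectors, the class is ready to be packaged and uploaded.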
3. Package with Maven
If you develop in IDEA, after completing the configuration and coding above you can simply click package. Alternatively, run Maven from the project root:
mvn clean package
When it finishes, the jar whose name does not start with original- is the one needed in the following steps.
4. Register the function in UDF Studio
1. Upload the jar.
2. Create the function.
3. Bring the function online. On the function management page, click Online; the function becomes usable only after approval by the appropriate role.
5. Test the function in Flink SQL
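In UDF Studio the function is already registered under its managed name. If instead you want to smoke-test the jar in a standalone Flink SQL session, it can be registered as a temporary function first (an illustrative sketch; the function name udfstudio_md5_test matches the one used in the test query below):

```sql
-- Assumes the UDF jar is already on the session classpath,
-- e.g. the SQL client was started with: sql-client.sh -j flink-udf-1.0-SNAPSHOT.jar
CREATE TEMPORARY FUNCTION udfstudio_md5_test AS 'org.example.Md5UDF';
```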
CREATE TABLE mock (
record STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.record.length' = '20'
);
CREATE TABLE sink (
record STRING,
record_md5 STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink
SELECT
record, udfstudio_md5_test(record) AS record_md5
FROM mock;
Check the results: the print connector writes rows such as +I[<record>, <md5>] to the TaskManager stdout logs.
Author: 程艺哲