FileSystem

文件系统连接器支持流式的写,可以按行编码如json,也可按块编码如parquet

从1.4.11.1开始,1.14版本支持写入s3

FLINK-1.12/FLINK-1.14

DDL写法

create table hdfs_test1 (
    user_id string,
    action string,
    appver string
)
partitioned by (user_id)
with (
    'connector'='filesystem',
    'path'='hdfs://bdms-test/user/sloth/lyb_0901',
    'is.related.mammunt' = 'false',
    'hdfs-site' = 'hdfs-site.xml',
    'core-site' = 'core-site.xml',
    'krb.conf' = 'krb5.conf',
    'krb.keytab' = 'sloth.keytab',
    'krb.principal' = 'sloth/dev@BDMS.163.COM',
    'format' = 'parquet',
    'sink.rolling-policy.file-size' = '1MB',
    'sink.rolling-policy.rollover-interval' = '100min',
    'sink.rolling-policy.check-interval' = '1min'
    ,'is.file.name.encrypt' = 'true'
    ,'auth.method' = 'kerberos'
);

Metahub写法

SET 'hdfs_test1.path' = 'hdfs://bdms-test/user/sloth/lyb_meta_0830';
SET 'hdfs_test1.is.related.mammunt' = 'false';

SET 'hdfs_test1.krb.principal' = 'sloth/dev@BDMS.163.COM';
set 'hdfs_test1.auth.method' = 'kerberos';

set 'hdfs_test1.is.file.name.encrypt' = 'true';
SET 'hdfs_test1.krb.keytab' = 'sloth.keytab';
set 'hdfs_test1.krb.conf' = 'krb5.conf';
set 'hdfs_test1.hdfs-site' = 'hdfs-site.xml';
set 'hdfs_test1.core-site' = 'core-site.xml';
SET 'hdfs_test1.part.prefix' = 'begin';
SET 'hdfs_test1.part.suffix' = 'end';

set 'hdfs_test1.partition.default-name' = '_DEFAULT_PART_';
set 'hdfs_test1.connector' = 'filesystem';

SET 'hdfs_test1.format' = 'parquet';
set 'hdfs_test1.compression' = 'snappy';

set 'hdfs_test1.partition.keys' = 'ts_str';
set 'hdfs_test1.sink.parallelism' = '2';

--compaction
set 'hdfs_test1.auto-compaction' = 'true';
set 'hdfs_test1.compaction.file-size' = '1500b';
--partition commit trigger
set 'hdfs_test1.sink.partition-commit.trigger' = 'process-time';
set 'hdfs_test1.sink.partition-commit.delay' = '1h';
set 'hdfs_test1.sink.partition-commit.policy.kind' = 'success-file';
--partition time extractor
set 'hdfs_test1.partition.time-extractor.kind' = 'default';
set 'hdfs_test1.partition.time-extractor.timestamp-pattern' = '$ts_str 01:00:00';
--roll over
set 'hdfs_test1.sink.rolling-policy.file-size' = '3000b';
set 'hdfs_test1.sink.rolling-policy.rollover-interval' = '1000s';
set 'hdfs_test1.sink.rolling-policy.check-interval' = '300s';

属性

1、滚动策略

分区目录下的数据被分割到分区文件中。每个分区对应的sink的每个接受到了数据的子任务都至少会为该分区生成一个分区文件。 根据可配置的滚动策略,当前正在写入的分区文件会被关闭,新的分区文件也会被生成。 该策略基于大小,和指定的文件可被打开的最大 timeout 时长,来滚动分区文件。

Key Default Type Description
sink.rolling-policy.file-size
128MB MemorySize 滚动前,分区文件最大大小.
sink.rolling-policy.rollover-interval
30 min Duration 滚动前,分区文件处于打开状态的最大时长 (默认值是30分钟,以避免产生大量小文件)。 检查该选项的频率由参数 'sink.rolling-policy.check-interval' 控制。
sink.rolling-policy.check-interval
1 min Duration 基于时间的滚动策略的检查间隔。该参数控制了基于参数 'sink.rolling-policy.rollover-interval' 检查分区文件是否该被滚动的检查频率 .

2、文件合并

file sink 支持文件合并,以允许应用程序可以使用较小的检查点间隔而不产生大量文件。

Key Default Type Description
auto-compaction
false Boolean 在流式 sink 中是否开启自动合并功能。数据首先会被写入到临时文件,在检查点完成后,该检查点产生的临时文件会被合并。这些临时文件在合并前不可见.
compaction.file-size
(none) MemorySize 合并目标文件大小,默认值是滚动文件大小.

3、分区提交

分区数据写完毕后,经常需要通知下游应用。比如,在 Hive metastore 中新增分区或者在目录下新增 _SUCCESS 文件。 分区提交策略是可定制的,具体的分区提交行为是基于 triggerspolicies 的组合.

  • Trigger: 分区提交的时机,可以基于从分区中提取的时间对应的水印,或者基于处理时间。
  • Policy: 分区提交策略,内置的策略包括提交 _SUCCESS 文件和 hive metastore, 也可以自己定制提交策略, 比如触发 hive 生成统计信息,合并小文件等。

注意: 分区提交只有在动态分区插入模式下才有效。

(1)分区提交触发器

通过配置分区提交的触发策略,来配置何时提交分区:


Key Default Type Description
sink.partition-commit.trigger
process-time String 分区提交触发器类型。 'process-time': 基于机器时间,既不需要分区时间提取器也不需要水印生成器,一旦 ”当前系统时间“ 超过了 “分区创建系统时间” 和 'sink.partition-commit.delay' 之和,就提交分区; 'partition-time': 基于从分区字段提取的时间,需要水印生成器,一旦 “水印” 超过了 ”从分区字段提取的时间“ 和 'sink.partition-commit.delay' 之和,就提交分区.
sink.partition-commit.delay
0 s Duration 该延迟时间之前分区不会被提交。如果是按天的分区,应配置为 '1 d', 如果是按小时的分区,应配置为 '1 h'.
sink.partition-commit.watermark-time-zone
UTC String 解析 LONG 类型的水印到 TIMESTAMP 类型时所采用的时区,解析得到的水印的 TIMESTAMP 会被用来跟分区时间进行比较以判断分区是否该被提交。 该参数只有在参数 sink.partition-commit.trigger 被设置为 'partition-time' 时才生效。 如果该参数设置的不正确,比如在 TIMESTAMP_LTZ 列上定义了 source rowtime, 但没有设置该参数,则用户可能在若干个小时后才看到分区的提交。 该参数的默认值是 'UTC', 代表水印是定义在 TIMESTAMP 列上或没有定义水印。 如果水印定义在 TIMESTAMP_LTZ 列上,则水印的时区是会话的时区。 该参数的可选值要么是完整的时区名比如 'America/Los_Angeles',要么是自定义的时区 id 比如 'GMT-08:00'.

有两种类型的触发器:

  • 第一种是根据分区的处理时间。 该触发器不需要分区时间提取,也不需要生成水印。通过分区创建时间和当前系统时间来触发分区提交。该触发器更通用但不是很精确。比如,数据的延迟或故障转移都会导致分区的提前提交。
  • 第二种是根据从分区字段提取的时间以及水印。这需要你的作业支持生成水印,分区是根据时间来切割的,比如按小时或按天分区。

如果想让下游系统尽快感知到分区,而不管分区数据是否完整:

  • 'sink.partition-commit.trigger'='process-time' (默认值)
  • 'sink.partition-commit.delay'='0s' (默认值) 一旦分区中有数据,分区立马就会被提交。注意:分区可能会被提交多次。

如果想让下游系统只有在分区数据完整时才感知到分区,且你的作业有水印生成的逻辑,也能从分区字段的值中提取到时间:

  • 'sink.partition-commit.trigger'='partition-time'
  • 'sink.partition-commit.delay'='1h' (根据分区类型指定,如果是按小时的分区可配置为 '1h') 该方式是最精确的提交分区的方式,该方式尽力确保提交的分区包含尽量完整的数据。

如果想让下游系统只有在数据完整时才感知到分区,但是没有水印,或者无法从分区字段的值中提取时间:

  • 'sink.partition-commit.trigger'='process-time' (默认值)
  • 'sink.partition-commit.delay'='1h' (根据分区类型指定,如果是按小时的分区可配置为 '1h') 该方式尽量精确地提交分区,但是数据延迟或故障转移会导致分区的提前提交。

延迟数据的处理:延迟的记录会被写入到已经提交的对应分区中,且会再次触发该分区的提交。

(2)分区时间提取器

时间提取器定义了如何从分区字段值中提取时间.

Key Default Type Description
partition.time-extractor.kind
default String 从分区字段提取时间的时间提取器。支持默认值和定制。对于默认值,可以配置时间戳模式。对于定制,应指定提取器类.
partition.time-extractor.class
(none) String 实现了接口 PartitionTimeExtractor 的提取器类.
partition.time-extractor.timestamp-pattern
(none) String 'default' 时间提取器允许用户从分区字段中提取合法的时间戳模式。默认支持从第一个字段按 'yyyy-mm-dd hh:mm:ss' 时间戳模式提取。 如果需要从一个分区字段比如 ‘dt’ 提取时间戳,可以配置为: '$dt'; 如果需要从多个分区字段,比如 'year', 'month', 'day' 和 'hour'提取时间戳,可以配置为:'$year-$month-$day $hour:00:00'; 如果需要从两字分区字段,比如 'dt' 和 'hour' 提取时间戳,可以配置为:'$dt $hour:00:00'.

默认的提取器是基于由分区字段组合而成的时间戳模式。你也可以指定一个实现了 PartitionTimeExtractor 接口的自定义的提取器。


public class HourPartTimeExtractor implements PartitionTimeExtractor {
    @Override
    public LocalDateTime extract(List<String> keys, List<String> values) {
        String dt = values.get(0);
        String hour = values.get(1);
        return Timestamp.valueOf(dt + " " + hour + ":00:00").toLocalDateTime();
    }
}

(3)分区提交策略

分区提交策略指定了提交分区时的具体操作.

  • 第一种是 metastore, 只有 hive 表支持该策略, 该策略下文件系统通过目录层次结构来管理分区.
  • 第二种是 success 文件, 该策略下会在分区对应的目录下写入一个名为 _SUCCESS 的空文件.
Key Default Type Description
sink.partition-commit.policy.kind
(none) String 分区提交策略用来通知下游应用系统某个分区已经写完毕可以被读取了。 metastore: 向 metastore 中增加分区,只有 hive 支持 metastore 策略,文件系统通过目录结构管理分区; success-file: 向目录下增加 '_success' 文件; custom: 使用指定的类来创建提交策略; 支持同时指定多个提交策略,如:'metastore,success-file'.
sink.partition-commit.policy.class
(none) String 实现了 PartitionCommitPolicy 接口的分区提交策略。只有在 custom 提交策略下适用。
sink.partition-commit.success-file.name
_SUCCESS String 使用 success-file 分区提交策略时的文件名,默认值是 '_SUCCESS'.

你也可以实现自己的提交策略,如:


public class AnalysisCommitPolicy implements PartitionCommitPolicy {
    private HiveShell hiveShell;

    @Override
    public void commit(Context context) throws Exception {
        if (hiveShell == null) {
            hiveShell = createHiveShell(context.catalogName());
        }

        hiveShell.execute(String.format(
            "ALTER TABLE %s ADD IF NOT EXISTS PARTITION (%s = '%s') location '%s'",
            context.tableName(),
            context.partitionKeys().get(0),
            context.partitionValues().get(0),
            context.partitionPath()));
        hiveShell.execute(String.format(
            "ANALYZE TABLE %s PARTITION (%s = '%s') COMPUTE STATISTICS FOR COLUMNS",
            context.tableName(),
            context.partitionKeys().get(0),
            context.partitionValues().get(0)));
    }
}

4、Sink 并行度

向外部文件系统(包括 hive) 写文件时的并行度,在流处理模式和批处理模式下,都可以通过对应的 table 选项指定。默认情况下,该并行度跟上一个上游的 chained operator 的并行度一样。当配置了跟上一个上游的 chained operator 不一样的并行度时,写文件的算子和合并文件的算子(如果使用了的话)会使用指定的并行度。

Key Default Type Description
sink.parallelism
(none) Integer 向外部文件系统写文件时的并行度。必须大于 0,否则会抛出异常.

注意: 当前,只有在上游的 changelog 模式是 INSERT-ONLY 时,才支持设置 sink 的并行度。否则的话,会抛出异常。

5、认证

Key Default Type Description
auth.method
(none) String 认证类型,可写simple或kerberos,只有当配置为kerberos时,krb开头的属性才可生效
krb.conf
(none) String kerberos配置文件
krb.keytab
(none) String kerberos认证密钥文件名
krb.principal
(none) String kerberos认证主体名

6、压缩

Key Default Type Description
compression
(none) String 默认不压缩。配置后可按指定格式压缩按行编码、按块编码的文件

jar任务

  1. 引入依赖

flink.verion目前可指定为3.8.0-1.1.0

<dependency>
    <groupId>com.netease.sloth</groupId>
    <artifactId>flink-connector-filesystem-1.12</artifactId>
    <version>${flink.version}</version>
</dependency>
  1. 调用EasyStreamingFileSink向带krb认证的hdfs流式写入

将hdfs、krb相关文件放到resources资源目录,打包后会加到jar包里

        TableMetaStore tableMetaStore = TableMetaStore.builder()
                .withKrbAuth(
                        getResource(HdfsSinkTest.class, "conf" + File.separator + "sloth.keytab"),
                        getResource(HdfsSinkTest.class, "conf" + File.separator + "krb5.conf"),
                        "sloth/dev@BDMS.163.COM"
                )
                .withHdfsSite(getResource(HdfsSinkTest.class, "conf" + File.separator + "hdfs-site.xml"))
                .withCoreSite(getResource(HdfsSinkTest.class, "conf" + File.separator + "core-site.xml"))
                .build();

        EasyStreamingFileSink sink = EasyStreamingFileSink
                .forRowFormat(new Path("hdfs://bdms-test/user/sloth/lyb/krb_sink"), new SimpleStringEncoder<>("UTF-8"))
                .withTableMetaStore(tableMetaStore)
                .withBucketCheckInterval(200)
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(20)
                                .build())
                .build();

FLINK-1.10

示例

SQL案例1: 关联猛犸认证文件 & format=parquet:

CREATE TABLE hdfs_sink (
  v_part string,
  v_string STRING,
  v_varchar VARCHAR,
  v_boolean boolean,
  v_byte TINYINT,
  v_short SMALLINT,
  v_int int,
  v_long BIGINT,
  v_float FLOAT,
  v_double DOUBLE
) WITH(
  'connector.type' = 'filesystem',
  'connector.property-version' = '1',
  'path' = 'hdfs://bdms-test/user/sloth/test_file_sink',
  'update-mode' = 'append',
  'hdfs-site' = 'mammut-hdfs-site.xml',
  'core-site' = 'mammut-core-site.xml',
  'is.related.mammunt' = 'true',
  'format' = 'parquet',
  'compression'='none',
  'row.group.size'='1024',
  'part.prefix' = 'newpart',
  'part.suffix' = '_success',
  'partition.keys' = 'v_part',
  'partition.default-name' = '_DEFAULT_PART_'
);

With 参数

参数 注释说明 备注
connector.type 数据源类型 必填:filesystem
connector.property-version connector 配置的版本 必填 :1
path sink file path 必填:hdfs://*
update-mode update 模式 必填:append
hdfs-site hdfs-site.xml 文件名 必填
core-site core-site.xml 文件名 必填
is.related.mammunt krb 认证文件是否关联猛犸 必填:false、true
krb.conf krb conf 文件名 is.related.mammunt=false 时必填
krb.keytab keytab 文件名 is.related.mammunt=false 时必填
krb.principal principal is.related.mammunt=false 时必填
part.prefix part 文件名前缀 选填;part.prefix=newpart,文件名:newpart-0-0
part.suffix part 文件名后缀 选填;part.suffix=success,文件名:part-0-0success
partition.keys 分区字段,需要在 ddl 中定义字段 选填
partition.default-name 分区字段值为空时,使用该值 选填
format sink file 格式 row、json、parquet, format=row 时多个字段用逗号隔开
bucket.check.interval bucket的检查频率 选填;default: 60 * 1000, 单位:ms,format=row、json 时生效
part.size 达到文件大小后生成 part 文件 选填,、;default: 1024 * 1024 * 128, 单位:字节,format=row、json 时生效
rollover.interval 从创建到现在多少时间后,生成 part 文件 选填;default: 60 * 1000, 单位:ms,format=row、jso 时生效
inactivity.interval 超过多少时间不活跃文件,生成 part 文件 选填;default: 60 * 1000, 单位:ms,format=row、json 时生效
charset 字符串编码 选填;default: UTF-8,format=row 时生效
row.group.size parquet 文件大小 选填;default: 1024 * 1024 * 128, 单位:字节,format=parquet 时生效
compression 压缩方式 format=parquet 时必填:none, snappy, gzip, lzo, brotli, lz4, zstd; format=json 或 row 时选填:lzo、lzop