Arctic 连接器

简述

支持 Arctic 实时数据湖的读写

创建 Catalog

CREATE CATALOG slothTest WITH (
  'type'='arctic',
  'metastore.url'='thrift://127.0.0.1:14230/catalogName'
);
  • type:catalog 类型
  • metastore.url:元数据中心地址。thrift://{ip}:{port}/{catalogName}
  • arctic.version:Flink 1.12 版本的可选配置项,表示是否指定 arctic 版本为 0.2.8 以上的高版本,当不填写该配置项时,默认为 0.2.8 的 arctic 版本,当需要指定为非 0.2.8 版本时,需要指定该配置项为 0.3

通过 SQL 读取

读取文件数据(Filestore)

非主键表

-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;

-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled = true;

-- 读当前快照之后的增量数据
SELECT * FROM unkeyed /*+ OPTIONS('monitor-interval'='1s')*/ ;

Hint Options

Key 默认值 类型 是否必填 描述
streaming true Boolean 以流的方式读取有界数据还是无解数据,false:读取有界数据,true:读取无界数据
arctic.read.mode file String 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置
monitor-interval 10s Duration arctic.read.mode = file 时才生效。监控新提交数据文件的时间间隔
start-snapshot-id (none) Long 从指定的 snapshot 开始读取增量数据(不包括 start-snapshot-id 的快照数据),不指定则读当前快照之后(不包含当前)的增量数据
其他表参数 (none) String Arctic 表的所有参数都可以通过 SQL Hint 动态修改,当然只针对此任务生效,具体的参数列表可以参考 表配置。对于Catalog上的权限相关配置,也可以配置在Hint中,参数见 catalog ddl 中的 properties.auth.XXX

主键表

-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;

-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled = true;

-- 读全量数据及 changelog 中的 CDC 数据
SELECT * FROM keyed;

Hint Options

Key 默认值 类型 是否必填 描述
streaming true String 以流的方式读取有界数据还是无解数据,false:读取有界数据,true:读取无界数据
arctic.read.mode file String 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置
monitor-interval 10s String arctic.read.mode = file 时才生效。监控新提交数据文件的时间间隔
scan.startup.mode latest String 有效值为earliest、latest、timestamp(读file暂未支持)。当arctic.read.mode = file 时仅支持earliest、latest。'earliest'表示读取全量表数据,在streaming=true时会继续incremental pull;'latest':表示读取当前snapshot之后的数据,不包括当前snapshot数据。当arctic.read.mode = log 时,表示 Kafka 消费者初次启动时获取 offset 的模式,'earliest'表示从Kafka中最早的位置读取,'latest'表示从最新的位置读取,'timestamp'表示从Kafka中指定时间位置读取,需配置参数 'scan.startup.timestamp-millis'
其他表参数 (none) String Arctic 表的所有参数都可以通过 SQL Hint 动态修改,当然只针对此任务生效,具体的参数列表可以参考 表配置。对于Catalog上的权限相关配置,也可以配置在Hint中,参数见 catalog ddl 中的 properties.auth.XXX

读取实时数据,即log数据

通过下面的 SQL 可以从 Arctic 表中读取到实时的流式数据:

-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;

-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled=true;

SELECT * FROM test_table /*+ OPTIONS('arctic.read.mode'='log') */;

支持以下 Hint Options :

Key 默认值 类型 是否必填 描述
arctic.read.mode file String 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置
properties.group.id (none) String 查询时可选,写入时可不填 读取 Kafka Topic 时使用的 group id
scan.startup.mode latest String 有效值为earliest、latest、timestamp(读file暂未支持)。当arctic.read.mode = file 时仅支持earliest、latest。'earliest'表示读取全量表数据,在streaming=true时会继续incremental pull;'latest':表示读取当前snapshot之后的数据,不包括当前snapshot数据。当arctic.read.mode = log 时,表示 Kafka 消费者初次启动时获取 offset 的模式,'earliest'表示从Kafka中最早的位置读取,'latest'表示从最新的位置读取,'timestamp'表示从Kafka中指定时间位置读取,需配置参数 'scan.startup.timestamp-millis'
scan.startup.timestamp-millis (none) Long 当'scan.startup.mode'='timestamp'时有效,从指定的Kafka时间读取数据,值为从1970 1月1日 00:00:00.000 GMT 开始的毫秒时间戳
properties.* (none) String Kafka Consumer 支持的其他所有参数都可以通过在前面拼接 properties. 的前缀来设置,如:'properties.batch.size'='16384',完整的参数信息可以参考 Kafka官方手册

通过 SQL 写入

Arctic 表支持通过 Flink Sql 往 Log 或 File 写入数据

INSERT OVERWRITE

当前仅支持非主键表的 INSERT OVERWRITE。INSERT OVERWRITE 只允许以 Flink Batch 的模式运行。 替换表中的数据,Overwrite 为原子操作。 分区会由查询语句中动态生成,这些分区的数据会被全量覆盖。

INSERT OVERWRITE unkeyed VALUES (1, 'a', '2022-07-01');

也支持覆盖指定分区的数据

INSERT OVERWRITE `arctic_catalog`.`arctic_db`.`unkeyed` PARTITION(data='2022-07-01') SELECT 5, 'b';

对于无分区的表,INSERT OVERWRITE 将覆盖表里的全量数据

INSERT INTO

对于 Arctic 表,可以指定往 File 或 Log(需在建表时 开启 Log 配置)写入数据。 对于 Arctic 主键表,写 File 会将 CDC 数据写入 ChangeStore 中.


-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;

-- 同时往 Arctic 表的 Filestore, Logstore 中写入数据
INSERT INTO `arctic_catalog`.`arctic_db`.`test_table` 
    /*+ OPTIONS('arctic.emit.mode'='log,file') */
SELECT id, name from `source`;