Arctic SQL 连接器
更新时间: 2023-08-01 17:46:43
阅读 68
Arctic 连接器
简述
支持 Arctic 实时数据湖的读写
创建 Catalog
CREATE CATALOG slothTest WITH (
'type'='arctic',
'metastore.url'='thrift://127.0.0.1:14230/catalogName'
);
- type:catalog 类型
- metastore.url:元数据中心地址。thrift://{ip}:{port}/{catalogName}
- arctic.version:Flink 1.12 版本的可选配置项,表示是否指定 arctic 版本为 0.2.8 以上的高版本,当不填写该配置项时,默认为 0.2.8 的 arctic 版本,当需要指定为非 0.2.8 版本时,需要指定该配置项为 0.3
通过 SQL 读取
读取文件数据(Filestore)
非主键表
-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;
-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled = true;
-- 读当前快照之后的增量数据
SELECT * FROM unkeyed /*+ OPTIONS('monitor-interval'='1s')*/ ;
Hint Options
Key | 默认值 | 类型 | 是否必填 | 描述 |
---|---|---|---|---|
streaming | true | Boolean | 否 | 以流的方式读取有界数据还是无解数据,false:读取有界数据,true:读取无界数据 |
arctic.read.mode | file | String | 否 | 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置 |
monitor-interval | 10s | Duration | 否 | arctic.read.mode = file 时才生效。监控新提交数据文件的时间间隔 |
start-snapshot-id | (none) | Long | 否 | 从指定的 snapshot 开始读取增量数据(不包括 start-snapshot-id 的快照数据),不指定则读当前快照之后(不包含当前)的增量数据 |
其他表参数 | (none) | String | 否 | Arctic 表的所有参数都可以通过 SQL Hint 动态修改,当然只针对此任务生效,具体的参数列表可以参考 表配置。对于Catalog上的权限相关配置,也可以配置在Hint中,参数见 catalog ddl 中的 properties.auth.XXX |
主键表
-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;
-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled = true;
-- 读全量数据及 changelog 中的 CDC 数据
SELECT * FROM keyed;
Hint Options
Key | 默认值 | 类型 | 是否必填 | 描述 |
---|---|---|---|---|
streaming | true | String | 否 | 以流的方式读取有界数据还是无解数据,false:读取有界数据,true:读取无界数据 |
arctic.read.mode | file | String | 否 | 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置 |
monitor-interval | 10s | String | 否 | arctic.read.mode = file 时才生效。监控新提交数据文件的时间间隔 |
scan.startup.mode | latest | String | 否 | 有效值为earliest、latest、timestamp(读file暂未支持)。当arctic.read.mode = file 时仅支持earliest、latest。'earliest'表示读取全量表数据,在streaming=true时会继续incremental pull;'latest':表示读取当前snapshot之后的数据,不包括当前snapshot数据。当arctic.read.mode = log 时,表示 Kafka 消费者初次启动时获取 offset 的模式,'earliest'表示从Kafka中最早的位置读取,'latest'表示从最新的位置读取,'timestamp'表示从Kafka中指定时间位置读取,需配置参数 'scan.startup.timestamp-millis' |
其他表参数 | (none) | String | 否 | Arctic 表的所有参数都可以通过 SQL Hint 动态修改,当然只针对此任务生效,具体的参数列表可以参考 表配置。对于Catalog上的权限相关配置,也可以配置在Hint中,参数见 catalog ddl 中的 properties.auth.XXX |
读取实时数据,即log数据
通过下面的 SQL 可以从 Arctic 表中读取到实时的流式数据:
-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;
-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled=true;
SELECT * FROM test_table /*+ OPTIONS('arctic.read.mode'='log') */;
支持以下 Hint Options :
Key | 默认值 | 类型 | 是否必填 | 描述 |
---|---|---|---|---|
arctic.read.mode | file | String | 否 | 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置 |
properties.group.id | (none) | String | 查询时可选,写入时可不填 | 读取 Kafka Topic 时使用的 group id |
scan.startup.mode | latest | String | 否 | 有效值为earliest、latest、timestamp(读file暂未支持)。当arctic.read.mode = file 时仅支持earliest、latest。'earliest'表示读取全量表数据,在streaming=true时会继续incremental pull;'latest':表示读取当前snapshot之后的数据,不包括当前snapshot数据。当arctic.read.mode = log 时,表示 Kafka 消费者初次启动时获取 offset 的模式,'earliest'表示从Kafka中最早的位置读取,'latest'表示从最新的位置读取,'timestamp'表示从Kafka中指定时间位置读取,需配置参数 'scan.startup.timestamp-millis' |
scan.startup.timestamp-millis | (none) | Long | 否 | 当'scan.startup.mode'='timestamp'时有效,从指定的Kafka时间读取数据,值为从1970 1月1日 00:00:00.000 GMT 开始的毫秒时间戳 |
properties.* | (none) | String | 否 | Kafka Consumer 支持的其他所有参数都可以通过在前面拼接 properties. 的前缀来设置,如:'properties.batch.size'='16384' ,完整的参数信息可以参考 Kafka官方手册 |
通过 SQL 写入
Arctic 表支持通过 Flink Sql 往 Log 或 File 写入数据
INSERT OVERWRITE
当前仅支持非主键表的 INSERT OVERWRITE。INSERT OVERWRITE 只允许以 Flink Batch 的模式运行。 替换表中的数据,Overwrite 为原子操作。 分区会由查询语句中动态生成,这些分区的数据会被全量覆盖。
INSERT OVERWRITE unkeyed VALUES (1, 'a', '2022-07-01');
也支持覆盖指定分区的数据
INSERT OVERWRITE `arctic_catalog`.`arctic_db`.`unkeyed` PARTITION(data='2022-07-01') SELECT 5, 'b';
对于无分区的表,INSERT OVERWRITE 将覆盖表里的全量数据
INSERT INTO
对于 Arctic 表,可以指定往 File 或 Log(需在建表时 开启 Log 配置)写入数据。 对于 Arctic 主键表,写 File 会将 CDC 数据写入 ChangeStore 中.
-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;
-- 同时往 Arctic 表的 Filestore, Logstore 中写入数据
INSERT INTO `arctic_catalog`.`arctic_db`.`test_table`
/*+ OPTIONS('arctic.emit.mode'='log,file') */
SELECT id, name from `source`;
文档反馈
以上内容对您是否有帮助?