Arctic SQL 连接器
Arctic 连接器
简述
支持 Arctic 实时数据湖的读写
创建 Catalog
CREATE CATALOG slothTest WITH (
'type'='arctic',
'metastore.url'='thrift://127.0.0.1:14230/catalogName'
);
- type:catalog 类型
- metastore.url:元数据中心地址。thrift://{ip}:{port}/{catalogName}
通过 SQL 读取
读取文件数据(Filestore)
非主键表
-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;
-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled = true;
-- 读当前快照之后的增量数据
SELECT * FROM unkeyed /*+ OPTIONS('monitor-interval'='1s')*/ ;
Hint Options
Key | 默认值 | 类型 | 是否必填 | 描述 |
---|---|---|---|---|
streaming | true | Boolean | 否 | 以流的方式读取有界数据还是无解数据,false:读取有界数据,true:读取无界数据 |
arctic.read.mode | file | String | 否 | 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置 |
monitor-interval | 10s | Duration | 否 | arctic.read.mode = file 时才生效。监控新提交数据文件的时间间隔 |
start-snapshot-id | (none) | Long | 否 | 从指定的 snapshot 开始读取增量数据(不包括 start-snapshot-id 的快照数据),不指定则读当前快照之后(不包含当前)的增量数据 |
其他表参数 | (none) | String | 否 | Arctic 表的所有参数都可以通过 SQL Hint 动态修改,当然只针对此任务生效,具体的参数列表可以参考 表配置。对于Catalog上的权限相关配置,也可以配置在Hint中,参数见 catalog ddl 中的 properties.auth.XXX |
主键表
-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;
-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled = true;
-- 读全量数据及 changelog 中的 CDC 数据
SELECT * FROM keyed;
Hint Options
Key | 默认值 | 类型 | 是否必填 | 描述 |
---|---|---|---|---|
streaming | true | String | 否 | 以流的方式读取有界数据还是无解数据,false:读取有界数据,true:读取无界数据 |
arctic.read.mode | file | String | 否 | 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置 |
monitor-interval | 10s | String | 否 | arctic.read.mode = file 时才生效。监控新提交数据文件的时间间隔 |
scan.startup.mode | latest | String | 否 | 有效值为earliest、latest、timestamp(读file暂未支持)。当arctic.read.mode = file 时仅支持earliest、latest。'earliest'表示读取全量表数据,在streaming=true时会继续incremental pull;'latest':表示读取当前snapshot之后的数据,不包括当前snapshot数据。当arctic.read.mode = log 时,表示 Kafka 消费者初次启动时获取 offset 的模式,'earliest'表示从Kafka中最早的位置读取,'latest'表示从最新的位置读取,'timestamp'表示从Kafka中指定时间位置读取,需配置参数 'scan.startup.timestamp-millis' |
其他表参数 | (none) | String | 否 | Arctic 表的所有参数都可以通过 SQL Hint 动态修改,当然只针对此任务生效,具体的参数列表可以参考 表配置。对于Catalog上的权限相关配置,也可以配置在Hint中,参数见 catalog ddl 中的 properties.auth.XXX |
读取实时数据,即log数据
通过下面的 SQL 可以从 Arctic 表中读取到实时的流式数据:
-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;
-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled=true;
SELECT * FROM test_table /*+ OPTIONS('arctic.read.mode'='log') */;
支持以下 Hint Options :
Key | 默认值 | 类型 | 是否必填 | 描述 |
---|---|---|---|---|
arctic.read.mode | file | String | 否 | 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置 |
properties.group.id | (none) | String | 查询时可选,写入时可不填 | 读取 Kafka Topic 时使用的 group id |
scan.startup.mode | latest | String | 否 | 有效值为earliest、latest、timestamp(读file暂未支持)。当arctic.read.mode = file 时仅支持earliest、latest。'earliest'表示读取全量表数据,在streaming=true时会继续incremental pull;'latest':表示读取当前snapshot之后的数据,不包括当前snapshot数据。当arctic.read.mode = log 时,表示 Kafka 消费者初次启动时获取 offset 的模式,'earliest'表示从Kafka中最早的位置读取,'latest'表示从最新的位置读取,'timestamp'表示从Kafka中指定时间位置读取,需配置参数 'scan.startup.timestamp-millis' |
scan.startup.timestamp-millis | (none) | Long | 否 | 当'scan.startup.mode'='timestamp'时有效,从指定的Kafka时间读取数据,值为从1970 1月1日 00:00:00.000 GMT 开始的毫秒时间戳 |
properties.* | (none) | String | 否 | Kafka Consumer 支持的其他所有参数都可以通过在前面拼接 properties. 的前缀来设置,如:'properties.batch.size'='16384' ,完整的参数信息可以参考 Kafka官方手册 |
通过 SQL 写入
Arctic 表支持通过 Flink Sql 往 Log 或 File 写入数据
INSERT OVERWRITE
当前仅支持非主键表的 INSERT OVERWRITE。INSERT OVERWRITE 只允许以 Flink Batch 的模式运行。 替换表中的数据,Overwrite 为原子操作。 分区会由查询语句中动态生成,这些分区的数据会被全量覆盖。
INSERT OVERWRITE unkeyed VALUES (1, 'a', '2022-07-01');
也支持覆盖指定分区的数据
INSERT OVERWRITE `arctic_catalog`.`arctic_db`.`unkeyed` PARTITION(data='2022-07-01') SELECT 5, 'b';
对于无分区的表,INSERT OVERWRITE 将覆盖表里的全量数据
INSERT INTO
对于 Arctic 表,可以指定往 File 或 Log(需在建表时 开启 Log 配置)写入数据。 对于 Arctic 主键表,写 File 会将 CDC 数据写入 ChangeStore 中.
-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;
-- 同时往 Arctic 表的 Filestore, Logstore 中写入数据
INSERT INTO `arctic_catalog`.`arctic_db`.`test_table`
/*+ OPTIONS('arctic.emit.mode'='log,file') */
SELECT id, name from `source`;
Lookup Joins
Lookup Joins 是流式查询中的一种连接。它用于从 Arctic 查询的数据来打宽一个表。该连接要求事实表有一个 process time 属性。
create catalog arctic with (
'type'='arctic',
'metastore.url'='thrift://***:***/trino_online_env_hive'-- 具体地址可以查询 AMS
);
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN arctic.db_test.customer FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
加速初始化维表
Lookup Join 算子将在本地维护一个 RocksDB 缓存,并根据 lookup.reloading.interval
拉取该表的最新数据更新到 rocksdb。Lookup join 算子将只提取必要的数据,所以你的分区的过滤条件对性能非常重要;Projection filter 下推也对加载性能有很大提升,如果不需要使用的维表字段可以不 Select。
Shuffle Hash Lookup join
开启以下配置可以加速维表初始化,提高维表 Cache 命中率,提升 Lookup Join 性能
-- 加速初始化,提高维表cache命中率
set 'table.lookup.hash-shuffle.enabled'='true';
限制
左表如果是有更新数据(update/delete)数据,需要左表的主键字段包含 lookup keys,否则左表数据经过 Hash shuffle 后数据有可能会乱序。
Arctic 维表参数说明
参数 | 默认值 | 类型 | 说明 |
---|---|---|---|
lookup.cache.max-rows | 10000 | Long | Cache data to memory using the LRU algorithm. |
rocksdb.writing-threads | 5 | Integer | Writing data into rocksDB thread number. |
rocksdb.block-cache.capacity | 32 * 1024 * 1024 | Long | Use the LRUCache strategy for blocks, the size of the BlockCache can be configured based on your memory requirements and available system resources. Default is 32MB. |
rocksdb.block-cache.numShardBits | -1 | Integer | Use the LRUCache strategy for blocks. The cache is sharded to 2^numShardBits shards, by hash of the key. Default is -1, means it is automatically determined: every shard will be at least 512KB and number of shard bits will not exceed 6. |
rocksdblookup.cache.ttl-after-write | none | Duration | The TTL after which the row will expire in the lookup cache. |
lookup.reloading.interval | 10 s | Duration | Configuration option for specifying the interval in seconds to reload lookup data in RocksDB. The default value is 10 seconds. |
以上内容对您是否有帮助?