Arctic 连接器

简述

支持 Arctic 实时数据湖的读写

创建 Catalog

CREATE CATALOG slothTest WITH (
  'type'='arctic',
  'metastore.url'='thrift://127.0.0.1:14230/catalogName'
);
  • type:catalog 类型
  • metastore.url:元数据中心地址。thrift://{ip}:{port}/{catalogName}

通过 SQL 读取

读取文件数据(Filestore)

非主键表

-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;

-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled = true;

-- 读当前快照之后的增量数据
SELECT * FROM unkeyed /*+ OPTIONS('monitor-interval'='1s')*/ ;

Hint Options

Key 默认值 类型 是否必填 描述
streaming true Boolean 以流的方式读取有界数据还是无解数据,false:读取有界数据,true:读取无界数据
arctic.read.mode file String 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置
monitor-interval 10s Duration arctic.read.mode = file 时才生效。监控新提交数据文件的时间间隔
start-snapshot-id (none) Long 从指定的 snapshot 开始读取增量数据(不包括 start-snapshot-id 的快照数据),不指定则读当前快照之后(不包含当前)的增量数据
其他表参数 (none) String Arctic 表的所有参数都可以通过 SQL Hint 动态修改,当然只针对此任务生效,具体的参数列表可以参考 表配置。对于Catalog上的权限相关配置,也可以配置在Hint中,参数见 catalog ddl 中的 properties.auth.XXX

主键表

-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;

-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled = true;

-- 读全量数据及 changelog 中的 CDC 数据
SELECT * FROM keyed;

Hint Options

Key 默认值 类型 是否必填 描述
streaming true String 以流的方式读取有界数据还是无解数据,false:读取有界数据,true:读取无界数据
arctic.read.mode file String 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置
monitor-interval 10s String arctic.read.mode = file 时才生效。监控新提交数据文件的时间间隔
scan.startup.mode latest String 有效值为earliest、latest、timestamp(读file暂未支持)。当arctic.read.mode = file 时仅支持earliest、latest。'earliest'表示读取全量表数据,在streaming=true时会继续incremental pull;'latest':表示读取当前snapshot之后的数据,不包括当前snapshot数据。当arctic.read.mode = log 时,表示 Kafka 消费者初次启动时获取 offset 的模式,'earliest'表示从Kafka中最早的位置读取,'latest'表示从最新的位置读取,'timestamp'表示从Kafka中指定时间位置读取,需配置参数 'scan.startup.timestamp-millis'
其他表参数 (none) String Arctic 表的所有参数都可以通过 SQL Hint 动态修改,当然只针对此任务生效,具体的参数列表可以参考 表配置。对于Catalog上的权限相关配置,也可以配置在Hint中,参数见 catalog ddl 中的 properties.auth.XXX

读取实时数据,即log数据

通过下面的 SQL 可以从 Arctic 表中读取到实时的流式数据:

-- 在当前 session 中以流的模式运行 Flink 任务
SET execution.runtime-mode = streaming;

-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET table.dynamic-table-options.enabled=true;

SELECT * FROM test_table /*+ OPTIONS('arctic.read.mode'='log') */;

支持以下 Hint Options :

Key 默认值 类型 是否必填 描述
arctic.read.mode file String 指定读 Arctic 表 File 或 Log 的数据。当值为 log 时,必须 开启 Log 配置
properties.group.id (none) String 查询时可选,写入时可不填 读取 Kafka Topic 时使用的 group id
scan.startup.mode latest String 有效值为earliest、latest、timestamp(读file暂未支持)。当arctic.read.mode = file 时仅支持earliest、latest。'earliest'表示读取全量表数据,在streaming=true时会继续incremental pull;'latest':表示读取当前snapshot之后的数据,不包括当前snapshot数据。当arctic.read.mode = log 时,表示 Kafka 消费者初次启动时获取 offset 的模式,'earliest'表示从Kafka中最早的位置读取,'latest'表示从最新的位置读取,'timestamp'表示从Kafka中指定时间位置读取,需配置参数 'scan.startup.timestamp-millis'
scan.startup.timestamp-millis (none) Long 当'scan.startup.mode'='timestamp'时有效,从指定的Kafka时间读取数据,值为从1970 1月1日 00:00:00.000 GMT 开始的毫秒时间戳
properties.* (none) String Kafka Consumer 支持的其他所有参数都可以通过在前面拼接 properties. 的前缀来设置,如:'properties.batch.size'='16384',完整的参数信息可以参考 Kafka官方手册

通过 SQL 写入

Arctic 表支持通过 Flink Sql 往 Log 或 File 写入数据

INSERT OVERWRITE

当前仅支持非主键表的 INSERT OVERWRITE。INSERT OVERWRITE 只允许以 Flink Batch 的模式运行。 替换表中的数据,Overwrite 为原子操作。 分区会由查询语句中动态生成,这些分区的数据会被全量覆盖。

INSERT OVERWRITE unkeyed VALUES (1, 'a', '2022-07-01');

也支持覆盖指定分区的数据

INSERT OVERWRITE `arctic_catalog`.`arctic_db`.`unkeyed` PARTITION(data='2022-07-01') SELECT 5, 'b';

对于无分区的表,INSERT OVERWRITE 将覆盖表里的全量数据

INSERT INTO

对于 Arctic 表,可以指定往 File 或 Log(需在建表时 开启 Log 配置)写入数据。 对于 Arctic 主键表,写 File 会将 CDC 数据写入 ChangeStore 中.


-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;

-- 同时往 Arctic 表的 Filestore, Logstore 中写入数据
INSERT INTO `arctic_catalog`.`arctic_db`.`test_table` 
    /*+ OPTIONS('arctic.emit.mode'='log,file') */
SELECT id, name from `source`;

Lookup Joins

Lookup Joins 是流式查询中的一种连接。它用于从 Arctic 查询的数据来打宽一个表。该连接要求事实表有一个 process time 属性。

create catalog arctic with (
    'type'='arctic',
    'metastore.url'='thrift://***:***/trino_online_env_hive'-- 具体地址可以查询 AMS
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN arctic.db_test.customer FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

加速初始化维表

Lookup Join 算子将在本地维护一个 RocksDB 缓存,并根据 lookup.reloading.interval 拉取该表的最新数据更新到 rocksdb。Lookup join 算子将只提取必要的数据,所以你的分区的过滤条件对性能非常重要;Projection filter 下推也对加载性能有很大提升,如果不需要使用的维表字段可以不 Select。

Shuffle Hash Lookup join

开启以下配置可以加速维表初始化,提高维表 Cache 命中率,提升 Lookup Join 性能

-- 加速初始化,提高维表cache命中率
set 'table.lookup.hash-shuffle.enabled'='true';

限制

左表如果是有更新数据(update/delete)数据,需要左表的主键字段包含 lookup keys,否则左表数据经过 Hash shuffle 后数据有可能会乱序。

Arctic 维表参数说明

参数 默认值 类型 说明
lookup.cache.max-rows 10000 Long Cache data to memory using the LRU algorithm.
rocksdb.writing-threads 5 Integer Writing data into rocksDB thread number.
rocksdb.block-cache.capacity 32 * 1024 * 1024 Long Use the LRUCache strategy for blocks, the size of the BlockCache can be configured based on your memory requirements and available system resources. Default is 32MB.
rocksdb.block-cache.numShardBits -1 Integer Use the LRUCache strategy for blocks. The cache is sharded to 2^numShardBits shards, by hash of the key. Default is -1, means it is automatically determined: every shard will be at least 512KB and number of shard bits will not exceed 6.
rocksdblookup.cache.ttl-after-write none Duration The TTL after which the row will expire in the lookup cache.
lookup.reloading.interval 10 s Duration Configuration option for specifying the interval in seconds to reload lookup data in RocksDB.
The default value is 10 seconds.