Arctic 维表 Join

具体 JOIN 语法如下:

SELECT [column_list]
FROM table1 [AS <alias1>]
LEFT JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.keyed_column-name1 [AND ...]

使用说明

注意:维表必须定义主键,并且 Join 条件必须包含所有主键字段。 以下是 Arctic 作为维表 Join 的使用示例:

-- 在当前 session 中以流的模式运行 Flink 任务
SET 'execution.runtime-mode' = 'streaming';

-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET 'table.dynamic-table-options.enabled'='true';

-- 创建一张 Arctic 
CREATE TABLE IF NOT EXISTS arctic_catalog.default_db.`user` (
    id   INT,
    name STRING,
    age  INT,
    PRIMARY KEY (id) NOT ENFORCED
);

-- 创建一张主表,需要将 LOCALTIMESTAMP 定义为 watermark。如下所示:
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    user_id     INT,
    order_time  AS LOCALTIMESTAMP,
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);

--  Arctic 表添加 watermark"opt" 可以是任意名称,但不能与 arctic_catalog.default_db.`user` 已有字段同名
CREATE TABLE user_dim (
    opt TIMESTAMP(3),
    WATERMARK FOR opt AS opt
) LIKE arctic_catalog.default_db.`user`;

-- 开启 Arctic 表流读和维表的配置
SELECT order_id, price, user_id, name, age 
FROM orders
LEFT JOIN user_dim /*+OPTIONS('streaming'='true', 'dim-table.enabled'='true')*/
    FOR SYSTEM_TIME AS OF orders.order_time
ON orders.user_id = user_dim.id
Key 默认值 类型 是否必填 描述
dim-table.enabled false Boolean 是否将 Arctic 表作为维表使用,默认false。作为维表时需要设置为 true
  • 当 Arctic 作为维表(user 表)数据量很大时,需要一定的存量数据加载时间。在此期间,左表(orders)的数据会缓存在 Join 算子中,直到维表存量数据加载完, 才会触发 Join 算子的关联操作并向下游输出数据。
  • 现阶段维表读 Arctic Filestore 会存在一定的时间延迟,如果左表(orders)的数据是毫秒级延迟的实时数据,需要指定允许一定时间的延迟,让左表数据缓存一段时间后,再触发 Join。 如允许 10s 的延迟:WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND,避免左表(orders)的数据比维表数据快,导致 Join 关联不上维表侧(user 表)的数据。
  • 未来会基于 Arctic Logstore 实现更实时的维表

注意事项

  • 如果主表有指定 watermark 的需求,如需要维表 Join 后作窗口计算,则可以自由指定:
CREATE TABLE orders (
  order_id    STRING,
  price       DECIMAL(32,2),
  user_id     INT,
  order_time  TIMESTAMP(3),
  WATERMARK FOR order_time AS order_time
  ) WITH (/* ... */);

性能测试

与 Hive 作为维表性能测试对比:

  1. Hive 作为维表,其大小受 TM 的内存限制,并且 TM 的内存较难设置,需要预估真实数据解压后的真实大小。而 Arctic 表作为维表几乎不限制 TM 的内存。比如 HDFS 中 4G 的数据,Hive 运行需要配置 32G 内存,但是 Arctic 只需要 4G 即可。即使增大并行度,Hive 对于单个 TM 也需要 32G,而 Arctic 在 4 个并行度时单个 TM 2G 就可以运行。
  2. Arctic 维表加载速度可调整。增加并行度,Hive 的维表数据加载时间不变,而 Arctic 维表数据加载耗时随并行度的增大而降低。
  3. Arctic 维表 Failover 耗费的时间比 Hive 短。Arctic 利用了 Rocksdb 的增量 checkpoint,可以秒级恢复任务。而 Hive 需要重新读取加载数据。
  4. Arctic 维表快速加载增量数据,而 Hive 需要定时地全量重新加载来实现。
  5. Arctic 最佳实践:用户可调整并发度动态满足吞吐率
最佳配置 说明
维表规模 2100w,13G
Flink 配置 taskmanager.memory.managed.fraction: 0.7
并行度: 16
内存: 2G
Arctic 配置 base.file-index.hash-bucket: 16
change.file-index.hash-bucket: 16
任务可达吞吐率 10w/s

原理解析

详细说明请见:《Arctic 实时维表 Join 原理解析》