Arctic 维表 Join
更新时间: 2023-04-24 20:24:25
阅读 167
Arctic 维表 Join
具体 JOIN 语法如下:
SELECT [column_list]
FROM table1 [AS <alias1>]
LEFT JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.keyed_column-name1 [AND ...]
使用说明
注意:维表必须定义主键,并且 Join 条件必须包含所有主键字段。 以下是 Arctic 作为维表 Join 的使用示例:
-- 在当前 session 中以流的模式运行 Flink 任务
SET 'execution.runtime-mode' = 'streaming';
-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET 'table.dynamic-table-options.enabled'='true';
-- 创建一张 Arctic 表
CREATE TABLE IF NOT EXISTS arctic_catalog.default_db.`user` (
id INT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
);
-- 创建一张主表,需要将 LOCALTIMESTAMP 定义为 watermark。如下所示:
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
user_id INT,
order_time AS LOCALTIMESTAMP,
WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
-- 给 Arctic 表添加 watermark。"opt" 可以是任意名称,但不能与 arctic_catalog.default_db.`user` 已有字段同名
CREATE TABLE user_dim (
opt TIMESTAMP(3),
WATERMARK FOR opt AS opt
) LIKE arctic_catalog.default_db.`user`;
-- 开启 Arctic 表流读和维表的配置
SELECT order_id, price, user_id, name, age
FROM orders
LEFT JOIN user_dim /*+OPTIONS('streaming'='true', 'dim-table.enabled'='true')*/
FOR SYSTEM_TIME AS OF orders.order_time
ON orders.user_id = user_dim.id
Key | 默认值 | 类型 | 是否必填 | 描述 |
---|---|---|---|---|
dim-table.enabled | false | Boolean | 否 | 是否将 Arctic 表作为维表使用,默认false。作为维表时需要设置为 true |
- 当 Arctic 作为维表(user 表)数据量很大时,需要一定的存量数据加载时间。在此期间,左表(orders)的数据会缓存在 Join 算子中,直到维表存量数据加载完, 才会触发 Join 算子的关联操作并向下游输出数据。
- 现阶段维表读 Arctic Filestore 会存在一定的时间延迟,如果左表(orders)的数据是毫秒级延迟的实时数据,需要指定允许一定时间的延迟,让左表数据缓存一段时间后,再触发 Join。 如允许 10s 的延迟:WATERMARK FOR order_time AS order_time - INTERVAL '10' SECOND,避免左表(orders)的数据比维表数据快,导致 Join 关联不上维表侧(user 表)的数据。
- 未来会基于 Arctic Logstore 实现更实时的维表
注意事项
- 如果主表有指定 watermark 的需求,如需要维表 Join 后作窗口计算,则可以自由指定:
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
user_id INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
性能测试
与 Hive 作为维表性能测试对比:
- Hive 作为维表,其大小受 TM 的内存限制,并且 TM 的内存较难设置,需要预估真实数据解压后的真实大小。而 Arctic 表作为维表几乎不限制 TM 的内存。比如 HDFS 中 4G 的数据,Hive 运行需要配置 32G 内存,但是 Arctic 只需要 4G 即可。即使增大并行度,Hive 对于单个 TM 也需要 32G,而 Arctic 在 4 个并行度时单个 TM 2G 就可以运行。
- Arctic 维表加载速度可调整。增加并行度,Hive 的维表数据加载时间不变,而 Arctic 维表数据加载耗时随并行度的增大而降低。
- Arctic 维表 Failover 耗费的时间比 Hive 短。Arctic 利用了 Rocksdb 的增量 checkpoint,可以秒级恢复任务。而 Hive 需要重新读取加载数据。
- Arctic 维表快速加载增量数据,而 Hive 需要定时地全量重新加载来实现。
- Arctic 最佳实践:用户可调整并发度动态满足吞吐率
最佳配置 | 说明 |
---|---|
维表规模 | 2100w,13G |
Flink 配置 | taskmanager.memory.managed.fraction: 0.7 并行度: 16 内存: 2G |
Arctic 配置 | base.file-index.hash-bucket: 16 change.file-index.hash-bucket: 16 |
任务可达吞吐率 | 10w/s |
原理解析
详细说明请见:《Arctic 实时维表 Join 原理解析》
文档反馈
以上内容对您是否有帮助?