Arctic SQL 连接器
更新时间: 2023-04-24 20:24:10
阅读 161
Flink-1.12
简述
支持 Arctic 实时数据湖的读写
创建 Catalog
CREATE CATALOG slothTest WITH (
'type'='arctic',
'metastore.url'='thrift://127.0.0.1:14230/catalogName',
'action-column.include'='false'
);
- type:catalog类型
- metastore.url:元数据中心地址。thrift://{ip}:{port}/{catalogName}
- action-column.include:可选,'true' or 'false'. 是否在表中添加action列,默认开启
通过 SQL 读取
读取文件数据
Hive升级的表
- 默认读log
SELECT * from arctic_catalog_dev.ndc_test_db.test_keyed_tb /*+ OPTIONS('arctic.source.batch.mode.enable'='true')*/;
新建表
- 默认读file
incremental pull
-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;
-- 读取当前snapshot的全量数据,之后读取从该snapshot开始的增量数据,监控时间为1s
SELECT * FROM arctic_catalog.arctic_db.sample /*+ OPTIONS('streaming'='true','monitor-interval'='1s') */;
-- 读取从指定snapshot-id开始(不包含该 snapshot)的增量数据,监控时间为1s
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947')*/ ;
支持的参数包括:
- streaming:是否开启流模式,默认为false不开启。
- case-sensitive:是否忽略大小写,默认为false不开启
- start-snapshot-id:从指定的snapshot开始读取增量数据(不包括start-snapshot-id的快照数据)
- monitor-interval:监控数据文件的时间间隔,默认为10s
读历史快照
- bounded 读
-- 读取当前snapshot的全量数据
SELECT * FROM arctic_catalog.arctic_db.sample;
-- 读取指定snapshot-id 的全量数据,监控时间为1s
SELECT * FROM sample /*+ OPTIONS('snapshot-id'='3821550127947')*/ ;
支持的参数包括:
- case-sensitive:是否忽略大小写,默认为false不开启
- snapshot-id:读指定的snapshot数据(该snapshot的全量数据)
- as-of-timestamp:读小于该时间的最近一次snapshot数据(该snapshot的全量数据)
- start-snapshot-id:配合end-snapshot-id,读两个区间的增量数据(snapshot1, snapshot2]
- end-snapshot-id
读取实时数据,即log数据
通过下面的 SQL 可以从 Arctic 表中读取到实时的流式数据:
如果是新建的表(即非hive升级的表),需要在OPTIONS中增加参数 'arctic_read_mode'='log'
-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;
-- 读取表中的实时数据
SELECT * FROM arctic_catalog.arctic_db.sample /*+ OPTIONS('properties.group.id'='test','scan.startup.mode'='earliest-offset') */;
实时数据来自与表绑定的 Kafka Topic,可以通过 Flink SQL Hint 语法在 SQL 中注入读取表时的参数,支持的参数包括:
- properties.group.id:读取 Kafka Topic 时使用的 group id
- scan.startup.mode:Kafka 消费者初次启动时获取 offset 的模式,合法的取值包括:earliest-offset、latest-offset、 group-offsets、timestamp、specific-offsets, 具体取值的含义可以参考 Flink官方手册
- scan.startup.specific-offsets:scan.startup.mode 取值为 specific-offsets 时,为每个分区设置的起始 offset, 参考格式:partition:0,offset: 42;partition:1,offset:300
- scan.startup.timestamp-millis:scan.startup.mode 取值为 timestamp 时,初次启动时获取数据的起始时间戳(毫秒级)
- properties.*:Kafka Consumer 支持的其他所有参数都可以通过在前面拼接
properties.
的前缀来设置,如:'properties.batch.size'='16384'
,完整的参数信息可以参考 Kafka官方手册
关联维度数据
无主键表暂不支持
通过下面的 SQL 可以从 Arctic 表中读取到维表的数据:
-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;
-- 关联维度表中的数据,必须加上FOR SYSTEM_TIME AS OF ${souce_table_event_time_column},表示JOIN维表当前时刻所看到的每条数据
SELECT source.id, dimension.data
FROM arctic_catalog.arctic_db.source
LEFT JOIN arctic_catalog.arctic_db.dimension /*+ OPTIONS('index.read.properties.cache.strategy'='LRU','index.read.properties.cache.size'='1000') */
FOR SYSTEM_TIME AS OF source.join_time ON source.id = dimension.id;
实时数据来自与表绑定的 HBase,可以通过 Flink SQL Hint 语法在 SQL 中注入读取表时的参数,支持的参数包括:
- index.read.properties.cache.strategy:读取HBase时的缓存策略,默认为None不开启,可选值为LRU
- index.read.properties.cache.size:当选择了LRU缓存策略,可设置缓存数量,默认为10000
- index.read.properties.cache.ttl:当选择了LRU缓存策略,可设置缓存过期时间,默认3小时,单位:秒
通过 SQL 写入
通过下面的 SQL 可以将实时数据写入 Arctic 表中:
-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;
-- 往表中写入实时数据
INSERT INTO arctic_catalog.arctic_db.sample /*+ OPTIONS('arctic_emit_mode'='log', 'log.version' = 'v1') */
SELECT id, data from other_kafka_table;
Arctic 异构的数据存储方案,使得其背后可能存在多个存储后端,在写入时可以通过 Flink SQL Hint 指定不同的写入后端, 完整的参数包括:
- arctic_emit_mode:数据写入模式,现阶段支持的后端包括:file、log、kv,无主键表默认为file,有主键表默认为log,支持同时写入多个后端,不同后端之间用逗号分割,
如:
'arctic_emit_mode' = 'file,log,kv'
- compact.minor.enable:写入时是否开启 Minor Compaction,默认关闭
- log.version: log 数据格式,包括'v1'或不配置。默认使用旧的版本。当前建议都加上 v1
- write.shuffle.policy: 只支持写 hive 升级的表 file 部分。包括:none、primary-key、partition-key、primary-partition-key、automatically。 默认automatically。
- none: forward,不进行 shuffle。
- primary-key: 按主键 shuffle。
- partition-key: 按分区 shuffle。
- primary-partition-key: 按主键+分区 shuffle。
- automatically: 如果是有主键且有分区表,则为 primary-partition-key;如果是有主键且无分区表,则为 primary-key;如果是无主键且有分区表,则为 partition-key。否则为 none
- 其他表参数:Arctic 表的所有参数都可以通过 SQL Hint 动态修改,当然只针对此任务生效,具体的参数列表可以参考 表配置
文档反馈
以上内容对您是否有帮助?