Flink-1.12

简述

支持 Arctic 实时数据湖的读写

创建 Catalog

CREATE CATALOG slothTest WITH (
  'type'='arctic',
  'metastore.url'='thrift://127.0.0.1:14230/catalogName',
  'action-column.include'='false'
);
  • type:catalog类型
  • metastore.url:元数据中心地址。thrift://{ip}:{port}/{catalogName}
  • action-column.include:可选,’true’ or ‘false’. 是否在表中添加action列,默认开启

通过 SQL 读取

读取文件数据

Hive升级的表

  • 默认读log
SELECT * from arctic_catalog_dev.ndc_test_db.test_keyed_tb /*+ OPTIONS('arctic.source.batch.mode.enable'='true')*/;

新建表

  • 默认读file
incremental pull

-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;

-- 读取当前snapshot的全量数据,之后读取从该snapshot开始的增量数据,监控时间为1s
SELECT * FROM arctic_catalog.arctic_db.sample /*+ OPTIONS('streaming'='true','monitor-interval'='1s') */;

-- 读取从指定snapshot-id开始(不包含该 snapshot)的增量数据,监控时间为1s
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947')*/ ;

支持的参数包括:

  • streaming:是否开启流模式,默认为false不开启。
  • case-sensitive:是否忽略大小写,默认为false不开启
  • start-snapshot-id:从指定的snapshot开始读取增量数据(不包括start-snapshot-id的快照数据)
  • monitor-interval:监控数据文件的时间间隔,默认为10s
读历史快照
  • bounded 读
-- 读取当前snapshot的全量数据
SELECT * FROM arctic_catalog.arctic_db.sample;

-- 读取指定snapshot-id 的全量数据,监控时间为1s
SELECT * FROM sample /*+ OPTIONS('snapshot-id'='3821550127947')*/ ;

支持的参数包括:

  • case-sensitive:是否忽略大小写,默认为false不开启
  • snapshot-id:读指定的snapshot数据(该snapshot的全量数据)
  • as-of-timestamp:读小于该时间的最近一次snapshot数据(该snapshot的全量数据)
  • start-snapshot-id:配合end-snapshot-id,读两个区间的增量数据(snapshot1, snapshot2]
  • end-snapshot-id

读取实时数据,即log数据

通过下面的 SQL 可以从 Arctic 表中读取到实时的流式数据:

如果是新建的表(即非hive升级的表),需要在OPTIONS中增加参数 ‘arctic_read_mode’=’log’

-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;

-- 读取表中的实时数据
SELECT * FROM arctic_catalog.arctic_db.sample /*+ OPTIONS('properties.group.id'='test','scan.startup.mode'='earliest-offset') */;

实时数据来自与表绑定的 Kafka Topic,可以通过 Flink SQL Hint 语法在 SQL 中注入读取表时的参数,支持的参数包括:

  • properties.group.id:读取 Kafka Topic 时使用的 group id
  • scan.startup.mode:Kafka 消费者初次启动时获取 offset 的模式,合法的取值包括:earliest-offset、latest-offset、 group-offsets、timestamp、specific-offsets, 具体取值的含义可以参考 Flink官方手册
  • scan.startup.specific-offsets:scan.startup.mode 取值为 specific-offsets 时,为每个分区设置的起始 offset, 参考格式:partition:0,offset: 42;partition:1,offset:300
  • scan.startup.timestamp-millis:scan.startup.mode 取值为 timestamp 时,初次启动时获取数据的起始时间戳(毫秒级)
  • properties.*:Kafka Consumer 支持的其他所有参数都可以通过在前面拼接 properties. 的前缀来设置,如: 'properties.batch.size'='16384',完整的参数信息可以参考 Kafka官方手册

关联维度数据

无主键表暂不支持

通过下面的 SQL 可以从 Arctic 表中读取到维表的数据:

-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;

-- 关联维度表中的数据,必须加上FOR SYSTEM_TIME AS OF ${souce_table_event_time_column},表示JOIN维表当前时刻所看到的每条数据
SELECT source.id, dimension.data
FROM arctic_catalog.arctic_db.source
LEFT JOIN arctic_catalog.arctic_db.dimension /*+ OPTIONS('index.read.properties.cache.strategy'='LRU','index.read.properties.cache.size'='1000') */
FOR SYSTEM_TIME AS OF source.join_time ON source.id = dimension.id;

实时数据来自与表绑定的 HBase,可以通过 Flink SQL Hint 语法在 SQL 中注入读取表时的参数,支持的参数包括:

  • index.read.properties.cache.strategy:读取HBase时的缓存策略,默认为None不开启,可选值为LRU
  • index.read.properties.cache.size:当选择了LRU缓存策略,可设置缓存数量,默认为10000
  • index.read.properties.cache.ttl:当选择了LRU缓存策略,可设置缓存过期时间,默认3小时,单位:秒

通过 SQL 写入

通过下面的 SQL 可以将实时数据写入 Arctic 表中:


-- 开启通过 SQL Hint 设置表参数
SET table.dynamic-table-options.enabled=true;

-- 往表中写入实时数据
INSERT INTO arctic_catalog.arctic_db.sample /*+ OPTIONS('arctic_emit_mode'='log', 'log.version' = 'v1') */
SELECT id, data from other_kafka_table;

Arctic 异构的数据存储方案,使得其背后可能存在多个存储后端,在写入时可以通过 Flink SQL Hint 指定不同的写入后端, 完整的参数包括:

  • arctic_emit_mode:数据写入模式,现阶段支持的后端包括:file、log、kv,无主键表默认为file,有主键表默认为log,支持同时写入多个后端,不同后端之间用逗号分割, 如:'arctic_emit_mode' = 'file,log,kv'
  • compact.minor.enable:写入时是否开启 Minor Compaction,默认关闭
  • log.version: log 数据格式,包括’v1’或不配置。默认使用旧的版本。当前建议都加上 v1
  • write.shuffle.policy: 只支持写 hive 升级的表 file 部分。包括:none、primary-key、partition-key、primary-partition-key、automatically。 默认automatically。
    • none: forward,不进行 shuffle。
    • primary-key: 按主键 shuffle。
    • partition-key: 按分区 shuffle。
    • primary-partition-key: 按主键+分区 shuffle。
    • automatically: 如果是有主键且有分区表,则为 primary-partition-key;如果是有主键且无分区表,则为 primary-key;如果是无主键且有分区表,则为 partition-key。否则为 none
  • 其他表参数:Arctic 表的所有参数都可以通过 SQL Hint 动态修改,当然只针对此任务生效,具体的参数列表可以参考 表配置