Iceberg 连接器

简述

支持 Iceberg 实时数据湖的读写

Iceberg 官方文档:https://iceberg.apache.org/docs/1.0.0/flink/

目前只支持通过 DDL 方式进行使用,暂不支持通过元数据中心进行使用的方式

创建 Catalog

--目前只支持 Hive Catalog--

需要 kerberos 验证的 catalog 定义

CREATE CATALOG iceberg_caltalog WITH (
    'catalog-type'='hive',
    'type'='iceberg',
    'uri'='thrift://localhost:9083',
    'auth.method'='kerberos',
    'hdfs-site'='hdfs-site-hms.xml',
    'core-site'='core-site-hms.xml',
    'hive-site'='hive-site-hms.xml',
    'krb.conf'='krb5-hms.conf',
    'krb.keytab'='krb-hms.keytab',
    'krb.principal'='your-principal'
);
  • catalog-type:catalog 类型,目前只支持 hive
  • type: 固定为 iceberg
  • uri:Hive metastore 的 thrift URI
  • auth.method:认证方式,支持 kerberos 和 simple 两种模式
  • hdfs-site:依赖的 hdfs-site.xml 文件名,kerberos 认证模式下必填
  • core-site:依赖的 core-site.xml 文件名,kerberos 认证模式下必填
  • hive-site:依赖的 hive-site.xml 文件名,kerberos 认证模式下必填
  • krb.conf:krb.conf 文件名,kerberos 认证模式下必填
  • krb.keytab:krb.keytab 文件名,kerberos 认证模式下必填
  • krb.principal:指定的 principal 标识符,kerberos 认证模式下必填
  • simple.user.name:指定的Hadoop UserName,simple 模式下必填

通过 simple 方式访问的 Catalog 定义

CREATE CATALOG iceberg_caltalog WITH (
    'catalog-type'='hive',
    'type'='iceberg',
    'uri'='thrift://localhost:9083',
    'auth.method'='simple',
    'simple.user.name'='hdfs'
);

通过 SQL 读取

-- 在当前 session 中以流的模式运行 Flink 任务
SET 'execution.runtime-mode' = 'streaming';

-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET 'table.dynamic-table-options.enabled' = 'true';

-- 读当前快照之后的增量数据
SELECT * FROM  iceberg_caltalog.test_db.test_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/;

-- 从指定的快照ID开始读取增量数据(不包含该快照ID的数据)
SELECT * FROM iceberg_caltalog.test_db.test_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;

Hint Options

Key 默认值 类型 是否必填 描述
monitor-interval 10s Duration 监控新提交数据文件的时间间隔
start-snapshot-id (none) Long 从指定的 snapshot 开始读取增量数据(不包括 start-snapshot-id 的快照数据),不指定则读当前快照之后(不包含当前)的增量数据

通过 SQL 写入

Iceberg 表支持通过 Flink SQL 写入数据

INSERT INTO

-- 开启通过 SQL Hint 设置表参数
SET 'table.dynamic-table-options.enabled'='true';

INSERT INTO  iceberg_caltalog.test_db.test_table VALUES (1, 'a');
INSERT INTO  iceberg_caltalog.test_db.test_table SELECT id, data from other_kafka_table;

INSERT OVERWRITE

INSERT OVERWRITE  `iceberg_caltalog`.`test_db`.`test_table`  VALUES (1, 'a');

对于无分区的表,INSERT OVERWRITE 将覆盖表里的全量数据

Iceberg 同样支持覆盖--指定分区--下的数据

INSERT OVERWRITE  `iceberg_caltalog`.`test_db`.`test_table` PARTITION(data='a') SELECT 6;