Iceberg SQL 连接器
更新时间: 2024-01-18 15:53:26
阅读 419
Iceberg 连接器
简述
支持 Iceberg 实时数据湖的读写
Iceberg 官方文档:https://iceberg.apache.org/docs/1.0.0/flink/
目前只支持通过 DDL 方式进行使用,暂不支持通过元数据中心进行使用的方式
创建 Catalog
--目前只支持 Hive Catalog--
需要 kerberos 验证的 catalog 定义
CREATE CATALOG iceberg_caltalog WITH (
'catalog-type'='hive',
'type'='iceberg',
'uri'='thrift://localhost:9083',
'auth.method'='kerberos',
'hdfs-site'='hdfs-site-hms.xml',
'core-site'='core-site-hms.xml',
'hive-site'='hive-site-hms.xml',
'krb.conf'='krb5-hms.conf',
'krb.keytab'='krb-hms.keytab',
'krb.principal'='your-principal'
);
- catalog-type:catalog 类型,目前只支持 hive
- type: 固定为 iceberg
- uri:Hive metastore 的 thrift URI
- auth.method:认证方式,支持 kerberos 和 simple 两种模式
- hdfs-site:依赖的 hdfs-site.xml 文件名,kerberos 认证模式下必填
- core-site:依赖的 core-site.xml 文件名,kerberos 认证模式下必填
- hive-site:依赖的 hive-site.xml 文件名,kerberos 认证模式下必填
- krb.conf:krb.conf 文件名,kerberos 认证模式下必填
- krb.keytab:krb.keytab 文件名,kerberos 认证模式下必填
- krb.principal:指定的 principal 标识符,kerberos 认证模式下必填
- simple.user.name:指定的Hadoop UserName,simple 模式下必填
通过 simple 方式访问的 Catalog 定义
CREATE CATALOG iceberg_caltalog WITH (
'catalog-type'='hive',
'type'='iceberg',
'uri'='thrift://localhost:9083',
'auth.method'='simple',
'simple.user.name'='hdfs'
);
通过 SQL 读取
-- 在当前 session 中以流的模式运行 Flink 任务
SET 'execution.runtime-mode' = 'streaming';
-- 打开动态表参数配置开关,让 Flink SQL 中配置的 hint options 生效
SET 'table.dynamic-table-options.enabled' = 'true';
-- 读当前快照之后的增量数据
SELECT * FROM iceberg_caltalog.test_db.test_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/;
-- 从指定的快照ID开始读取增量数据(不包含该快照ID的数据)
SELECT * FROM iceberg_caltalog.test_db.test_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
Hint Options
Key | 默认值 | 类型 | 是否必填 | 描述 |
---|---|---|---|---|
monitor-interval | 10s | Duration | 否 | 监控新提交数据文件的时间间隔 |
start-snapshot-id | (none) | Long | 否 | 从指定的 snapshot 开始读取增量数据(不包括 start-snapshot-id 的快照数据),不指定则读当前快照之后(不包含当前)的增量数据 |
通过 SQL 写入
Iceberg 表支持通过 Flink SQL 写入数据
INSERT INTO
-- 开启通过 SQL Hint 设置表参数
SET 'table.dynamic-table-options.enabled'='true';
INSERT INTO iceberg_caltalog.test_db.test_table VALUES (1, 'a');
INSERT INTO iceberg_caltalog.test_db.test_table SELECT id, data from other_kafka_table;
INSERT OVERWRITE
INSERT OVERWRITE `iceberg_caltalog`.`test_db`.`test_table` VALUES (1, 'a');
对于无分区的表,INSERT OVERWRITE 将覆盖表里的全量数据
Iceberg 同样支持覆盖--指定分区--下的数据
INSERT OVERWRITE `iceberg_caltalog`.`test_db`.`test_table` PARTITION(data='a') SELECT 6;
文档反馈
以上内容对您是否有帮助?