Paimon 连接器

简述

  • 支持 Paimon 的读写(基于 0.8.2 版本),包括 DDL 和流表两种方式
  • Flink Paimon Connector 官方文档:https://paimon.apache.org/docs/0.8/flink/quick-start/
  • 插件 1.4.15+ 版本开始可用
  • 需要使用 Flink 1.18 引擎,不支持 Flink 1.10/1.12/1.14

DDL 方式

创建 Catalog

CREATE CATALOG my_catalog WITH (
    'type'='paimon',
    'metastore' = 'hive',
    'auth.method'='kerberos',
    'hdfs-site'='hdfs-site.xml',
    'core-site'='core-site.xml',
    'hive-site'='hive-site.xml',
    'krb.conf'='krb5.conf',
    'krb.keytab'='grp.mammut_test.keytab',
    'krb.principal'='bdms_grp.mammut_test/dev@BDMS.COM'
);
  • type: 固定为 paimon
  • metastore:hive,表示创建一个 Paimon Hive Catalog
  • auth.method:认证方式,支持 kerberos 和 simple 两种模式
  • hdfs-site:依赖的 hdfs-site.xml 文件名
  • core-site:依赖的 core-site.xml 文件名
  • hive-site:依赖的 hive-site.xml 文件名
  • krb.conf:krb.conf 文件名
  • krb.keytab:krb.keytab 文件名
  • krb.principal:指定的 principal 标识符

新建表

CREATE TABLE IF NOT EXISTS my_catalog.mammut_qa.word_count (
    word STRING,
    cnt BIGINT
);

写入数据

SET 'execution.checkpointing.interval' = '30 s';

CREATE TABLE word_table (
    word STRING,
    cnt BIGINT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.word.length' = '1'
);
INSERT INTO my_catalog.mammut_qa.word_count SELECT word, cnt FROM word_table;

读取数据

SET 'execution.runtime-mode' = 'streaming';

CREATE TABLE print_table (
    word STRING,
    cnt BIGINT
) WITH (
    'connector' = 'print'
);
INSERT INTO print_table SELECT * FROM my_catalog.mammut_qa.word_count;

流表方式

  • 通过 数仓管理 -> 新建流表的方式创建一张 Paimon 类型的流表
SET 'execution.runtime-mode' = 'streaming';
SET 'table.dynamic-table-options.enabled' = 'true';
SET 'execution.checkpointing.interval' = '30 s';

CREATE TABLE word_table (
    word STRING,
    cnt BIGINT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.word.length' = '1'
);

CREATE TABLE print_table (
    word STRING,
    cnt BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO AAAAAA.paimon_stream_sink_table SELECT word, cnt FROM word_table;
INSERT INTO print_table SELECT word, cnt FROM AAAAAA.paimon_stream_source_table;

通用配置项

请参照社区文档:https://paimon.apache.org/docs/0.8/maintenance/configurations/#flinkconnectoroptions