Doris 连接器

简述

通过 SQL 读取

CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
)  WITH (
      'connector' = 'doris',
      'fenodes' = 'FE_IP:8030',
      'table.identifier' = 'db.table',
      'username' = 'root',
      'password' = 'password'
);

通过 SQL 写入

注意:更新和删除数据需要基于 Unique Key 模型

-- enable checkpoint
SET 'execution.checkpointing.interval' = '60s';
CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
) WITH (
      'connector' = 'doris',
      'fenodes' = 'FE_IP:8030',
      'table.identifier' = 'db.table',
      'username' = 'root',
      'password' = 'password',
      'sink.label-prefix' = 'doris_label'
);

INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source;

通用配置项

Key Default Value Required Comment
fenodes -- Y Doris FE http 地址
table.identifier -- Y Doris 表名,如:db.tbl
username -- Y 访问 Doris 的用户名
password -- Y 访问 Doris 的密码
doris.request.retries 3 N 向 Doris 发送请求的重试次数
doris.request.connect.timeout.ms 30000 N 向 Doris 发送请求的连接超时时间
doris.request.read.timeout.ms 30000 N 向 Doris 发送请求的读取超时时间
doris.request.query.timeout.s 3600 N 查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.size Integer.MAX_VALUE N 一个 Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。
doris.batch.size 1024 N 一次从 BE 读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。
doris.exec.mem.limit 2147483648 N 单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.async FALSE N 是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch
doris.deserialize.queue.size 64 N 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效
doris.read.field -- N 读取 Doris 表的列名列表,多列之间使用逗号分隔
doris.filter.query -- N 过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。
sink.label-prefix -- Y Stream load导入使用的label前缀。2pc场景下要求全局唯一,用来保证Flink的EOS语义。
sink.properties.* -- N Stream Load 的导入参数。例如: JSON格式导入 'sink.properties.format' = 'json' ; 'sink.properties.read_json_by_line' = 'true'
sink.enable-delete TRUE N 是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。
sink.enable-2pc TRUE N 是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。