Doris SQL 连接器
更新时间: 2023-12-29 15:26:22
阅读 224
Doris 连接器
简述
- 支持 Doris 数据库的读写
- Flink Doris Connector 官方文档:https://doris.apache.org/zh-CN/docs/1.2/ecosystem/flink-doris-connector
- 插件 1.4.12+ 版本开始可用
- 需要使用 Flink 1.14 引擎,不支持 Flink 1.10/1.12
通过 SQL 读取
CREATE TABLE flink_doris_source (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
) WITH (
'connector' = 'doris',
'fenodes' = 'FE_IP:8030',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = 'password'
);
通过 SQL 写入
注意:更新和删除数据需要基于 Unique Key 模型
-- enable checkpoint
SET 'execution.checkpointing.interval' = '60s';
CREATE TABLE flink_doris_sink (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
) WITH (
'connector' = 'doris',
'fenodes' = 'FE_IP:8030',
'table.identifier' = 'db.table',
'username' = 'root',
'password' = 'password',
'sink.label-prefix' = 'doris_label'
);
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source;
通用配置项
Key | Default Value | Required | Comment |
---|---|---|---|
fenodes |
-- | Y | Doris FE http 地址 |
table.identifier |
-- | Y | Doris 表名,如:db.tbl |
username |
-- | Y | 访问 Doris 的用户名 |
password |
-- | Y | 访问 Doris 的密码 |
doris.request.retries |
3 | N | 向 Doris 发送请求的重试次数 |
doris.request.connect.timeout.ms |
30000 | N | 向 Doris 发送请求的连接超时时间 |
doris.request.read.timeout.ms |
30000 | N | 向 Doris 发送请求的读取超时时间 |
doris.request.query.timeout.s |
3600 | N | 查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制 |
doris.request.tablet.size |
Integer.MAX_VALUE |
N | 一个 Partition 对应的 Doris Tablet 个数。 此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。 |
doris.batch.size |
1024 | N | 一次从 BE 读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。 从而减轻网络延迟所带来的额外时间开销。 |
doris.exec.mem.limit |
2147483648 | N | 单个查询的内存限制。默认为 2GB,单位为字节 |
doris.deserialize.arrow.async |
FALSE | N | 是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch |
doris.deserialize.queue.size |
64 | N | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 |
doris.read.field |
-- | N | 读取 Doris 表的列名列表,多列之间使用逗号分隔 |
doris.filter.query |
-- | N | 过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。 |
sink.label-prefix |
-- | Y | Stream load导入使用的label前缀。2pc场景下要求全局唯一,用来保证Flink的EOS语义。 |
sink.properties.* |
-- | N | Stream Load 的导入参数。例如: JSON格式导入 'sink.properties.format' = 'json' ; 'sink.properties.read_json_by_line' = 'true' |
sink.enable-delete |
TRUE | N | 是否启用删除。此选项需要 Doris 表开启批量删除功能(Doris0.15+版本默认开启),只支持 Unique 模型。 |
sink.enable-2pc |
TRUE | N | 是否开启两阶段提交(2pc),默认为true,保证Exactly-Once语义。 |
文档反馈
以上内容对您是否有帮助?