Flink-1.12
Overview
Supports writing to StarRocks/Doris (using Stream Load for better performance).
Reading, dimension-table joins, and writing for StarRocks/Doris can all be done with the jdbc connector instead.
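For comparison, here is a minimal sketch of reading a StarRocks/Doris table through Flink's generic jdbc connector; the host, port, and credentials are hypothetical placeholders:
-- Hypothetical example: read StarRocks/Doris via the jdbc connector (FE MySQL-protocol port 9030).
CREATE TABLE jdbc_read (
test_int INT,
test_varchar VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://fe_host:9030',
'table-name' = 'allType',
'username' = 'root',
'password' = '123456'
);
The same declaration can then serve as a scan source or as the dimension side of a lookup join.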
Examples
DDL mode
-- source
CREATE TABLE kafka_source (
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_tinyint TINYINT,
test_smallint SMALLINT,
test_bigint BIGINT,
test_float FLOAT,
test_double DOUBLE
) WITH (
'connector' = 'memory',
'table.name' = 'kafka_source'
);
-- sink
CREATE TABLE sink (
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_tinyint TINYINT,
test_smallint SMALLINT,
test_bigint BIGINT,
test_float FLOAT,
test_double DOUBLE
) WITH (
'connector' = 'starrocks', -- required: connector type, must be starrocks
'jdbc-url' = 'jdbc:mysql://hadoop336.photo.163.org:9030', -- required: JDBC DB url
'load-url'='hadoop336.photo.163.org:8030',
'table-name' = 'allType', -- required: jdbc table name
'database-name'='data_transform',
'username' = 'root', -- required: user name and password
'password' = '123456'
);
INSERT INTO sink SELECT * FROM kafka_source;
Metadata Center mode
CREATE TABLE kafka_source (
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_tinyint TINYINT,
test_smallint SMALLINT,
test_bigint BIGINT,
test_float FLOAT,
-- testDecimal DECIMAL(38, 18),
test_double DOUBLE
) WITH (
'connector' = 'memory',
'table.name' = 'kafka_source'
);
INSERT INTO bdms.data_transform.allType SELECT * FROM kafka_source;
Parameters
| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | YES | NONE | String | starrocks |
| jdbc-url | YES | NONE | String | Used to execute queries against StarRocks. |
| load-url | YES | NONE | String | fe_ip:http_port;fe_ip:http_port, multiple addresses separated by ; used for batch sinking. |
| database-name | YES | NONE | String | StarRocks database name. |
| table-name | YES | NONE | String | StarRocks table name. |
| username | YES | NONE | String | StarRocks connection username. |
| password | YES | NONE | String | StarRocks connection password. |
| sink.semantic | NO | at-least-once | String | at-least-once or exactly-once (with exactly-once, data is flushed at checkpoints only and the sink.buffer-flush.* options are ignored). |
| sink.buffer-flush.max-bytes | NO | 90M | String | Maximum batch size in bytes, range [64MB, 10GB], e.g. 100m/mb, k/kb, 5g/gb. |
| sink.buffer-flush.max-rows | NO | 500000 | Long | Maximum batch row count, range [64,000, 5,000,000]. |
| sink.buffer-flush.interval-ms | NO | 300000 | Long | Flush interval in ms, range [1000, 3600000]. |
| sink.max-retries | NO | 3 | Integer | Maximum number of retries for a Stream Load request, range [0, 1000]. |
| sink.parallelism | NO | NULL | Integer | Sink parallelism; defaults to the global parallelism. |
| sink.connect.timeout-ms | NO | 1000 | Integer | Timeout in ms for connecting to load-url, range [100, 60000]. |
| sink.properties.* | NO | NONE | String | Stream Load properties, e.g. 'sink.properties.columns' = 'k1, v1'. See the StarRocks/Doris documentation. |
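As an illustrative sketch (not a tuning recommendation), the flush behavior and Stream Load properties can be set in the WITH clause; host and table names below are placeholders:
-- Hypothetical flush tuning for the at-least-once path.
CREATE TABLE tuned_sink (
k1 INT,
v1 VARCHAR
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://fe_host:9030',
'load-url' = 'fe_host:8030',
'database-name' = 'data_transform',
'table-name' = 'tuned_table',
'username' = 'root',
'password' = '123456',
'sink.buffer-flush.max-bytes' = '128m', -- flush once 128 MB is buffered (range [64MB, 10GB])
'sink.buffer-flush.max-rows' = '1000000', -- or once 1,000,000 rows are buffered
'sink.buffer-flush.interval-ms' = '60000', -- or after 60 s without a flush
'sink.properties.columns' = 'k1, v1' -- passed through to Stream Load as-is
);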
Note:
- With at-least-once, a flush is triggered when: cachedRows >= ${sink.buffer-flush.max-rows} || cachedBytes >= ${sink.buffer-flush.max-bytes} || idleTime >= ${sink.buffer-flush.interval-ms}
- With exactly-once, the sink.buffer-flush.{max-rows|max-bytes|interval-ms} options have no effect.
Metrics
| Name | Type | Description |
| --- | --- | --- |
| totalFlushBytes | counter | Bytes flushed successfully. |
| totalFlushRows | counter | Rows flushed successfully. |
| totalFlushSucceededTimes | counter | Number of successful data-batch flushes. |
| totalFlushFailedTimes | counter | Number of failed flushes. |
Type mappings
Note: if a Doris 0.14 table contains a tinyint(1) column, reading its data will fail.
Sink
| Flink type | StarRocks type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| ARRAY<T> | ARRAY<T> |
| MAP<KT,VT> | JSON STRING |
| ROW<arg T, ...> | JSON STRING |
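Since MAP and ROW values are written as JSON strings, the target StarRocks column should be a string-compatible type. A hypothetical sketch (host and table names are placeholders):
-- Hypothetical: complex types are serialized to JSON strings on write.
CREATE TABLE json_sink (
id INT,
payload ROW<name STRING, age INT> -- arrives in StarRocks as a JSON string, e.g. {"name":"a","age":1}
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://fe_host:9030',
'load-url' = 'fe_host:8030',
'database-name' = 'data_transform',
'table-name' = 'json_table',
'username' = 'root',
'password' = '123456'
);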
Flink-1.14
Overview
Supports reading from StarRocks/Doris, dimension-table joins (data is read via Thrift RPC), and writing (using Stream Load for better performance).
Examples
DDL mode
-- source
CREATE TABLE kafka_source (
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_tinyint TINYINT,
test_smallint SMALLINT,
test_bigint BIGINT,
test_float FLOAT,
test_double DOUBLE
) WITH (
'connector' = 'memory',
'table.name' = 'kafka_source'
);
-- sink
CREATE TABLE sink (
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_tinyint TINYINT,
test_smallint SMALLINT,
test_bigint BIGINT,
test_float FLOAT,
test_double DOUBLE
) WITH (
'connector' = 'starrocks', -- required: connector type, must be starrocks
'jdbc-url' = 'jdbc:mysql://hadoop336.163.org:9030', -- required: JDBC DB url
'load-url'='hadoop336.163.org:8030',
'table-name' = 'allType', -- required: jdbc table name
'database-name'='data_transform',
'username' = 'root', -- required: user name and password
'password' = '123456'
);
INSERT INTO sink SELECT * FROM kafka_source;
-- source
CREATE TABLE flink_test (
date_1 DATE,
datetime_1 TIMESTAMP(6),
char_1 CHAR(20),
varchar_1 VARCHAR,
boolean_1 BOOLEAN,
tinyint_1 TINYINT,
smallint_1 SMALLINT,
int_1 INT,
bigint_1 BIGINT,
largeint_1 STRING,
float_1 FLOAT,
double_1 DOUBLE,
decimal_1 DECIMAL(27,9)
) WITH (
'connector'='starrocks',
'load-url'='fe_ip1:8030,fe_ip2:8030,fe_ip3:8030',
'jdbc-url'='jdbc:mysql://fe_ip:9030',
'username'='root',
'password'='',
'database-name'='flink_test',
'table-name'='flink_test'
);
SELECT date_1, smallint_1 FROM flink_test WHERE char_1 <> 'A' AND int_1 = -126;
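A dimension-table (lookup) join sketch using the flink_test table above as the dimension side; the orders stream, its columns, and the proc_time attribute are hypothetical:
-- Hypothetical lookup join: orders must carry a processing-time attribute (proc_time AS PROCTIME()).
SELECT o.order_id, d.varchar_1
FROM orders AS o
JOIN flink_test FOR SYSTEM_TIME AS OF o.proc_time AS d
ON o.item_id = d.int_1;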
Metadata Center mode
CREATE TABLE kafka_source (
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_tinyint TINYINT,
test_smallint SMALLINT,
test_bigint BIGINT,
test_float FLOAT,
-- testDecimal DECIMAL(38, 18),
test_double DOUBLE
) WITH (
'connector' = 'memory',
'table.name' = 'kafka_source'
);
INSERT INTO bdms.data_transform.allType SELECT * FROM kafka_source;
Parameters
Source
| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | YES | NONE | String | starrocks |
| load-url | YES | NONE | String | FE node addresses, e.g. fe_ip1:http_port,fe_ip2:http_port... |
| jdbc-url | YES | NONE | String | FE node addresses, e.g. fe_ip1:query_port,fe_ip2:query_port... |
| username | YES | NONE | String | StarRocks user name. |
| password | YES | NONE | String | StarRocks user password. |
| database-name | YES | NONE | String | Database name. |
| table-name | YES | NONE | String | Table name. |
| scan.connect.timeout-ms | NO | 1000 | Long | Connect timeout in ms. |
| scan.params.keep-alive-min | NO | 10 | Int | Maximum keep-alive time in minutes. |
| scan.params.query-timeout-s | NO | 600 (10 min) | Long | Timeout in seconds for a single query (must be longer than the estimated running time of the source). |
| scan.params.mem-limit-byte | NO | 1073741824 (1GB) | Long | Memory limit for a single query. |
| scan.max-retries | NO | 1 | Int | Maximum number of request retries. |
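A sketch of overriding the scan defaults in the source DDL; the values are illustrative only:
-- Hypothetical scan tuning; hosts and table names are placeholders.
CREATE TABLE flink_test_tuned (
int_1 INT,
varchar_1 VARCHAR
) WITH (
'connector' = 'starrocks',
'load-url' = 'fe_ip1:8030',
'jdbc-url' = 'jdbc:mysql://fe_ip1:9030',
'username' = 'root',
'password' = '',
'database-name' = 'flink_test',
'table-name' = 'flink_test',
'scan.connect.timeout-ms' = '5000', -- tolerate slower connection setup (default 1000)
'scan.params.query-timeout-s' = '1800', -- must exceed the expected scan duration
'scan.max-retries' = '3' -- retry failed scan requests (default 1)
);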
Sink
| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | YES | NONE | String | starrocks |
| jdbc-url | YES | NONE | String | Used to execute queries against StarRocks. |
| load-url | YES | NONE | String | fe_ip:http_port;fe_ip:http_port, multiple addresses separated by ; used for batch sinking. |
| database-name | YES | NONE | String | StarRocks database name. |
| table-name | YES | NONE | String | StarRocks table name. |
| username | YES | NONE | String | StarRocks connection username. |
| password | YES | NONE | String | StarRocks connection password. |
| sink.semantic | NO | at-least-once | String | at-least-once or exactly-once (with exactly-once, data is flushed at checkpoints only and the sink.buffer-flush.* options are ignored). |
| sink.buffer-flush.max-bytes | NO | 90M | String | Maximum batch size in bytes, range [64MB, 10GB], e.g. 100m/mb, k/kb, 5g/gb. |
| sink.buffer-flush.max-rows | NO | 500000 | Long | Maximum batch row count, range [64,000, 5,000,000]. |
| sink.buffer-flush.interval-ms | NO | 300000 | Long | Flush interval in ms, range [1000, 3600000]. |
| sink.max-retries | NO | 3 | Integer | Maximum number of retries for a Stream Load request, range [0, 1000]. |
| sink.parallelism | NO | NULL | Integer | Sink parallelism; defaults to the global parallelism. |
| sink.connect.timeout-ms | NO | 1000 | Integer | Timeout in ms for connecting to load-url, range [100, 60000]. |
| sink.ignore.update-before | NO | true | Boolean | Whether to ignore UPDATE_BEFORE rows. When the changelog contains primary-key changes, set this to false to preserve consistency, so that the row keyed by the old primary key (carried in UPDATE_BEFORE) is correctly deleted on the target. |
| sink.properties.* | NO | NONE | String | Stream Load properties, e.g. 'sink.properties.columns' = 'k1, v1'. See the StarRocks/Doris documentation. |
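When the upstream changelog can change a row's primary key (e.g. CDC input), sink.ignore.update-before must be set to false so old-key rows are deleted on the target; a sketch with hypothetical names:
-- Hypothetical CDC-style sink; UPDATE_BEFORE rows are kept so old-key rows get deleted downstream.
CREATE TABLE cdc_sink (
id INT,
name VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://fe_host:9030',
'load-url' = 'fe_host:8030',
'database-name' = 'data_transform',
'table-name' = 'cdc_table',
'username' = 'root',
'password' = '123456',
'sink.ignore.update-before' = 'false'
);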
Note:
- With at-least-once, a flush is triggered when: cachedRows >= ${sink.buffer-flush.max-rows} || cachedBytes >= ${sink.buffer-flush.max-bytes} || idleTime >= ${sink.buffer-flush.interval-ms}
- With exactly-once, the sink.buffer-flush.{max-rows|max-bytes|interval-ms} options have no effect.
Metrics
| Name | Type | Description |
| --- | --- | --- |
| totalFlushBytes | counter | Bytes flushed successfully. |
| totalFlushRows | counter | Rows flushed successfully. |
| totalFlushSucceededTimes | counter | Number of successful data-batch flushes. |
| totalFlushFailedTimes | counter | Number of failed flushes. |
| totalScannedRows | counter | Rows scanned successfully. |
Type mappings
Note:
- If a Doris 0.14 table contains tinyint(1) or boolean columns, reading its data will fail (see the Doris bug); in that case, read through the jdbc connector instead.
- Exactly-once semantics cannot be guaranteed when the job fails.
- Only SQL without aggregation is supported, i.e. queries of the form select {*|columns|count(1)} from {table-name} where ... , as illustrated below.
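For instance (illustrative queries against the flink_test table above):
-- Supported: projection and filter only, or a plain count
SELECT int_1, char_1 FROM flink_test WHERE int_1 > 0;
SELECT count(1) FROM flink_test;
-- Presumably not supported: any other aggregation, e.g. count(1) with GROUP BY int_1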
Source
| StarRocks | Flink |
| --- | --- |
| NULL | NULL |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| LARGEINT | STRING |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| DECIMAL | DECIMAL |
| DECIMALV2 | DECIMAL |
| DECIMAL32 | DECIMAL |
| DECIMAL64 | DECIMAL |
| DECIMAL128 | DECIMAL |
| CHAR | CHAR |
| VARCHAR | STRING |
Sink
| Flink type | StarRocks type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| ARRAY<T> | ARRAY<T> |
| MAP<KT,VT> | JSON STRING |
| ROW<arg T, ...> | JSON STRING |