简述

支持starrocks/doris的写(使用stream_load优化性能)。

starrocks/doris的读、维表Join、写均可使用jdbc connector

示例

ddl方式

-- source
CREATE TABLE kafka_source (
      test_int INT,
      test_varchar VARCHAR,
      test_boolean BOOLEAN,
      test_tinyint TINYINT,
      test_smallint SMALLINT,
      test_bigint BIGINT,
      test_float FLOAT,
      test_double DOUBLE
) WITH (
  'connector' = 'memory',
  'table.name' = 'kafka_source'
);

-- sink
CREATE TABLE sink (
      test_int INT,
      test_varchar VARCHAR,
      test_boolean BOOLEAN,
      test_tinyint TINYINT,
      test_smallint SMALLINT,
      test_bigint BIGINT,
      test_float FLOAT,
      test_double DOUBLE
) WITH (
      'connector' = 'starrocks', -- required: specify this table type is jdbc
      'jdbc-url' = 'jdbc:mysql://hadoop336.photo.163.org:9030', -- required: JDBC DB url
      'load-url'='hadoop336.photo.163.org:8030',
      'table-name' = 'allType',  -- required: jdbc table name
      'database-name'='data_transform',
      'username' = 'root', -- optional: jdbc user name and password
      'password' = '123456'
);

INSERT INTO sink select * FROM kafka_source;

元数据中心方式

CREATE TABLE kafka_source (
      test_int INT,
      test_varchar VARCHAR,
      test_boolean BOOLEAN,
      test_tinyint TINYINT,
      test_smallint SMALLINT,
      test_bigint BIGINT,
      test_float FLOAT,
--  testDecimal DECIMAL(38, 18),
      test_double DOUBLE
) WITH (
      'connector' = 'memory',
      'table.name' = 'kafka_source'
);

INSERT INTO bdms.data_transform.allType select * FROM kafka_source;

参数配置

Option Required Default Type Description
connector YES NONE String starrocks
jdbc-url YES NONE String this will be used to execute queries in starrocks.
load-url YES NONE String fe_ip:http_port;fe_ip:http_port; 分隔, which would be used to do the batch sinking.
database-name YES NONE String starrocks database name
table-name YES NONE String starrocks table name
username YES NONE String starrocks connecting username
password YES NONE String starrocks connecting password
sink.semantic NO at-least-once String at-least-once or exactly-once(flush at checkpoint only 和配置 sink.buffer-flush.* 将失效).
sink.buffer-flush.max-bytes NO 90M String 最大batch数据大小, range: [64MB, 10GB]. e.g. 100m/mb, k/kb, 5g/gb
sink.buffer-flush.max-rows NO 500000 Long 最大batch行数, range: [64,000, 5000,000].
sink.buffer-flush.interval-ms NO 300000 Long flush时间间隔,单位ms, range: [1000ms, 3600000ms].
sink.max-retries NO 3 Integer stream load 请求最大重试次数, range: [0, 1000].
sink.parallelism NO NULL Integer sink并行度。默认全局并行度
sink.connect.timeout-ms NO 1000 Integer 连接 load-url的超时时间,单位ms, range: [100, 60000].
sink.properties.* NO NONE String stream load 参数,如 'sink.properties.columns' = 'k1, v1'. 参考StarRocks/Doris
  • 注意
    1. Flushat-least-once 的触发时机: cachedRows >= ${sink.buffer-flush.max-rows} || cachedBytes >= ${sink.buffer-flush.max-bytes} || idleTime >= ${sink.buffer-flush.interval-ms}
    2. sink.buffer-flush.{max-rows|max-bytes|interval-ms}exactly-once 时失效.

      Metrics

Name Type Description
totalFlushBytes counter 成功flush的 bytes.
totalFlushRows counter 成功flush的行数.
totalFlushSucceededTimes counter data-batch成功flush的次数.
totalFlushFailedTimes counter flush失败次数.

Type mappings

注意:当doris 0.14表中存在tinyint(1),读数据会报错

Sink

Flink type StarRocks type
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INTEGER INTEGER
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL DECIMAL
BINARY INT
CHAR STRING
VARCHAR STRING
STRING STRING
DATE DATE
TIMESTAMP_WITHOUT_TIME_ZONE(N) DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) DATETIME
ARRAY<T> ARRAY<T>
MAP<KT,VT> JSON STRING
ROW<arg T, …> JSON STRING

简述

支持starrocks/doris的读、维表(使用thrift rpc读数据)、写(使用stream_load优化性能)。

示例

ddl方式

-- source
CREATE TABLE kafka_source (
      test_int INT,
      test_varchar VARCHAR,
      test_boolean BOOLEAN,
      test_tinyint TINYINT,
      test_smallint SMALLINT,
      test_bigint BIGINT,
      test_float FLOAT,
      test_double DOUBLE
) WITH (
  'connector' = 'memory',
  'table.name' = 'kafka_source'
);

-- sink
CREATE TABLE sink (
      test_int INT,
      test_varchar VARCHAR,
      test_boolean BOOLEAN,
      test_tinyint TINYINT,
      test_smallint SMALLINT,
      test_bigint BIGINT,
      test_float FLOAT,
      test_double DOUBLE
) WITH (
      'connector' = 'starrocks', -- required: specify this table type is jdbc
      'jdbc-url' = 'jdbc:mysql://hadoop336.163.org:9030', -- required: JDBC DB url
      'load-url'='hadoop336.163.org:8030',
      'table-name' = 'allType',  -- required: jdbc table name
      'database-name'='data_transform',
      'username' = 'root', -- optional: jdbc user name and password
      'password' = '123456'
);

INSERT INTO sink select * FROM kafka_source;
// source
CREATE TABLE flink_test (
    date_1 DATE,
    datetime_1 TIMESTAMP(6),
    char_1 CHAR(20),
    varchar_1 VARCHAR,
    boolean_1 BOOLEAN,
    tinyint_1 TINYINT,
    smallint_1 SMALLINT,
    int_1 INT,
    bigint_1 BIGINT,
    largeint_1 STRING,
    float_1 FLOAT,
    double_1 DOUBLE,
    decimal_1 DECIMAL(27,9)
) WITH (
   'connector'='starrocks',
   'load-url'='fe_ip1:8030,fe_ip2:8030,fe_ip3:8030',
   'jdbc-url'='jdbc:mysql://fe_ip:9030',
   'username'='root',
   'password'='',
   'database-name'='flink_test',
   'table-name'='flink_test'
);

select date_1, smallint_1 from flink_test where char_1 <> 'A' and int_1 = -126

元数据中心方式

CREATE TABLE kafka_source (
      test_int INT,
      test_varchar VARCHAR,
      test_boolean BOOLEAN,
      test_tinyint TINYINT,
      test_smallint SMALLINT,
      test_bigint BIGINT,
      test_float FLOAT,
--  testDecimal DECIMAL(38, 18),
      test_double DOUBLE
) WITH (
      'connector' = 'memory',
      'table.name' = 'kafka_source'
);

INSERT INTO bdms.data_transform.allType select * FROM kafka_source;

参数配置

source

Option Required Default Type Description
connector YES NONE String starrocks
load-url YES NONE String Hosts of the fe nodes like: fe_ip1:http_port,fe_ip2:http_port....
jdbc-url YES NONE String Hosts of the fe nodes like: fe_ip1:query_port,fe_ip2: query_port....
username YES NONE String StarRocks user name.
password YES NONE String StarRocks user password.
database-name YES NONE String Database name
table-name YES NONE String Table name
scan.connect.timeout-ms NO 1000 LONG Connect timeout
scan.params.keep-alive-min NO 10 INT Max keep alive time min
scan.params.query-timeout-s NO 600(5min) LONG Query timeout for a single query(The value of this parameter needs to be longer than the estimated period of the source)
scan.params.mem-limit-byte NO 102410241024(1G) LONG Memory limit for a single query
scan.max-retries NO 1 INT Max request retry times.

sink

Option Required Default Type Description
connector YES NONE String starrocks
jdbc-url YES NONE String this will be used to execute queries in starrocks.
load-url YES NONE String fe_ip:http_port;fe_ip:http_port; 分隔, which would be used to do the batch sinking.
database-name YES NONE String starrocks database name
table-name YES NONE String starrocks table name
username YES NONE String starrocks connecting username
password YES NONE String starrocks connecting password
sink.semantic NO at-least-once String at-least-once or exactly-once(flush at checkpoint only 和配置 sink.buffer-flush.* 将失效).
sink.buffer-flush.max-bytes NO 90M String 最大batch数据大小, range: [64MB, 10GB]. e.g. 100m/mb, k/kb, 5g/gb
sink.buffer-flush.max-rows NO 500000 Long 最大batch行数, range: [64,000, 5000,000].
sink.buffer-flush.interval-ms NO 300000 Long flush时间间隔,单位ms, range: [1000ms, 3600000ms].
sink.max-retries NO 3 Integer stream load 请求最大重试次数, range: [0, 1000].
sink.parallelism NO NULL Integer sink并行度。默认全局并行度
sink.connect.timeout-ms NO 1000 Integer 连接 load-url的超时时间,单位ms, range: [100, 60000].
sink.properties.* NO NONE String stream load 参数,如 'sink.properties.columns' = 'k1, v1'. 参考StarRocks/Doris
  • 注意
    1. Flushat-least-once 的触发时机: cachedRows >= ${sink.buffer-flush.max-rows} || cachedBytes >= ${sink.buffer-flush.max-bytes} || idleTime >= ${sink.buffer-flush.interval-ms}
    2. sink.buffer-flush.{max-rows|max-bytes|interval-ms}exactly-once 时失效.

      Metrics

Name Type Description
totalFlushBytes counter 成功flush的 bytes.
totalFlushRows counter 成功flush的行数.
totalFlushSucceededTimes counter data-batch成功flush的次数.
totalFlushFailedTimes counter flush失败次数.
totalScannedRows counter 成功收集的数据量

Type mappings

注意

  1. 当doris 0.14表中存在tinyint(1)、boolean时,读数据会报错。详见:doris bug。此时可使用jdbc connector读。
  2. 当任务失败时,exactly-once 语义无法保证.
  3. 只支持不带聚合的SQL,如 select {*|columns|count(1)} from {table-name} where ....

    source

StarRocks Flink
NULL NULL
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INT INT
BIGINT BIGINT
LARGEINT STRING
FLOAT FLOAT
DOUBLE DOUBLE
DATE DATE
DATETIME TIMESTAMP
DECIMAL DECIMAL
DECIMALV2 DECIMAL
DECIMAL32 DECIMAL
DECIMAL64 DECIMAL
DECIMAL128 DECIMAL
CHAR CHAR
VARCHAR STRING

Sink

Flink type StarRocks type
BOOLEAN BOOLEAN
TINYINT TINYINT
SMALLINT SMALLINT
INTEGER INTEGER
BIGINT BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
DECIMAL DECIMAL
BINARY INT
CHAR STRING
VARCHAR STRING
STRING STRING
DATE DATE
TIMESTAMP_WITHOUT_TIME_ZONE(N) DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) DATETIME
ARRAY<T> ARRAY<T>
MAP<KT,VT> JSON STRING
ROW<arg T, …> JSON STRING