Kafka SQL 连接器
更新时间: 2023-12-29 15:27:05
阅读 2043
Kafka Connectors
Flink-1.12/FLINK-1.14
1.12简述
- kafka输入输出,支持upsert方式
1.12示例
- ddl
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
-- ts AS PROCTIME(),
ts BIGINT,
ts0 AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR ts0 AS ts0 - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'userlog-test',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'sloth-test2.dg.163.org:9092',
'format' = 'json'
);
CREATE TABLE usertest_sink (
id VARCHAR,
name varchar
) WITH (
'connector' = 'kafka',
'topic' = 'usertest_sink',
'properties.bootstrap.servers' = 'sloth-test2.dg.163.org:9092',
'properties.group.id' = 'test',
'table.append-only' = 'true',
'sink.partitioner' = 'round-robin',
'sink.parallelism' = '1',
'format' = 'json'
);
insert into usertest_sink
select user_id id, '1' name from user_log group by user_id, hop(ts0, interval '10' SECOND, interval '10' SECOND)
;
如果kafka 集群开启了sasl 方式的认证 增加如下参数
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'SCRAM-HSA-256',
'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";'
- 1.12元数据方式
set 'user_log.scan.startup.mode' = 'earliest-offset';
set 'user_log.properties.group.id' = 'test';
insert into ztes.user_test_sink
select user_id id, '1' name from ztes.user_log
;
1.12参数
配置名称 | 是否必填 | 配置生效类型 | 数据类型 | 默认值 | 参数说明 |
---|---|---|---|---|---|
connector | 必填 | 源表、目标表 | String | - | 必须为:kafka |
properties.bootstrap.servers | 必填 | 源表、目标表 | String | - | 逗号分隔的 Kafka broker 列表。 |
properties.group.id | 必填 | 源表 | String | - | Kafka source 的消费组 id |
topic | 目标表必填; 源表必须设置'topic'或't opic-pattern' | 源表、目标表 | String | - | 当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 'topic-1;topic-2'。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。 |
topic-pattern | 可选 | 源表 | String | - | 匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。 |
scan.startup.mode | 可选 | 源表 | String | group-offsets | Kafka consumer 的启动模式。有效值为:'earliest-offset','latest-offset','group-offsets','timestamp' 和 'specific-offsets' |
format | 可选 | 源表、目标表 | String | - | 用来序列化或反序列化 Kafka 消息的格式。注意:该配置项和 'value.format' 二者必需其一。 |
key.format | - | 源表、目标表 | String | - | 用来序列化和反序列化 Kafka 消息键(Key)的格式。 注意:如果定义了键格式,则配置项 'key.fields' 也是必需的。 |
key.fields | 可选 | 源表、目标表 | String | - | 表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 'field1;field2'。 |
key.fields-prefix | 可选 | 源表、目标表 | String | - | 为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 'key.fields' 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 'value.fields-include' 配置为 'EXCEPT_KEY'。 |
value.format | 可选 | 源表、目标表 | String | - | 序列化和反序列化 Kafka 消息体时使用的格式。注意:该配置项和 'format' 二者必需其一。 |
value.fields-include | 可选 | 源表、目标表 | 枚举类型 | ALL | 定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 'ALL' 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。可选值: ALL , EXCEPT_KEY |
scan.startup.specific-offsets | 可选 | 源表 | String | - | 在使用 'specific-offsets' 启动模式时为每个 partition 指定 offset,例如 'partition:0,offset:42;partition:1,offset:300'。 |
scan.startup.timestamp-millis | 可选 | 源表 | Long | - | 在使用 'timestamp' 启动模式时指定启动的时间戳(单位毫秒)。 |
scan.topic-partition-discovery.interval | 可选 | 源表 | Duration | - | Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。 |
sink.partitioner | 可选 | 目标表 | String | default | Flink partition 到 Kafka partition 的分区映射关系,可选值有: default :使用 Kafka 默认的分区器对消息进行分区。 fixed :每个 Flink partition 最终对应最多一个 Kafka partition。 round-robin :Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。 自定义 FlinkKafkaPartitioner的子类 :例如 'org.mycompany.MyPartitioner'。 |
sink.parallelism | 可选 | 目标表 | Integer | - | 定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。 |
sink.semantic | 可选 | 目标表 | String | - | 定义kafka的sink语义。有效的枚举值为: at-least-once , exactly-once 和 none |
table.append-only | 可选 | 目标表 | Boolean | TRUE | 是否为追加模式 |
parallelism | 可选 | 源表 | Integer | - | kafka source算子的并行度 |
Flink-1.10
简述
- kafka输入输出,表字段支持json消息格式
示例:
- flink kafka connector官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
Time、Date、Timestamp类型
- For example: 2018-01-01 for date, 20:43:59Z for time, and 2018-01-01T20:43:59Z for timestamp.
source
CREATE TABLE user_log (
user_id BIGINT,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'xxx',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = 'xxx',
'connector.properties.bootstrap.servers' = 'xxx',
'connector.properties.group.id' = 'xxx',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true',
'format.fail-on-missing-field' = 'false'
);
- sink
CREATE TABLE user_age ( user_id BIGINT, item_id VARCHAR, age INT, behavior VARCHAR, ts TIMESTAMP ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'xxx', 'connector.properties.zookeeper.connect' = 'xxx', 'connector.properties.bootstrap.servers' = 'xxx', 'connector.properties.group.id' = 'xxx', 'update-mode' = 'append', 'connector.sink-partitioner' = 'round-robin', 'format.type' = 'json', 'format.derive-schema' = 'true' );
with参数
source参数
- connector.type
- 'kafka' 必选。
- connector.version
- 某kafka版本;必选。universal。版本对应关系见Kafka版本对应关系。
- connector.topic
- kafka的topic;必选
- connector.startup-mode
- 启动方式,可选。earliest-offset、latest-offset、group-offsets和specific-offsets,当指定为specific-offsets时,还需要配置如下:
'connector.specific-offsets.0.partition' = '0', 'connector.specific-offsets.0.offset' = '42', 'connector.specific-offsets.1.partition' = '1', 'connector.specific-offsets.1.offset' = '300'
- 启动方式,可选。earliest-offset、latest-offset、group-offsets和specific-offsets,当指定为specific-offsets时,还需要配置如下:
- connector.properties.*.key
- 配置kafka的property key,必选。
- connector.properties.*.value
- 配置kafka的property value,必选。
'connector.properties.zookeeper.connect' = 'xxx', 'connector.properties.bootstrap.servers' = 'xxx', 'connector.properties.group.id' = 'xxx', 或者是 'connector.properties.0.key' = 'zookeeper.connect', -- optional: connector specific properties 'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'localhost:9092', 'connector.properties.2.key' = 'group.id', 'connector.properties.2.value' = 'testGroup',
- 配置kafka的property value,必选。
- flink.partition-discovery.interval-millis
- 定时检查是否有新分区产生;可选;默认不启用,单位为毫秒。
sink 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 类型 | 必填:kafka |
connector.version | kafka 版本 | 必填:universal |
connector.topic | kafka 的 topic | 必填 |
connector.properties.*.key | 配置kafka的property key | 必填 |
connector.properties.*.value | 配置kafka的property value | 必填: 'connector.properties.zookeeper.connect' = 'xxx', 'connector.properties.bootstrap.servers' = 'xxx', 'connector.properties.group.id' = 'xxx', 或者是 'connector.properties.0.key' = 'zookeeper.connect', -- optional: connector specific properties 'connector.properties.0.value' = 'localhost:2181','connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'localhost:9092', 'connector.properties.2.key' = 'group.id', 'connector.properties.2.value' = 'testGroup' |
update-mode | 更新模式 | 必填:目前只支持 append |
format.type | 输出数据格式 | 必填。e.g. 'json'。 |
format.derive-schema | 数据schema是否来源于table schema | 选填。e.g. 'true'。 |
format.json-schema | 使用JSON schema | 选填。derive-schema已设置的话,该项不用设置。 |
connector.sink-partitioner | 配置flink数据输出到kafka的partition | 选填。e.g. fixed、round-robin、custom。 |
connector.sink-partitioner-class | 'org.mycompany.MyPartitioner' sink partitioner为custom才生效 | 选填 |
kafka版本对应配置
version | Kafka版本 |
---|---|
universal | 0.8.2.2 |
universal | 0.9.0.1 |
universal | 0.10.2.1 |
universal | 0.11.0.2 |
universal | 0.11+ |
文档反馈
以上内容对您是否有帮助?