Kafka Sink
更新时间: 2022-02-28 10:32:04
阅读 1036
Kafka Sink
简述
Easystream 支持输出到 Kafka,支持多个 Kafka 版本。
示例
CREATE TABLE user_age (
user_id BIGINT,
item_id VARCHAR,
age INT,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'xxx',
'connector.properties.zookeeper.connect' = 'xxx',
'connector.properties.bootstrap.servers' = 'xxx',
'connector.properties.group.id' = 'xxx',
'update-mode' = 'append',
'connector.sink-partitioner' = 'round-robin',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 类型 | 必填:kafka |
connector.version | kafka 版本 | 必填:universal |
connector.topic | kafka 的 topic | 必填 |
connector.startup-mode | 启动方式 | 可选 earliest-offset、latest-offset、group-offsets 和 specific-offsets,当指定为specific-offsets 时,还需要配置如下:’connector.specific-offsets.0.partition’ = ‘0’, ‘connector.specific-offsets.0.offset’ = ‘42’, ‘connector.specific-offsets.1.partition’ = ‘1’, ‘connector.specific-offsets.1.offset’ = ‘300’ |
connector.properties.*.key | 配置kafka的property key | 必填 |
connector.properties.*.value | 配置kafka的property value | 必填: ‘connector.properties.zookeeper.connect’ = ‘xxx’, ‘connector.properties.bootstrap.servers’ = ‘xxx’, ‘connector.properties.group.id’ = ‘xxx’, 或者是 ‘connector.properties.0.key’ = ‘zookeeper.connect’, — optional: connector specific properties ‘connector.properties.0.value’ = ‘localhost:2181’,’connector.properties.1.key’ = ‘bootstrap.servers’, ‘connector.properties.1.value’ = ‘localhost:9092’, ‘connector.properties.2.key’ = ‘group.id’, ‘connector.properties.2.value’ = ‘testGroup’ |
update-mode | 更新模式 | 必填:目前只支持 append |
format.type | 输出数据格式 | 必填。e.g. ‘json’。 |
format.derive-schema | 数据schema是否来源于table schema | 选填。e.g. ‘true’。 |
format.json-schema | 使用JSON schema | 选填。derive-schema已设置的话,该项不用设置。 |
connector.sink-partitioner | 配置flink数据输出到kafka的partition | 选填。e.g. fixed、round-robin、custom。 |
connector.sink-partitioner-class | ‘org.mycompany.MyPartitioner’ sink partitioner为custom才生效 | 选填 |
Kafka 对应版本配置
version | Kafka 版本 |
---|---|
universal | 0.8.2.2 |
universal | 0.9.0.1 |
universal | 0.10.2.1 |
universal | 0.11.0.2 |
universal | 0.11+ |
文档反馈
以上内容对您是否有帮助?