Kafka Sink

Overview

Easystream can write output to Kafka and supports multiple Kafka versions.

Example

CREATE TABLE user_age (
  user_id BIGINT,
  item_id VARCHAR,
  age INT,
  behavior VARCHAR,
  ts TIMESTAMP
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.group.id' = 'xxx',
  'update-mode' = 'append',
  'connector.sink-partitioner' = 'round-robin',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);
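
Once the sink table is declared, rows are written to it with an ordinary INSERT INTO statement. The following is a minimal sketch, not taken from the original page: user_behavior_source is a hypothetical source table assumed to be registered already, with columns that match the sink schema.

```sql
-- user_behavior_source is a hypothetical, already-registered source table;
-- its column names and types are assumed to match the user_age sink.
INSERT INTO user_age
SELECT
  user_id,
  item_id,
  age,
  behavior,
  ts
FROM user_behavior_source;
```

Because the sink only supports 'update-mode' = 'append', the query feeding it should produce an append-only result, such as the plain projection shown here.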

WITH parameters

| Parameter | Description | Notes |
| --- | --- | --- |
| connector.type | Connector type | Required: kafka |
| connector.version | Kafka version | Required: universal |
| connector.topic | Kafka topic | Required |
| connector.startup-mode | Startup mode | One of earliest-offset, latest-offset, group-offsets, or specific-offsets. With specific-offsets, the per-partition offsets must also be configured, e.g. 'connector.specific-offsets.0.partition' = '0', 'connector.specific-offsets.0.offset' = '42', 'connector.specific-offsets.1.partition' = '1', 'connector.specific-offsets.1.offset' = '300' |
| connector.properties.*.key | Kafka property key | Required |
| connector.properties.*.value | Kafka property value | Required. Either the direct form: 'connector.properties.zookeeper.connect' = 'xxx', 'connector.properties.bootstrap.servers' = 'xxx', 'connector.properties.group.id' = 'xxx'; or the indexed key/value form: 'connector.properties.0.key' = 'zookeeper.connect', 'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'localhost:9092', 'connector.properties.2.key' = 'group.id', 'connector.properties.2.value' = 'testGroup' |
| update-mode | Update mode | Required. Only append is currently supported |
| format.type | Output data format | Required, e.g. 'json' |
| format.derive-schema | Whether the data schema is derived from the table schema | Optional, e.g. 'true' |
| format.json-schema | JSON schema to use | Optional. Not needed if format.derive-schema is already set |
| connector.sink-partitioner | How Flink output is assigned to Kafka partitions | Optional, e.g. fixed, round-robin, custom |
| connector.sink-partitioner-class | Partitioner class, e.g. 'org.mycompany.MyPartitioner'. Only takes effect when connector.sink-partitioner is custom | Optional |
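
The indexed key/value form of the Kafka properties and the custom partitioner option can be combined in a single table definition. The sketch below is illustrative only: the table name user_age_custom and its two columns are hypothetical, the host addresses and group name are the sample values from the parameter list, and org.mycompany.MyPartitioner stands in for a partitioner class that is assumed to be available to the job.

```sql
-- Hypothetical sink table using the indexed key/value property form
-- and a custom partitioner.
CREATE TABLE user_age_custom (
  user_id BIGINT,
  age INT
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'xxx',
  -- Kafka properties written as indexed key/value pairs
  'connector.properties.0.key' = 'zookeeper.connect',
  'connector.properties.0.value' = 'localhost:2181',
  'connector.properties.1.key' = 'bootstrap.servers',
  'connector.properties.1.value' = 'localhost:9092',
  'connector.properties.2.key' = 'group.id',
  'connector.properties.2.value' = 'testGroup',
  'update-mode' = 'append',
  -- the partitioner class is only read when the partitioner is 'custom'
  'connector.sink-partitioner' = 'custom',
  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);
```

With 'connector.sink-partitioner' set to fixed or round-robin, the connector.sink-partitioner-class line would be omitted, since it has no effect in those modes.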

Kafka version mapping

| version | Kafka version |
| --- | --- |
| universal | 0.8.2.2 |
| universal | 0.9.0.1 |
| universal | 0.10.2.1 |
| universal | 0.11.0.2 |
| universal | 0.11+ |