Kafka Source

简述

Easystream 支持使用 Kafka 作为输入数据源,表字段支持 json 消息格式。

示例


CREATE TABLE user_log (
  user_id BIGINT,
  item_id VARCHAR,
  category_id VARCHAR,
  behavior VARCHAR,
  ts TIMESTAMP
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'xxx',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.group.id' = 'xxx',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true',
  'format.fail-on-missing-field' = 'false'
);

With 参数

参数 注释说明 备注
connector.type 类型 必填:kafka
connector.version kafka 版本 必填:universal
connector.topic kafka 的 topic 必填
connector.startup-mode 启动方式 可选 earliest-offset、latest-offset、group-offsets 和 specific-offsets,当指定为specific-offsets 时,还需要配置如下:’connector.specific-offsets.0.partition’ = ‘0’, ‘connector.specific-offsets.0.offset’ = ‘42’, ‘connector.specific-offsets.1.partition’ = ‘1’, ‘connector.specific-offsets.1.offset’ = ‘300’
connector.properties.*.key 配置kafka的property key 必填
connector.properties.*.value 配置kafka的property value 必填: ‘connector.properties.zookeeper.connect’ = ‘xxx’, ‘connector.properties.bootstrap.servers’ = ‘xxx’, ‘connector.properties.group.id’ = ‘xxx’, 或者是 ‘connector.properties.0.key’ = ‘zookeeper.connect’, — optional: connector specific properties ‘connector.properties.0.value’ = ‘localhost:2181’,’connector.properties.1.key’ = ‘bootstrap.servers’, ‘connector.properties.1.value’ = ‘localhost:9092’, ‘connector.properties.2.key’ = ‘group.id’, ‘connector.properties.2.value’ = ‘testGroup’
flink.partition-discovery.interval-millis 定时检查是否有新分区产生 可选;默认不启用,单位为毫秒。

Kafka 对应版本配置

version Kafka 版本
universal 0.8.2.2
universal 0.9.0.1
universal 0.10.2.1
universal 0.11.0.2
universal 0.11+