Kafka Source
更新时间: 2021-08-26 20:42:29
阅读 1689
Kafka Source
简述
Easystream 支持使用 Kafka 作为输入数据源,表字段支持 json 消息格式。
示例
CREATE TABLE user_log (user_id BIGINT,item_id VARCHAR,category_id VARCHAR,behavior VARCHAR,ts TIMESTAMP) WITH ('connector.type' = 'kafka','connector.version' = 'universal','connector.topic' = 'xxx','connector.startup-mode' = 'earliest-offset','connector.properties.zookeeper.connect' = 'xxx','connector.properties.bootstrap.servers' = 'xxx','connector.properties.group.id' = 'xxx','update-mode' = 'append','format.type' = 'json','format.derive-schema' = 'true','format.fail-on-missing-field' = 'false');
Time、Date、Timestamp 类型
- For example: 2018-01-01 for date, 20:43:59Z for time, and 2018-01-01T20:43:59Z for timestamp.
With 参数
| 参数 | 注释说明 | 备注 |
|---|---|---|
| connector.type | 类型 | 必填:kafka |
| connector.version | kafka 版本 | 必填:universal |
| connector.topic | kafka 的 topic | 必填 |
| connector.startup-mode | 启动方式 | 可选 earliest-offset、latest-offset、group-offsets 和 specific-offsets,当指定为specific-offsets 时,还需要配置如下:’connector.specific-offsets.0.partition’ = ‘0’, ‘connector.specific-offsets.0.offset’ = ‘42’, ‘connector.specific-offsets.1.partition’ = ‘1’, ‘connector.specific-offsets.1.offset’ = ‘300’ |
| connector.properties.*.key | 配置kafka的property key | 必填 |
| connector.properties.*.value | 配置kafka的property value | 必填: ‘connector.properties.zookeeper.connect’ = ‘xxx’, ‘connector.properties.bootstrap.servers’ = ‘xxx’, ‘connector.properties.group.id’ = ‘xxx’, 或者是 ‘connector.properties.0.key’ = ‘zookeeper.connect’, — optional: connector specific properties ‘connector.properties.0.value’ = ‘localhost:2181’,’connector.properties.1.key’ = ‘bootstrap.servers’, ‘connector.properties.1.value’ = ‘localhost:9092’, ‘connector.properties.2.key’ = ‘group.id’, ‘connector.properties.2.value’ = ‘testGroup’ |
| flink.partition-discovery.interval-millis | 定时检查是否有新分区产生 | 可选;默认不启用,单位为毫秒。 |
Kafka 对应版本配置
| version | Kafka 版本 |
|---|---|
| universal | 0.8.2.2 |
| universal | 0.9.0.1 |
| universal | 0.10.2.1 |
| universal | 0.11.0.2 |
| universal | 0.11+ |
文档反馈
以上内容对您是否有帮助?