Rocketmq Source
更新时间: 2021-08-26 20:42:29
阅读 613
Rocketmq Source
简述
Easystream 支持使用 Rocketmq 作为输入数据源。
示例
CREATE TABLE source (`key` varchar,`value` VARCHAR) WITH ('connector.type' = 'rocketmq','nameserver.address' = 'xxx:9876;xxx:9876','group' = 'rocketmqTest','topic' = 'rocketmqTest','brokerserver.heartbeat.interval' = '30000','tag' = '*','offset.persist.interval' = '5000','batch.size' = '256','delay.when.message.not.found'='10','offset.reset' = 'earlist'-- 'consumer.offset.from.timestamp'='1577253600000');
With 参数
| 参数 | 注释说明 | 备注 |
|---|---|---|
| connector.type | 数据源类型 | 必填,’rocketmq’,小写 |
| nameserver.address | rocketmq 服务地址 | 必填 |
| group | 消费者组 (consumerGroup) | 必填 |
| topic | 读取的 topic | 必填 |
| offset.reset | offset 设置的类型:earliest、latest、timestamp。如果设置为 timestamp,则必须指定参数 offset.from.timestamp | 必填 |
| offset.from.timestamp | 如果 offset=timestamp,则此值需要填一个13位的毫秒时间 | 选填 |
| brokerserver.heartbeat.interval | 与 brokerserver 的心跳间隔 | 选填,默认是 30000ms |
| tag | 读取的tag | 选填,默认为’*’,全部tag |
| offset.persist.interval | 不开启 checkpoint 下,offset 提交的时间间隔 | 选填,默认 5000ms |
| batch.size | 每次拉取的最大数据条数 | 选填,默认256条 |
| delay.when.message.not.found | 当没有获取到数据时的等待时间 | 选填,默认 10ms |
文档反馈
以上内容对您是否有帮助?