Rocketmq Source
更新时间: 2021-08-26 20:42:29
阅读 452
Rocketmq Source
简述
Easystream 支持使用 Rocketmq 作为输入数据源。
示例
CREATE TABLE source (
`key` varchar,
`value` VARCHAR
) WITH (
'connector.type' = 'rocketmq',
'nameserver.address' = 'xxx:9876;xxx:9876',
'group' = 'rocketmqTest',
'topic' = 'rocketmqTest',
'brokerserver.heartbeat.interval' = '30000',
'tag' = '*',
'offset.persist.interval' = '5000',
'batch.size' = '256',
'delay.when.message.not.found'='10',
'offset.reset' = 'earlist'
-- 'consumer.offset.from.timestamp'='1577253600000'
);
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 数据源类型 | 必填,’rocketmq’,小写 |
nameserver.address | rocketmq 服务地址 | 必填 |
group | 消费者组 (consumerGroup) | 必填 |
topic | 读取的 topic | 必填 |
offset.reset | offset 设置的类型:earliest、latest、timestamp。如果设置为 timestamp,则必须指定参数 offset.from.timestamp | 必填 |
offset.from.timestamp | 如果 offset=timestamp,则此值需要填一个13位的毫秒时间 | 选填 |
brokerserver.heartbeat.interval | 与 brokerserver 的心跳间隔 | 选填,默认是 30000ms |
tag | 读取的tag | 选填,默认为’*’,全部tag |
offset.persist.interval | 不开启 checkpoint 下,offset 提交的时间间隔 | 选填,默认 5000ms |
batch.size | 每次拉取的最大数据条数 | 选填,默认256条 |
delay.when.message.not.found | 当没有获取到数据时的等待时间 | 选填,默认 10ms |
文档反馈
以上内容对您是否有帮助?