Rocketmq Source

简述

Easystream 支持使用 Rocketmq 作为输入数据源。

示例

  1. CREATE TABLE source (
  2. `key` varchar,
  3. `value` VARCHAR
  4. ) WITH (
  5. 'connector.type' = 'rocketmq',
  6. 'nameserver.address' = 'xxx:9876;xxx:9876',
  7. 'group' = 'rocketmqTest',
  8. 'topic' = 'rocketmqTest',
  9. 'brokerserver.heartbeat.interval' = '30000',
  10. 'tag' = '*',
  11. 'offset.persist.interval' = '5000',
  12. 'batch.size' = '256',
  13. 'delay.when.message.not.found'='10',
  14. 'offset.reset' = 'earlist'
  15. -- 'consumer.offset.from.timestamp'='1577253600000'
  16. );

With 参数

参数 注释说明 备注
connector.type 数据源类型 必填,’rocketmq’,小写
nameserver.address rocketmq 服务地址 必填
group 消费者组 (consumerGroup) 必填
topic 读取的 topic 必填
offset.reset offset 设置的类型:earliest、latest、timestamp。如果设置为 timestamp,则必须指定参数 offset.from.timestamp 必填
offset.from.timestamp 如果 offset=timestamp,则此值需要填一个13位的毫秒时间 选填
brokerserver.heartbeat.interval 与 brokerserver 的心跳间隔 选填,默认是 30000ms
tag 读取的tag 选填,默认为’*’,全部tag
offset.persist.interval 不开启 checkpoint 下,offset 提交的时间间隔 选填,默认 5000ms
batch.size 每次拉取的最大数据条数 选填,默认256条
delay.when.message.not.found 当没有获取到数据时的等待时间 选填,默认 10ms