Rocketmq Source

简述

Easystream 支持使用 Rocketmq 作为输入数据源。

示例

CREATE TABLE source (
 `key` varchar,
 `value` VARCHAR
) WITH (
  'connector.type' = 'rocketmq',
  'nameserver.address' = 'xxx:9876;xxx:9876',
  'group' = 'rocketmqTest',
  'topic' = 'rocketmqTest',
  'brokerserver.heartbeat.interval' = '30000',
  'tag' = '*',
  'offset.persist.interval' = '5000',
  'batch.size' = '256',
  'delay.when.message.not.found'='10',
  'offset.reset' = 'earlist'
   -- 'consumer.offset.from.timestamp'='1577253600000'
);

With 参数

参数 注释说明 备注
connector.type 数据源类型 必填,’rocketmq’,小写
nameserver.address rocketmq 服务地址 必填
group 消费者组 (consumerGroup) 必填
topic 读取的 topic 必填
offset.reset offset 设置的类型:earliest、latest、timestamp。如果设置为 timestamp,则必须指定参数 offset.from.timestamp 必填
offset.from.timestamp 如果 offset=timestamp,则此值需要填一个13位的毫秒时间 选填
brokerserver.heartbeat.interval 与 brokerserver 的心跳间隔 选填,默认是 30000ms
tag 读取的tag 选填,默认为’*’,全部tag
offset.persist.interval 不开启 checkpoint 下,offset 提交的时间间隔 选填,默认 5000ms
batch.size 每次拉取的最大数据条数 选填,默认256条
delay.when.message.not.found 当没有获取到数据时的等待时间 选填,默认 10ms