Rocketmq

Easystream 支持使用 Rocketmq 作为输入/输出数据源。

Flink-1.10

Rocketmq Source

简述

Easystream 支持使用 Rocketmq 作为输入数据源。

示例

CREATE TABLE source (
 `key` varchar,
 `value` VARCHAR
) WITH (
  'connector.type' = 'rocketmq',
  'nameserver.address' = 'xxx:9876;xxx:9876',
  'group' = 'rocketmqTest',
  'topic' = 'rocketmqTest',
  'brokerserver.heartbeat.interval' = '30000',
  'tag' = '*',
  'offset.persist.interval' = '5000',
  'batch.size' = '256',
  'delay.when.message.not.found'='10',
  'offset.reset' = 'earlist'
   -- 'consumer.offset.from.timestamp'='1577253600000'
);

With 参数

参数 注释说明 备注
connector.type 数据源类型 必填,’rocketmq’,小写
nameserver.address rocketmq 服务地址 必填
group 消费者组 (consumerGroup) 必填
topic 读取的 topic 必填
offset.reset offset 设置的类型:earliest、latest、timestamp。如果设置为 timestamp,则必须指定参数 offset.from.timestamp 必填
offset.from.timestamp 如果 offset=timestamp,则此值需要填一个13位的毫秒时间 选填
brokerserver.heartbeat.interval 与 brokerserver 的心跳间隔 选填,默认是 30000ms
tag 读取的tag 选填,默认为’*’,全部tag
offset.persist.interval 不开启 checkpoint 下,offset 提交的时间间隔 选填,默认 5000ms
batch.size 每次拉取的最大数据条数 选填,默认256条
delay.when.message.not.found 当没有获取到数据时的等待时间 选填,默认 10ms

Rocketmq Sink

简述

Easystream 支持输出到 Rocketmq。

示例

create table sink(
  `key` varchar,
  `value` VARCHAR
) with(
   'connector.type' = 'rocketmq',
   'nameserver.address' = 'xxx:9876;xxx:9876',
   'group' = 'sloth-test-sink-0117-01',
   'topic' = 'sloth-test-0117-01',
   'brokerserver.heartbeat.interval' = '30000',
   'async' = 'true',
   'retry.times' = '3',
   'timeout' = '3000',
   'flash.on.checkpoint' = 'true',
   'msg.delay.level' = '0'
);

With 参数

参数 注释说明 备注
connector.type 数据源类型 必填,’rocketmq’,小写
nameserver.address rocketmq 服务地址 必填
group 写入的组 必填
topic 写入的 topic 必填
brokerserver.heartbeat.interval 与 brokerserver 的心跳间隔 选填,默认是 30000ms
async 是否开启异步写入 选填,默认为 false
retry.times async=true 时的重试次数 选填,默认3次
flash.on.checkpoint 是否每次打 checkpoint 时才刷新数据到目标端 选填,默认为 false
msg.delay.level 数据的发送级别 选填,默认为0

FLINK-1.12

RocketMq source

CREATE TABLE rocketmq_source (
`topic` STRING METADATA VIRTUAL,
  `surname` STRING,
  `givenName` STRING
) WITH (
  'connector' = 'rocketmq',
  'topic' = 'sloth-test-0601',
  'group' = 'behavior_consume_group',
  'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876'
);

RocketMq sink

CREATE TABLE rocketmq_sink (
  `surname` STRING,
  `givenName` STRING
) WITH (
  'connector' = 'rocketmq',
  'topic' = 'sloth-test-0601-sink',
  'group' = 'behavior_produce_group',
  'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876',
  'key.columns' = 'surname',
  'write.keys.to.body' = 'true'
);

属性

支持的metadata列

参数 注释说明 备注
topic topic名 只读列

with属性

参数 注释说明 备注
connector 数据源类型 必填,’rocketmq’,小写
name.server.address rocketmq 服务地址 必填
group 写入的组 必填
topic 写入的 topic 必填
key.columns 键列
write.keys.to.body 是否将键写到body