Rocketmq

Easystream 支持使用 Rocketmq 作为输入/输出数据源。

Flink-1.10

Rocketmq Source

简述

Easystream 支持使用 Rocketmq 作为输入数据源。

示例

CREATE TABLE source (
 `key` varchar,
 `value` VARCHAR
) WITH (
  'connector.type' = 'rocketmq',
  'nameserver.address' = 'xxx:9876;xxx:9876',
  'group' = 'rocketmqTest',
  'topic' = 'rocketmqTest',
  'brokerserver.heartbeat.interval' = '30000',
  'tag' = '*',
  'offset.persist.interval' = '5000',
  'batch.size' = '256',
  'delay.when.message.not.found'='10',
  'offset.reset' = 'earlist'
   -- 'consumer.offset.from.timestamp'='1577253600000'
);

With 参数

参数 注释说明 备注
connector.type 数据源类型 必填,’rocketmq’,小写
nameserver.address rocketmq 服务地址 必填
group 消费者组 (consumerGroup) 必填
topic 读取的 topic 必填
offset.reset offset 设置的类型:earliest、latest、timestamp。如果设置为 timestamp,则必须指定参数 offset.from.timestamp 必填
offset.from.timestamp 如果 offset=timestamp,则此值需要填一个13位的毫秒时间 选填
brokerserver.heartbeat.interval 与 brokerserver 的心跳间隔 选填,默认是 30000ms
tag 读取的tag 选填,默认为’*’,全部tag
offset.persist.interval 不开启 checkpoint 下,offset 提交的时间间隔 选填,默认 5000ms
batch.size 每次拉取的最大数据条数 选填,默认256条
delay.when.message.not.found 当没有获取到数据时的等待时间 选填,默认 10ms

Rocketmq Sink

简述

Easystream 支持输出到 Rocketmq。

示例

create table sink(
  `key` varchar,
  `value` VARCHAR
) with(
   'connector.type' = 'rocketmq',
   'nameserver.address' = 'xxx:9876;xxx:9876',
   'group' = 'sloth-test-sink-0117-01',
   'topic' = 'sloth-test-0117-01',
   'brokerserver.heartbeat.interval' = '30000',
   'async' = 'true',
   'retry.times' = '3',
   'timeout' = '3000',
   'flash.on.checkpoint' = 'true',
   'msg.delay.level' = '0'
);

With 参数

参数 注释说明 备注
connector.type 数据源类型 必填,’rocketmq’,小写
nameserver.address rocketmq 服务地址 必填
group 写入的组 必填
topic 写入的 topic 必填
brokerserver.heartbeat.interval 与 brokerserver 的心跳间隔 选填,默认是 30000ms
async 是否开启异步写入 选填,默认为 false
retry.times async=true 时的重试次数 选填,默认3次
flash.on.checkpoint 是否每次打 checkpoint 时才刷新数据到目标端 选填,默认为 false
msg.delay.level 数据的发送级别 选填,默认为0

FLINK-1.12

RocketMq source

CREATE TABLE rocketmq_source (
`topic` STRING METADATA VIRTUAL,
  `surname` STRING,
  `givenName` STRING
) WITH (
  'connector' = 'rocketmq',
  'topic' = 'sloth-test-0601',
  'group' = 'behavior_consume_group',
  'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876'
);

RocketMq sink

CREATE TABLE rocketmq_sink (
  `surname` STRING,
  `givenName` STRING
) WITH (
  'connector' = 'rocketmq',
  'topic' = 'sloth-test-0601-sink',
  'group' = 'behavior_produce_group',
  'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876',
  'key.columns' = 'surname',
  'write.keys.to.body' = 'true'
);

属性

支持的metadata列

参数 注释说明 备注
topic topic名 只读列

with属性

参数 注释说明 备注
connector 数据源类型 必填,’rocketmq’,小写
name.server.address rocketmq 服务地址 必填,可用于source或sink
group 写入的组 必填,可用于source或sink
topic 写入的 topic 必填,可用于source或sink
key.columns 键列 只能用于sink
write.keys.to.body 是否将键写到body 只能用于sink