Rocketmq

Easystream 支持使用 Rocketmq 作为输入/输出数据源。

Flink-1.10

Rocketmq Source

简述

Easystream 支持使用 Rocketmq 作为输入数据源。

示例

CREATE TABLE source (
 `key` varchar,
 `value` VARCHAR
) WITH (
  'connector.type' = 'rocketmq',
  'nameserver.address' = 'xxx:9876;xxx:9876',
  'group' = 'rocketmqTest',
  'topic' = 'rocketmqTest',
  'brokerserver.heartbeat.interval' = '30000',
  'tag' = '*',
  'offset.persist.interval' = '5000',
  'batch.size' = '256',
  'delay.when.message.not.found'='10',
  'offset.reset' = 'earlist'
   -- 'consumer.offset.from.timestamp'='1577253600000'
);

With 参数

参数 注释说明 备注
connector.type 数据源类型 必填,'rocketmq',小写
nameserver.address rocketmq 服务地址 必填
group 消费者组 (consumerGroup) 必填
topic 读取的 topic 必填
offset.reset offset 设置的类型:earliest、latest、timestamp。如果设置为 timestamp,则必须指定参数 offset.from.timestamp 必填
offset.from.timestamp 如果 offset=timestamp,则此值需要填一个13位的毫秒时间 选填
brokerserver.heartbeat.interval 与 brokerserver 的心跳间隔 选填,默认是 30000ms
tag 读取的tag 选填,默认为'*',全部tag
offset.persist.interval 不开启 checkpoint 下,offset 提交的时间间隔 选填,默认 5000ms
batch.size 每次拉取的最大数据条数 选填,默认256条
delay.when.message.not.found 当没有获取到数据时的等待时间 选填,默认 10ms

Rocketmq Sink

简述

Easystream 支持输出到 Rocketmq。

示例

create table sink(
  `key` varchar,
  `value` VARCHAR
) with(
   'connector.type' = 'rocketmq',
   'nameserver.address' = 'xxx:9876;xxx:9876',
   'group' = 'sloth-test-sink-0117-01',
   'topic' = 'sloth-test-0117-01',
   'brokerserver.heartbeat.interval' = '30000',
   'async' = 'true',
   'retry.times' = '3',
   'timeout' = '3000',
   'flash.on.checkpoint' = 'true',
   'msg.delay.level' = '0'
);

With 参数

参数 注释说明 备注
connector.type 数据源类型 必填,'rocketmq',小写
nameserver.address rocketmq 服务地址 必填
group 写入的组 必填
topic 写入的 topic 必填
brokerserver.heartbeat.interval 与 brokerserver 的心跳间隔 选填,默认是 30000ms
async 是否开启异步写入 选填,默认为 false
retry.times async=true 时的重试次数 选填,默认3次
flash.on.checkpoint 是否每次打 checkpoint 时才刷新数据到目标端 选填,默认为 false
msg.delay.level 数据的发送级别 选填,默认为0

FLINK-1.12

RocketMq source

CREATE TABLE rocketmq_source (
  `surname` STRING,
  `givenName` STRING
) WITH (
  'connector' = 'rocketmq',
  'topic' = 'sloth-test-0601',
  'group' = 'behavior_consume_group',
  'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876'
);

RocketMq sink

CREATE TABLE rocketmq_sink (
  `surname` STRING,
  `givenName` STRING
) WITH (
  'connector' = 'rocketmq',
  'topic' = 'sloth-test-0601-sink',
  'group' = 'behavior_produce_group',
  'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876',
  'key.columns' = 'surname',
  'write.keys.to.body' = 'true'
);

属性

支持的metadata列

参数 注释说明 备注
topic topic名 只读列

参数配置

配置名称 是否必填 配置生效类型 参数值字段类型 参数默认值 参数说明(用于用户手册)
connector 必填 源表、目标表 String - 必须为rocketmq
name.server.address 必填 源表、目标表 String - rocketmq 服务地址
topic 必填 源表、目标表 String - 读取的 topic
group 可选 源表、目标表 String - 消费者组 (consumerGroup)
tag 可选 源表、目标表 String - 读取的tag
start.message.offset 可选 源表 Integer -1 消息开始的偏移量
start.time.ms 可选 源表 Long -1 启动时间点。时间戳,单位为毫秒。
start.time 可选 源表 String - 启动时间点。字符串,形式为: 'yyyy-MM-dd HH:mm:ss'
end.time 可选 源表 String - 截止时间。字符串,形式为:'yyyy-MM-dd HH:mm:ss'
time.zone 可选 源表 String - 时区
partition.discovery.interval.ms 可选 源表 Long 30000 源表检查新分区的时间间隔,单位为ms。
use.new.api 可选 源表 Boolean true 是否启用新API,默认启用。
encoding 可选 目标表 String UTF-8 目标表编码
field.delimiter 可选 目标表 String \u0001 目标表字段分隔符
retry.times 可选 目标表 Integer 10 目标表写入失败重试次数
sleep.timeMs 可选 目标表 Long 5000L 目标表写入失败重试间隔时间,单位为毫秒
is.dynamic.tag 可选 目标表 Boolean false -
dynamic.tag.column 可选 目标表 String - -
dynamic.tag.column.write.included 可选 目标表 Boolean true -
write.keys.to.body 可选 目标表 Boolean false 是否将键写到body
key.columns 可选 目标表 String - 键列