RocketMQ SQL 连接器
更新时间: 2023-04-24 20:23:30
阅读 184
Rocketmq
Easystream 支持使用 Rocketmq 作为输入/输出数据源。
Flink-1.10
Rocketmq Source
简述
Easystream 支持使用 Rocketmq 作为输入数据源。
示例
CREATE TABLE source (
`key` varchar,
`value` VARCHAR
) WITH (
'connector.type' = 'rocketmq',
'nameserver.address' = 'xxx:9876;xxx:9876',
'group' = 'rocketmqTest',
'topic' = 'rocketmqTest',
'brokerserver.heartbeat.interval' = '30000',
'tag' = '*',
'offset.persist.interval' = '5000',
'batch.size' = '256',
'delay.when.message.not.found'='10',
'offset.reset' = 'earlist'
-- 'consumer.offset.from.timestamp'='1577253600000'
);
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 数据源类型 | 必填,'rocketmq',小写 |
nameserver.address | rocketmq 服务地址 | 必填 |
group | 消费者组 (consumerGroup) | 必填 |
topic | 读取的 topic | 必填 |
offset.reset | offset 设置的类型:earliest、latest、timestamp。如果设置为 timestamp,则必须指定参数 offset.from.timestamp | 必填 |
offset.from.timestamp | 如果 offset=timestamp,则此值需要填一个13位的毫秒时间 | 选填 |
brokerserver.heartbeat.interval | 与 brokerserver 的心跳间隔 | 选填,默认是 30000ms |
tag | 读取的tag | 选填,默认为'*',全部tag |
offset.persist.interval | 不开启 checkpoint 下,offset 提交的时间间隔 | 选填,默认 5000ms |
batch.size | 每次拉取的最大数据条数 | 选填,默认256条 |
delay.when.message.not.found | 当没有获取到数据时的等待时间 | 选填,默认 10ms |
Rocketmq Sink
简述
Easystream 支持输出到 Rocketmq。
示例
create table sink(
`key` varchar,
`value` VARCHAR
) with(
'connector.type' = 'rocketmq',
'nameserver.address' = 'xxx:9876;xxx:9876',
'group' = 'sloth-test-sink-0117-01',
'topic' = 'sloth-test-0117-01',
'brokerserver.heartbeat.interval' = '30000',
'async' = 'true',
'retry.times' = '3',
'timeout' = '3000',
'flash.on.checkpoint' = 'true',
'msg.delay.level' = '0'
);
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 数据源类型 | 必填,'rocketmq',小写 |
nameserver.address | rocketmq 服务地址 | 必填 |
group | 写入的组 | 必填 |
topic | 写入的 topic | 必填 |
brokerserver.heartbeat.interval | 与 brokerserver 的心跳间隔 | 选填,默认是 30000ms |
async | 是否开启异步写入 | 选填,默认为 false |
retry.times | async=true 时的重试次数 | 选填,默认3次 |
flash.on.checkpoint | 是否每次打 checkpoint 时才刷新数据到目标端 | 选填,默认为 false |
msg.delay.level | 数据的发送级别 | 选填,默认为0 |
FLINK-1.12
RocketMq source
CREATE TABLE rocketmq_source (
`surname` STRING,
`givenName` STRING
) WITH (
'connector' = 'rocketmq',
'topic' = 'sloth-test-0601',
'group' = 'behavior_consume_group',
'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876'
);
RocketMq sink
CREATE TABLE rocketmq_sink (
`surname` STRING,
`givenName` STRING
) WITH (
'connector' = 'rocketmq',
'topic' = 'sloth-test-0601-sink',
'group' = 'behavior_produce_group',
'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876',
'key.columns' = 'surname',
'write.keys.to.body' = 'true'
);
属性
支持的metadata列
参数 | 注释说明 | 备注 |
---|---|---|
topic | topic名 | 只读列 |
参数配置
配置名称 | 是否必填 | 配置生效类型 | 参数值字段类型 | 参数默认值 | 参数说明(用于用户手册) |
---|---|---|---|---|---|
connector | 必填 | 源表、目标表 | String | - | 必须为rocketmq |
name.server.address | 必填 | 源表、目标表 | String | - | rocketmq 服务地址 |
topic | 必填 | 源表、目标表 | String | - | 读取的 topic |
group | 可选 | 源表、目标表 | String | - | 消费者组 (consumerGroup) |
tag | 可选 | 源表、目标表 | String | - | 读取的tag |
start.message.offset | 可选 | 源表 | Integer | -1 | 消息开始的偏移量 |
start.time.ms | 可选 | 源表 | Long | -1 | 启动时间点。时间戳,单位为毫秒。 |
start.time | 可选 | 源表 | String | - | 启动时间点。字符串,形式为: 'yyyy-MM-dd HH:mm:ss' |
end.time | 可选 | 源表 | String | - | 截止时间。字符串,形式为:'yyyy-MM-dd HH:mm:ss' |
time.zone | 可选 | 源表 | String | - | 时区 |
partition.discovery.interval.ms | 可选 | 源表 | Long | 30000 | 源表检查新分区的时间间隔,单位为ms。 |
use.new.api | 可选 | 源表 | Boolean | true | 是否启用新API,默认启用。 |
encoding | 可选 | 目标表 | String | UTF-8 | 目标表编码 |
field.delimiter | 可选 | 目标表 | String | \u0001 | 目标表字段分隔符 |
retry.times | 可选 | 目标表 | Integer | 10 | 目标表写入失败重试次数 |
sleep.timeMs | 可选 | 目标表 | Long | 5000L | 目标表写入失败重试间隔时间,单位为毫秒 |
is.dynamic.tag | 可选 | 目标表 | Boolean | false | - |
dynamic.tag.column | 可选 | 目标表 | String | - | - |
dynamic.tag.column.write.included | 可选 | 目标表 | Boolean | true | - |
write.keys.to.body | 可选 | 目标表 | Boolean | false | 是否将键写到body |
key.columns | 可选 | 目标表 | String | - | 键列 |
文档反馈
以上内容对您是否有帮助?