RocketMQ SQL 连接器
更新时间: 2022-07-05 16:15:56
阅读 683
Rocketmq
Easystream 支持使用 Rocketmq 作为输入/输出数据源。
Flink-1.10
Rocketmq Source
简述
Easystream 支持使用 Rocketmq 作为输入数据源。
示例
CREATE TABLE source (
`key` varchar,
`value` VARCHAR
) WITH (
'connector.type' = 'rocketmq',
'nameserver.address' = 'xxx:9876;xxx:9876',
'group' = 'rocketmqTest',
'topic' = 'rocketmqTest',
'brokerserver.heartbeat.interval' = '30000',
'tag' = '*',
'offset.persist.interval' = '5000',
'batch.size' = '256',
'delay.when.message.not.found'='10',
'offset.reset' = 'earlist'
-- 'consumer.offset.from.timestamp'='1577253600000'
);
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 数据源类型 | 必填,’rocketmq’,小写 |
nameserver.address | rocketmq 服务地址 | 必填 |
group | 消费者组 (consumerGroup) | 必填 |
topic | 读取的 topic | 必填 |
offset.reset | offset 设置的类型:earliest、latest、timestamp。如果设置为 timestamp,则必须指定参数 offset.from.timestamp | 必填 |
offset.from.timestamp | 如果 offset=timestamp,则此值需要填一个13位的毫秒时间 | 选填 |
brokerserver.heartbeat.interval | 与 brokerserver 的心跳间隔 | 选填,默认是 30000ms |
tag | 读取的tag | 选填,默认为’*’,全部tag |
offset.persist.interval | 不开启 checkpoint 下,offset 提交的时间间隔 | 选填,默认 5000ms |
batch.size | 每次拉取的最大数据条数 | 选填,默认256条 |
delay.when.message.not.found | 当没有获取到数据时的等待时间 | 选填,默认 10ms |
Rocketmq Sink
简述
Easystream 支持输出到 Rocketmq。
示例
create table sink(
`key` varchar,
`value` VARCHAR
) with(
'connector.type' = 'rocketmq',
'nameserver.address' = 'xxx:9876;xxx:9876',
'group' = 'sloth-test-sink-0117-01',
'topic' = 'sloth-test-0117-01',
'brokerserver.heartbeat.interval' = '30000',
'async' = 'true',
'retry.times' = '3',
'timeout' = '3000',
'flash.on.checkpoint' = 'true',
'msg.delay.level' = '0'
);
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 数据源类型 | 必填,’rocketmq’,小写 |
nameserver.address | rocketmq 服务地址 | 必填 |
group | 写入的组 | 必填 |
topic | 写入的 topic | 必填 |
brokerserver.heartbeat.interval | 与 brokerserver 的心跳间隔 | 选填,默认是 30000ms |
async | 是否开启异步写入 | 选填,默认为 false |
retry.times | async=true 时的重试次数 | 选填,默认3次 |
flash.on.checkpoint | 是否每次打 checkpoint 时才刷新数据到目标端 | 选填,默认为 false |
msg.delay.level | 数据的发送级别 | 选填,默认为0 |
FLINK-1.12
RocketMq source
CREATE TABLE rocketmq_source (
`topic` STRING METADATA VIRTUAL,
`surname` STRING,
`givenName` STRING
) WITH (
'connector' = 'rocketmq',
'topic' = 'sloth-test-0601',
'group' = 'behavior_consume_group',
'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876'
);
RocketMq sink
CREATE TABLE rocketmq_sink (
`surname` STRING,
`givenName` STRING
) WITH (
'connector' = 'rocketmq',
'topic' = 'sloth-test-0601-sink',
'group' = 'behavior_produce_group',
'name.server.address' = 'sloth-test1.dg.163.org:9876;sloth-test2.dg.163.org:9876',
'key.columns' = 'surname',
'write.keys.to.body' = 'true'
);
属性
支持的metadata列
参数 | 注释说明 | 备注 |
---|---|---|
topic | topic名 | 只读列 |
with属性
参数 | 注释说明 | 备注 |
---|---|---|
connector | 数据源类型 | 必填,’rocketmq’,小写 |
name.server.address | rocketmq 服务地址 | 必填,可用于source或sink |
group | 写入的组 | 必填,可用于source或sink |
topic | 写入的 topic | 必填,可用于source或sink |
key.columns | 键列 | 只能用于sink |
write.keys.to.body | 是否将键写到body | 只能用于sink |
文档反馈
以上内容对您是否有帮助?