Pulsar流表
更新时间: 2024-03-01 01:57:53
阅读 251
本文将为您介绍 EasyStream 中如何创建 Pulsar 类型流表。
操作步骤
- 在流表管理页面,单击目标逻辑库,表单页面左上角显示对应库名表示切换成功。
- 单击 创建表 按键,进行表创建。
- 选择 数据源类型 为 Pulsar。
- 完成剩余表单内容,创建Pulsar流表。
流表配置
配置名称 | 是否必填 | 配置生效类型 | 数据类型 | 默认值 | 参数说明 |
---|---|---|---|---|---|
generic | 可选 | 源表、目标表 | Boolean | true | 是否flink原生的表。如果不是原生的表,会使用当前的库名+'/'+表名作为topic名称;否则topic名称为表名 |
format | 可选 | 源表、目标表 | String | - | 用来序列化或反序列化 pulsar 消息的格式。 |
key.format | 可选 | 源表、目标表 | String | - | Pulsar 消息的键序列化格式。支持 raw、avro、json 等格式。 |
key.fields | 可选 | 源表、目标表 | String | - | 序列化键时需要使用的 SQL 定义字段。如有多个字段,使用半角逗号(,)连接。 |
key.fields-prefix | 可选 | 源表、目标表 | String | - | 序列化键时需要使用的 SQL 定义字段。如有多个字段,使用半角逗号(,)连接。 |
value.format | 可选 | 源表、目标表 | String | - | Pulsar 消息正文的序列化格式。支持 json、avro 等格式。 |
value.fields-include | 可选 | 源表、目标表 | String | ALL | Pulsar 消息正文包含的字段策略。支持 ALL 和 EXCEPT_KEY 选项。 |
topic | 目标表必填; 源表必须设置 topic 或topic-pattern |
源表、目标表 | String | - | 输入或输出的 Topic。如果有多个 Topic,使用半角逗号(,)连接。与 topic-pattern 参数互斥。 |
topic-pattern | 源表必须设置topic 或topic-pattern |
源表 | String | - | 使用正则获得匹配的 Topic。与topic 参数互斥。 |
scan.startup.mode | 可选 | 源表 | String | latest | Source 的启动模式。支持 earliest 、latest 、external-subscription 、timestamp 、specific-offsets 选项。earliest:从最早可用的位置开始消费;latest:从最新可用的位置开始消费;external-subscription:从订阅组的消费位置开始消费;timestamp:从指定的消息发布时间的位置开始消费;specific-offsets:从指定的offset开始消费 |
scan.startup.specific-offsets | 可选 | 源表 | String | - | 指定消费消息的offset。当启动模式使用 specific-offsets 参数时,必须设置该参数。 |
scan.startup.sub-name | 可选 | 源表 | String | - | 当启动模式使用 external-subscription 参数时,必须设置该参数。 |
scan.startup.sub-start-offset | 可选 | 源表 | String | latest | 当启动模式使用 external-subscription 参数时,可以设置该参数。 |
scan.startup.timestamp-millis | 可选 | 源表 | Long | - | 消费消息的时间offset,该时间为消息发布时间,单位毫秒。当使用 timestamp 参数时,必须设置该参数。 |
partition.discovery.interval-millis | 可选 | 源表 | Long | -1 | 自动发现增减 Topic,单位为毫秒。取值为-1,则表示禁用该功能。 |
sink.semantic | 可选 | 目标表 | String | at-least-once | Sink 写出消息的保障级别。支持 at-least-once、exactly-once、none 选项。 |
sink.message-router | 可选 | 目标表 | String | - | 写消息到 Pulsar 分区的路由方式。支持 key-hash 、 round-robin 、自定义 MessageRouter 实现类 的引用路径。 |
sink.parallelism | 可选 | 目标表 | Integer | - | sink算子并行度。 |
文档反馈
以上内容对您是否有帮助?