本文将为您介绍 EasyStream 中如何创建 Pulsar 类型流表。

操作步骤

  1. 在流表管理页面,单击目标逻辑库,表单页面左上角显示对应库名表示切换成功。
  2. 单击 创建表 按键,进行表创建。
  3. 选择 数据源类型Pulsar
  4. 完成剩余表单内容,创建Pulsar流表。

流表配置

配置名称 是否必填 配置生效类型 数据类型 默认值 参数说明
generic 可选 源表、目标表 Boolean true 是否flink原生的表。如果不是原生的表,会使用当前的库名+'/'+表名作为topic名称;否则topic名称为表名
format 可选 源表、目标表 String - 用来序列化或反序列化 pulsar 消息的格式。
key.format 可选 源表、目标表 String - Pulsar 消息的键序列化格式。支持 raw、avro、json 等格式。
key.fields 可选 源表、目标表 String - 序列化键时需要使用的 SQL 定义字段。如有多个字段,使用半角逗号(,)连接。
key.fields-prefix 可选 源表、目标表 String - 序列化键时需要使用的 SQL 定义字段。如有多个字段,使用半角逗号(,)连接。
value.format 可选 源表、目标表 String - Pulsar 消息正文的序列化格式。支持 json、avro 等格式。
value.fields-include 可选 源表、目标表 String ALL Pulsar 消息正文包含的字段策略。支持 ALLEXCEPT_KEY 选项。
topic 目标表必填;
源表必须设置topictopic-pattern
源表、目标表 String - 输入或输出的 Topic。如果有多个 Topic,使用半角逗号(,)连接。与 topic-pattern 参数互斥。
topic-pattern 源表必须设置topictopic-pattern 源表 String - 使用正则获得匹配的 Topic。与topic 参数互斥。
scan.startup.mode 可选 源表 String latest Source 的启动模式。支持 earliestlatestexternal-subscriptiontimestampspecific-offsets 选项。earliest:从最早可用的位置开始消费;latest:从最新可用的位置开始消费;external-subscription:从订阅组的消费位置开始消费;timestamp:从指定的消息发布时间的位置开始消费;specific-offsets:从指定的offset开始消费
scan.startup.specific-offsets 可选 源表 String - 指定消费消息的offset。当启动模式使用 specific-offsets 参数时,必须设置该参数。
scan.startup.sub-name 可选 源表 String - 当启动模式使用 external-subscription 参数时,必须设置该参数。
scan.startup.sub-start-offset 可选 源表 String latest 当启动模式使用 external-subscription 参数时,可以设置该参数。
scan.startup.timestamp-millis 可选 源表 Long - 消费消息的时间offset,该时间为消息发布时间,单位毫秒。当使用 timestamp 参数时,必须设置该参数。
partition.discovery.interval-millis 可选 源表 Long -1 自动发现增减 Topic,单位为毫秒。取值为-1,则表示禁用该功能。
sink.semantic 可选 目标表 String at-least-once Sink 写出消息的保障级别。支持 at-least-once、exactly-once、none 选项。
sink.message-router 可选 目标表 String - 写消息到 Pulsar 分区的路由方式。支持 key-hashround-robin自定义 MessageRouter 实现类的引用路径。
sink.parallelism 可选 目标表 Integer - sink算子并行度。