Pulsar Connector

简述

支持pulsar的读写

示例

CREATE TABLE user_log (
      user_id VARCHAR,
      item_id INT,
      is_new BOOLEAN,
      test_tiny TINYINT,
      test_small SMALLINT,
      test_big BIGINT,
      test_float FLOAT,
      test_double DOUBLE,
      test_null VARCHAR,
      topic STRING METADATA
) WITH (
    'connector' = 'pulsar',
    'topic' = 'allType',
    'admin-url' = 'http://localhost:8080',
    'service-url' = 'pulsar://localhost:6650',
    'value.format' = 'json',
--     'scan.startup.mode' = 'latest'
    'scan.startup.mode' = 'earliest'
);

参数配置

参数 默认值 描述 必填
connector null 使用的连接器,可选择pulsar和upsert-pulsar。
topic null 输入或输出的 Topic。如果有多个 Topic,使用半角逗号 (, 连接。与 topic-pattern 参数互斥。
topic-pattern null 使用正则获得匹配的 Topic。与topic 参数互斥。
service-url null Pulsar broker 的服务地址。如:pulsar://localhost:6650
admin-url null Pulsar Admin 的服务地址。如:http://localhost:8080
scan.startup.mode latest Source 的启动模式。支持 earliestlatestexternal-subscription 选项。
scan.startup.sub-name null 当使用 external-subscription 参数时,必须设置该参数。
discovery topic interval null 分区发现的时间间隔,单位为毫秒。
sink.message-router key-hash 写消息到 Pulsar 分区的路由方式。支持 key-hashround-robin、自定义 MessageRouter 实现类的引用路径。
sink.semantic at-least-once Sink 写出消息的保障级别。支持 at-least-onceexactly-oncenone 选项。
properties empty Pulsar 可选的配置集,格式为 properties.key='value'。有关详细信息,参见streamAPI配置参数
key.format null Pulsar 消息的键序列化格式。支持 rawavrojson 等格式。
key.fields null 序列化键时需要使用的 SQL 定义字段。如有多个字段,使用半角逗号(,)连接。
key.fields-prefix null 为键格式的所有字段定义一个自定义前缀,以避免名称与值格式的字段冲突。默认情况下,前缀为空。如果定义了自定义前缀,则 Table 模式和 'key.fields' 都将使用带前缀的名称。构造密钥格式的数据类型时,前缀将被删除,并且密钥格式内使用非前缀名称。
format或value.format null Pulsar 消息正文的序列化格式。支持 jsonavro 等格式。有关详细信息,参见 Flink 格式
value.fields-include ALL Pulsar 消息正文包含的字段策略。支持 ALLEXCEPT_KEY 选项。

Pulsar 消息的元数据配置

METADATA 标志用于读写 Pulsar 消息中的元数据,如下所示

说明
R/W 列定义了元数据字段是否可读(R)和/或可写(W)。只读列必须声明为 VIRTUAL,以便在 INSERT INTO 操作中排除它们。**

元数据 数据类型 描述 R/W
topic STRING NOT NULL Pulsar 消息所在的 topic 的名称。 R
messageId BYTES NOT NULL Pulsar 消息 ID。 R
sequenceId BIGINT NOT NULL Pulsar 消息的序列号。 R
publishTime TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL Pulsar 消息的发布时间。 R
eventTime TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL Pulsar 消息的生成时间。 R/W
properties MAP NOT NULL Pulsar 消息的扩展信息。 R/W

streamAPI配置参数

该参数对应StreamAPI中的FlinkPulsarSource、FlinkPulsarSink构造参数中的Properties对象,Table模式下的配置properties参数。在 DDL 语句中,用户可以在参数前添加’properties.’来使用下述配置:

'properties.pulsar.reader.subscriptionRolePrefix' = 'pulsar-flink-',
'properties.pulsar.producer.sendTimeoutMs' = '30000',
参数 默认值 描述 生效范围
topic null Pulsar Topic。 source
topics null 使用半角逗号(,)连接的多个 Pulsar Topic。 source
topicspattern null 使用 Java 正则匹配多的多个 pulsar Topic。 source
partition.discovery.interval-millis -1 自动发现增减 Topic,单位为毫秒。取值为-1,则表示禁用该功能。 source
clientcachesize 100 Pulsar 客户端的缓存数量。 source、sink
auth-plugin-classname null Pulsar 客户端的鉴权类。 source、sink
auth-params null Pulsar 客户端的鉴权参数。 source、sink
flushoncheckpoint true 在 Flink snapshotState 时,向 Pulsar Topic 中写入消息。 sink
failonwrite false Sink 出错时,继续确认消息。 sink
polltimeoutms 120000 等待获取下一条消息的超时时间,单位为毫秒。 source
pulsar.reader.fail-on-data-loss true 数据丢失时,是否失败。 source
pulsar.reader.use-earliest-when-data-loss false 数据丢失时,使用earliest重置offset。 source
commitmaxretries 3 向 Pulsar 消息偏移 offset 时,最大重试次数。 source
send-delay-millisecond 0 延迟消息发送(毫秒),仅限于TableApi,StreamApi请参考PulsarSerializationSchema.setDeliverAtExtractor Sink
scan.startup.mode latest 消费消息的位置。支持 earliestlatest选项。 source
enable-key-hash-range false 开启 Pulsar Key-Shared 订阅模式。 source
pulsar.reader.* Pulsar reader 的详细配置。有关详细信息,参见 Pulsar Reader source
pulsar.reader.subscriptionRolePrefix flink-pulsar- 未指定订阅者时,自动创建订阅者名称的前缀。 source
pulsar.reader.receiverQueueSize 1000 接收队列大小。 source
pulsar.producer.* Pulsar producer 的详细配置。有关详细信息,参见 Pulsar Producer Sink
pulsar.producer.sendTimeoutMs 30000 发送消息时的超时时间,单位为毫秒。 Sink
pulsar.producer.blockIfQueueFull false Producer 写入消息的队列满时,支持阻塞方法,而不是抛出异常。 Sink

pulsar.reader.*pulsar.producer.* 定义配置 Pulsar 行为的详细信息。星号(*)可以替换为 Pulsar 中的配置名,有关详细信息,参见 Pulsar ReaderPulsar Producer

认证配置

需要设置 properties.auth-plugin-classname、properties.auth-params 参数。

CREATE TABLE pulsar (
                       `physical_1` STRING,
                       `physical_2` INT,
                       `eventTime` TIMESTAMP(3) METADATA,
                       `properties` MAP<STRING, STRING> METADATA ,
                       `topic` STRING METADATA VIRTUAL,
                       `sequenceId` BIGINT METADATA VIRTUAL,
                       `key` STRING ,
                       `physical_3` BOOLEAN
) WITH (
    'connector' = 'pulsar',
    'topic' = 'persistent://public/default/topic82547611',
    'key.format' = 'raw',
    'key.fields' = 'key',
    'value.format' = 'avro',
    'service-url' = 'pulsar://localhost:6650',
    'admin-url' = 'http://localhost:8080',
    'scan.startup.mode' = 'earliest',
    'properties.auth-plugin-classname' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
    'properties.auth-params' = 'token:xxxxxxxxxx',
)

有关认证的详细信息,参见 Pulsar Security

Upsert Pulsar

Flink 社区用户对 Upsert 模式消息队列有很高的需求,主要原因有三个:

  • 将 Pulsar Topic 解释为一个 changelog 流,它将带有键的记录解释为 upsert 事件;
  • 作为实时管道的一部分,将多个流连接起来进行充实,并将结果存储在 Pulsar Topic 中,以便进一步的计算。但结果可能包含更新事件。
  • 作为实时管道的一部分,聚合数据流并将结果存储在 Pulsar Topic 中,以便进一步计算。但是结果可能包含更新事件。

基于这些需求,我们也实现了对 Upsert Pulsar 的支持。该功能支持用户以 upsert 的方式从 Pulsar Topic 中读取数据和向 Pulsar Topic 中写入数据。

在 SQL DDL 定义中,用户将 connector 设置为 upsert-pulsar,即可使用 Upsert Pulsar 连接器。

在配置方面,必须指定 Table 的主键,且不能使用 key.fieldskey.fields-prefix

作为 source,Upsert Pulsar 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,如果存在这个 key(,数据记录中的 value 是同一键的最后一个值的 UPDATE。如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录是 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同键的已存在行都会被覆盖。另外,值为空的消息将会被视作为 DELETE 消息。

作为 sink,Upsert Pulsar 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Pulsar 消息写入,并将 DELETE 数据作为 value 为空的 Pulsar 消息写入(表示对应键的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。