Pulsar Connector

支持版本>=2.8

简述

支持pulsar的读写

示例

CREATE TABLE user_log (
      user_id VARCHAR,
      item_id INT,
      is_new BOOLEAN,
      test_tiny TINYINT,
      test_small SMALLINT,
      test_big BIGINT,
      test_float FLOAT,
      test_double DOUBLE,
      test_null VARCHAR,
      topic STRING METADATA
) WITH (
    'connector' = 'pulsar',
    'topic' = 'allType',
    'admin-url' = 'http://localhost:8080',
    'service-url' = 'pulsar://localhost:6650',
    'value.format' = 'json',
--     'scan.startup.mode' = 'latest'
    'scan.startup.mode' = 'earliest'
);

参数配置

参数 默认值 描述 必填
connector null 使用的连接器,可选择pulsar和upsert-pulsar。
topic null 输入或输出的 Topic。如果有多个 Topic,使用半角逗号 (, 连接。与 topic-pattern 参数互斥。
topic-pattern null 使用正则获得匹配的 Topic。与topic 参数互斥。
service-url null Pulsar broker 的服务地址。如:pulsar://localhost:6650
admin-url null Pulsar Admin 的服务地址。如:http://localhost:8080
scan.startup.mode latest Source 的启动模式。支持 earliestlatestexternal-subscriptiontimestampspecific-offsets 选项。earliest:从最早可用的位置开始消费;latest:从最新可用的位置开始消费;external-subscription:从订阅组的消费位置开始消费;timestamp:从指定的消息发布时间的位置开始消费;specific-offsets:从指定的offset开始消费
scan.startup.sub-name null 当使用 external-subscription 参数时,必须设置该参数。
scan.startup.timestamp-millis null 消费消息的时间offset,该时间为消息发布时间,单位毫秒。当使用 timestamp 参数时,必须设置该参数。
scan.startup.specific-offsets null 指定消费消息的offset\, 格式:LedgerId \ EntryId\ PartitionIndex\ Topic(full);LedgerId\ EntryId\ PartitionIndex\ Topic(full)\, e.g. 42\ 1012\ 0\ persistent://public/default/topic1;44\ 1011\ 1\ persistent://public/default/topic1 或当只有一个topic时:LedgerId\ EntryId\ PartitionIndex。当使用 specific-offsets 参数时,必须设置该参数。
partition.discovery.interval-millis null 分区发现的时间间隔,单位为毫秒。
sink.message-router key-hash 写消息到 Pulsar 分区的路由方式。支持 key-hashround-robin、自定义 MessageRouter 实现类的引用路径。
sink.semantic at-least-once Sink 写出消息的保障级别。支持 at-least-onceexactly-oncenone 选项。
properties empty Pulsar 可选的配置集,格式为 properties.key='value'。有关详细信息,参见streamAPI配置参数
key.format null Pulsar 消息的键序列化格式。支持 rawavrojson 等格式。
key.fields null 序列化键时需要使用的 SQL 定义字段。如有多个字段,使用半角逗号(,)连接。
key.fields-prefix null 为键格式的所有字段定义一个自定义前缀,以避免名称与值格式的字段冲突。默认情况下,前缀为空。如果定义了自定义前缀,则 Table 模式和 'key.fields' 都将使用带前缀的名称。构造密钥格式的数据类型时,前缀将被删除,并且密钥格式内使用非前缀名称。
format或value.format null Pulsar 消息正文的序列化格式。支持 jsonavro 等格式。有关详细信息,参见 Flink 格式
value.fields-include ALL Pulsar 消息正文包含的字段策略。支持 ALLEXCEPT_KEY 选项。
generic true 是否flink原生的表。如果不是原生的表,会使用当前的库名+'/'+表名作为topic名称;否则topic名称为表名

Pulsar 消息的元数据配置

METADATA 标志用于读写 Pulsar 消息中的元数据,如下所示

说明R/W 列定义了元数据字段是否可读(R)和/或可写(W)。只读列必须声明为 VIRTUAL,以便在 INSERT INTO 操作中排除它们。**

元数据 数据类型 描述 R/W
topic STRING NOT NULL Pulsar 消息所在的 topic 的名称。 R
messageId BYTES NOT NULL Pulsar 消息 ID。 R
sequenceId BIGINT NOT NULL Pulsar 消息的序列号。 R
publishTime TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL Pulsar 消息的发布时间。 R
eventTime TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL Pulsar 消息的生成时间。 R/W
properties MAP\ NOT NULL Pulsar 消息的扩展信息。 R/W

streamAPI配置参数

注意:properties.开头的这些参数需要使用中划线来命名,参数pulsar官方文档中的参数需要将驼峰命名改为中划线命名。如pulsar官方reader文档中参数subscriptionRolePrefix,在flink任务中需要改为properties.pulsar.reader.subscription-role-prefix

该参数对应StreamAPI中的FlinkPulsarSource、FlinkPulsarSink构造参数中的Properties对象,Table模式下的配置properties参数。在 DDL 语句中,用户可以在参数前添加'properties.'来使用下述配置:

'properties.pulsar.reader.subscription-role-prefix' = 'pulsar-flink-',
'properties.pulsar.producer.send-timeout-ms' = '30000',
参数 默认值 描述 生效范围
topic null Pulsar Topic。 source
topics null 使用半角逗号(,)连接的多个 Pulsar Topic。 source
topicspattern null 使用 Java 正则匹配多的多个 pulsar Topic。 source
partition.discovery.interval-millis -1 自动发现增减 Topic,单位为毫秒。取值为-1,则表示禁用该功能。 source
clientcachesize 100 Pulsar 客户端的缓存数量。 source、sink
auth-plugin-classname null Pulsar 客户端的鉴权类。 source、sink
auth-params null Pulsar 客户端的鉴权参数。 source、sink
flushoncheckpoint true 在 Flink snapshotState 时,向 Pulsar Topic 中写入消息。 sink
failonwrite false Sink 出错时,继续确认消息。 sink
polltimeoutms 120000 等待获取下一条消息的超时时间,单位为毫秒。 source
pulsar.reader.fail-on-data-loss true 数据丢失时,是否失败。 source
pulsar.reader.use-earliest-when-data-loss false 数据丢失时,使用earliest重置offset。 source
commitmaxretries 3 向 Pulsar 消息偏移 offset 时,最大重试次数。 source
send-delay-millisecond 0 延迟消息发送(毫秒),仅限于TableApi,StreamApi请参考PulsarSerializationSchema.setDeliverAtExtractor Sink
scan.startup.mode latest 消费消息的位置。支持 earliestlatest选项。 source
enable-key-hash-range false 开启 Pulsar Key-Shared 订阅模式。 source
pulsar.reader.* Pulsar reader 的详细配置。参数pulsar官方文档中的参数需要将驼峰命名改为中划线命名。如pulsar官方reader文档中参数subscriptionRolePrefix,在flink任务中需要改为properties.pulsar.reader.subscription-role-prefix。有关详细信息,参见 Pulsar Reader source
pulsar.reader.subscription-role-prefix flink-pulsar- 未指定订阅者时,自动创建订阅者名称的前缀。 source
pulsar.reader.receiver-queue-size 1000 接收队列大小。 source
pulsar.producer.* Pulsar producer 的详细配置。参数pulsar官方文档中的参数需要将驼峰命名改为中划线命名。sendTimeoutMs,在flink任务中需要改为properties.pulsar.producer.send-timeout-ms。有关详细信息,参见 Pulsar Producer Sink
pulsar.producer.send-timeout-ms 30000 发送消息时的超时时间,单位为毫秒。 Sink
pulsar.producer.block-if-queue-full false Producer 写入消息的队列满时,支持阻塞方法,而不是抛出异常。 Sink

pulsar.reader.*pulsar.producer.* 定义配置 Pulsar 行为的详细信息。星号(*)可以替换为 Pulsar 中的配置名,参数pulsar官方文档中的参数需要将驼峰命名改为中划线命名。如pulsar官方reader文档中参数subscriptionRolePrefix,在flink任务中需要改为properties.pulsar.reader.subscription-role-prefix。有关详细信息,参见 Pulsar ReaderPulsar Producer

认证配置

需要设置 properties.auth-plugin-classname、properties.auth-params 参数。

CREATE TABLE pulsar (
                       `physical_1` STRING,
                       `physical_2` INT,
                       `eventTime` TIMESTAMP(3) METADATA,
                       `properties` MAP<STRING, STRING> METADATA ,
                       `topic` STRING METADATA VIRTUAL,
                       `sequenceId` BIGINT METADATA VIRTUAL,
                       `key` STRING ,
                       `physical_3` BOOLEAN
) WITH (
    'connector' = 'pulsar',
    'topic' = 'persistent://public/default/topic82547611',
    'key.format' = 'raw',
    'key.fields' = 'key',
    'value.format' = 'avro',
    'service-url' = 'pulsar://localhost:6650',
    'admin-url' = 'http://localhost:8080',
    'scan.startup.mode' = 'earliest',
    'properties.auth-plugin-classname' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
    'properties.auth-params' = 'token:xxxxxxxxxx',
)

有关认证的详细信息,参见 Pulsar Security

Upsert Pulsar

Flink 社区用户对 Upsert 模式消息队列有很高的需求,主要原因有三个:

  • 将 Pulsar Topic 解释为一个 changelog 流,它将带有键的记录解释为 upsert 事件;
  • 作为实时管道的一部分,将多个流连接起来进行充实,并将结果存储在 Pulsar Topic 中,以便进一步的计算。但结果可能包含更新事件。
  • 作为实时管道的一部分,聚合数据流并将结果存储在 Pulsar Topic 中,以便进一步计算。但是结果可能包含更新事件。

基于这些需求,我们也实现了对 Upsert Pulsar 的支持。该功能支持用户以 upsert 的方式从 Pulsar Topic 中读取数据和向 Pulsar Topic 中写入数据。

在 SQL DDL 定义中,用户将 connector 设置为 upsert-pulsar,即可使用 Upsert Pulsar 连接器。

在配置方面,必须指定 Table 的主键,且不能使用 key.fieldskey.fields-prefix

作为 source,Upsert Pulsar 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,如果存在这个 key(,数据记录中的 value 是同一键的最后一个值的 UPDATE。如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录是 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同键的已存在行都会被覆盖。另外,值为空的消息将会被视作为 DELETE 消息。

作为 sink,Upsert Pulsar 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Pulsar 消息写入,并将 DELETE 数据作为 value 为空的 Pulsar 消息写入(表示对应键的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

简述

支持pulsar的读写

示例

CREATE TABLE user_log (
      user_id VARCHAR,
      item_id INT,
      is_new BOOLEAN,
      test_tiny TINYINT,
      test_small SMALLINT,
      test_big BIGINT,
      test_float FLOAT,
      test_double DOUBLE,
      test_null VARCHAR,
      topic STRING METADATA
) WITH (
    'connector' = 'pulsar',
    'topic' = 'allType',
    'admin-url' = 'http://localhost:8080',
    'service-url' = 'pulsar://localhost:6650',
    'value.format' = 'json',
--     'scan.startup.mode' = 'latest'
    'scan.startup.mode' = 'earliest'
);

参数配置

配置名称 是否必填 配置生效类型 数据类型 默认值 参数说明
connector 必填 源表、目标表 String - 使用的连接器,可选择pulsar和upsert-pulsar。
service-url 必填 源表、目标表 String - Pulsar broker 的服务地址。如:pulsar://localhost:6650
admin-url 必填 源表、目标表 String - Pulsar Admin 的服务地址。如:http://localhost:8080
generic 可选 源表、目标表 Boolean true 是否flink原生的表。如果不是原生的表,会使用当前的库名+'/'+表名作为topic名称;否则topic名称为表名
format 可选 源表、目标表 String - 用来序列化或反序列化 pulsar 消息的格式。
key.format 可选 源表、目标表 String - Pulsar 消息的键序列化格式。支持 raw、avro、json 等格式。
key.fields 可选 源表、目标表 String - 序列化键时需要使用的 SQL 定义字段。如有多个字段,使用半角逗号(,)连接。
key.fields-prefix 可选 源表、目标表 String - 序列化键时需要使用的 SQL 定义字段。如有多个字段,使用半角逗号(,)连接。
value.format 可选 源表、目标表 String - Pulsar 消息正文的序列化格式。支持 json、avro 等格式。
value.fields-include 可选 源表、目标表 String ALL Pulsar 消息正文包含的字段策略。支持 ALLEXCEPT_KEY 选项。
topic 目标表必填;
源表必须设置topictopic-pattern
源表、目标表 String - 输入或输出的 Topic。如果有多个 Topic,使用半角逗号(,)连接。与 topic-pattern 参数互斥。
topic-pattern 源表必须设置topictopic-pattern 源表 String - 使用正则获得匹配的 Topic。与topic 参数互斥。
scan.startup.mode 可选 源表 String latest Source 的启动模式。支持 earliestlatestexternal-subscriptiontimestampspecific-offsets 选项。earliest:从最早可用的位置开始消费;latest:从最新可用的位置开始消费;external-subscription:从订阅组的消费位置开始消费; timestamp:从指定的消息发布时间的位置开始消费;specific-offsets:从指定的offset开始消费
scan.startup.specific-offsets 可选 源表 String - 指定消费消息的offset。当启动模式使用 specific-offsets 参数时,必须设置该参数。
scan.startup.sub-name 可选 源表 String - 当启动模式使用 external-subscription 参数时,必须设置该参数。
scan.startup.sub-start-offset 可选 源表 String latest 当启动模式使用 external-subscription 参数时,可以设置该参数。
scan.startup.timestamp-millis 可选 源表 Long - 消费消息的时间offset,该时间为消息发布时间,单位毫秒。当使用 timestamp 参数时,必须设置该参数。
partition.discovery.interval-millis 可选 源表 Long -1 自动发现增减 Topic,单位为毫秒。取值为-1,则表示禁用该功能。
sink.semantic 可选 目标表 String at-least-once Sink 写出消息的保障级别。支持 at-least-onceexactly-oncenone 选项。
sink.message-router 可选 目标表 String - 写消息到 Pulsar 分区的路由方式。支持 key-hashround-robin自定义 MessageRouter 实现类的引用路径。
sink.parallelism 可选 目标表 Integer - sink算子并行度。

Pulsar 消息的元数据配置

METADATA 标志用于读写 Pulsar 消息中的元数据,如下所示

说明R/W 列定义了元数据字段是否可读(R)和/或可写(W)。只读列必须声明为 VIRTUAL,以便在 INSERT INTO 操作中排除它们。**

元数据 数据类型 描述 R/W
topic STRING NOT NULL Pulsar 消息所在的 topic 的名称。 R
messageId BYTES NOT NULL Pulsar 消息 ID。 R
sequenceId BIGINT NOT NULL Pulsar 消息的序列号。 R
publishTime TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL Pulsar 消息的发布时间。 R
eventTime TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL Pulsar 消息的生成时间。 R/W
properties MAP\ NOT NULL Pulsar 消息的扩展信息。 R/W

streamAPI配置参数

该参数对应StreamAPI中的FlinkPulsarSource、FlinkPulsarSink构造参数中的Properties对象,Table模式下的配置properties参数。在 DDL 语句中,用户可以在参数前添加'properties.'来使用下述配置:

'properties.pulsar.reader.subscriptionRolePrefix' = 'pulsar-flink-',
'properties.pulsar.producer.sendTimeoutMs' = '30000',
参数 默认值 描述 生效范围
topic null Pulsar Topic。 source
topics null 使用半角逗号(,)连接的多个 Pulsar Topic。 source
topicspattern null 使用 Java 正则匹配多的多个 pulsar Topic。 source
partition.discovery.interval-millis -1 自动发现增减 Topic,单位为毫秒。取值为-1,则表示禁用该功能。 source
clientcachesize 100 Pulsar 客户端的缓存数量。 source、sink
auth-plugin-classname null Pulsar 客户端的鉴权类。 source、sink
auth-params null Pulsar 客户端的鉴权参数。 source、sink
flushoncheckpoint true 在 Flink snapshotState 时,向 Pulsar Topic 中写入消息。 sink
failonwrite false Sink 出错时,继续确认消息。 sink
polltimeoutms 120000 等待获取下一条消息的超时时间,单位为毫秒。 source
pulsar.reader.fail-on-data-loss true 数据丢失时,是否失败。 source
pulsar.reader.use-earliest-when-data-loss false 数据丢失时,使用earliest重置offset。 source
commitmaxretries 3 向 Pulsar 消息偏移 offset 时,最大重试次数。 source
send-delay-millisecond 0 延迟消息发送(毫秒),仅限于TableApi,StreamApi请参考PulsarSerializationSchema.setDeliverAtExtractor Sink
scan.startup.mode latest 消费消息的位置。支持 earliestlatest选项。 source
enable-key-hash-range false 开启 Pulsar Key-Shared 订阅模式。 source
pulsar.reader.* Pulsar reader 的详细配置。有关详细信息,参见 Pulsar Reader source
pulsar.reader.subscription-role-prefix flink-pulsar- 未指定订阅者时,自动创建订阅者名称的前缀。 source
pulsar.reader.receiverQueueSize 1000 接收队列大小。 source
pulsar.producer.* Pulsar producer 的详细配置。有关详细信息,参见 Pulsar Producer Sink
pulsar.producer.sendTimeoutMs 30000 发送消息时的超时时间,单位为毫秒。 Sink
pulsar.producer.blockIfQueueFull false Producer 写入消息的队列满时,支持阻塞方法,而不是抛出异常。 Sink

pulsar.reader.*pulsar.producer.* 定义配置 Pulsar 行为的详细信息。星号(*)可以替换为 Pulsar 中的配置名,有关详细信息,参见 Pulsar ReaderPulsar Producer

认证配置

需要设置 properties.auth-plugin-classname、properties.auth-params 参数。

CREATE TABLE pulsar (
                       `physical_1` STRING,
                       `physical_2` INT,
                       `eventTime` TIMESTAMP(3) METADATA,
                       `properties` MAP<STRING, STRING> METADATA ,
                       `topic` STRING METADATA VIRTUAL,
                       `sequenceId` BIGINT METADATA VIRTUAL,
                       `key` STRING ,
                       `physical_3` BOOLEAN
) WITH (
    'connector' = 'pulsar',
    'topic' = 'persistent://public/default/topic82547611',
    'key.format' = 'raw',
    'key.fields' = 'key',
    'value.format' = 'avro',
    'service-url' = 'pulsar://localhost:6650',
    'admin-url' = 'http://localhost:8080',
    'scan.startup.mode' = 'earliest',
    'properties.auth-plugin-classname' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
    'properties.auth-params' = 'token:xxxxxxxxxx',
)

有关认证的详细信息,参见 Pulsar Security

Upsert Pulsar

Flink 社区用户对 Upsert 模式消息队列有很高的需求,主要原因有三个:

  • 将 Pulsar Topic 解释为一个 changelog 流,它将带有键的记录解释为 upsert 事件;
  • 作为实时管道的一部分,将多个流连接起来进行充实,并将结果存储在 Pulsar Topic 中,以便进一步的计算。但结果可能包含更新事件。
  • 作为实时管道的一部分,聚合数据流并将结果存储在 Pulsar Topic 中,以便进一步计算。但是结果可能包含更新事件。

基于这些需求,我们也实现了对 Upsert Pulsar 的支持。该功能支持用户以 upsert 的方式从 Pulsar Topic 中读取数据和向 Pulsar Topic 中写入数据。

在 SQL DDL 定义中,用户将 connector 设置为 upsert-pulsar,即可使用 Upsert Pulsar 连接器。

在配置方面,必须指定 Table 的主键,且不能使用 key.fieldskey.fields-prefix

作为 source,Upsert Pulsar 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,如果存在这个 key(,数据记录中的 value 是同一键的最后一个值的 UPDATE。如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录是 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同键的已存在行都会被覆盖。另外,值为空的消息将会被视作为 DELETE 消息。

作为 sink,Upsert Pulsar 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Pulsar 消息写入,并将 DELETE 数据作为 value 为空的 Pulsar 消息写入(表示对应键的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。