Pulsar SQL 连接器
Pulsar Connector
支持版本>=2.8
Flink-1.12
简述
支持pulsar的读写
示例
CREATE TABLE user_log (
user_id VARCHAR,
item_id INT,
is_new BOOLEAN,
test_tiny TINYINT,
test_small SMALLINT,
test_big BIGINT,
test_float FLOAT,
test_double DOUBLE,
test_null VARCHAR,
topic STRING METADATA
) WITH (
'connector' = 'pulsar',
'topic' = 'allType',
'admin-url' = 'http://localhost:8080',
'service-url' = 'pulsar://localhost:6650',
'value.format' = 'json',
-- 'scan.startup.mode' = 'latest'
'scan.startup.mode' = 'earliest'
);
参数配置
参数 | 默认值 | 描述 | 必填 |
---|---|---|---|
connector | null | 使用的连接器,可选择pulsar和upsert-pulsar。 | 是 |
topic | null | 输入或输出的 Topic。如果有多个 Topic,使用半角逗号 (, 连接。与 topic-pattern 参数互斥。 |
否 |
topic-pattern | null | 使用正则获得匹配的 Topic。与topic 参数互斥。 |
否 |
service-url | null | Pulsar broker 的服务地址。如:pulsar://localhost:6650 | 是 |
admin-url | null | Pulsar Admin 的服务地址。如:http://localhost:8080 | 是 |
scan.startup.mode | latest | Source 的启动模式。支持 earliest 、latest 、external-subscription 、timestamp 、specific-offsets 选项。earliest :从最早可用的位置开始消费;latest :从最新可用的位置开始消费;external-subscription :从订阅组的消费位置开始消费;timestamp :从指定的消息发布时间的位置开始消费;specific-offsets :从指定的offset开始消费 |
否 |
scan.startup.sub-name | null | 当使用 external-subscription 参数时,必须设置该参数。 |
否 |
scan.startup.timestamp-millis | null | 消费消息的时间offset,该时间为消息发布时间,单位毫秒。当使用 timestamp 参数时,必须设置该参数。 |
否 |
scan.startup.specific-offsets | null | 指定消费消息的offset, 格式:LedgerId | EntryId|PartitionIndex|Topic(full);LedgerId|EntryId|PartitionIndex|Topic(full), e.g. 42|1012|0|persistent://public/default/topic1;44|1011|1|persistent://public/default/topic1 或当只有一个topic时:LedgerId|EntryId|PartitionIndex。当使用 specific-offsets 参数时,必须设置该参数。 |
否 |
partition.discovery.interval-millis | null | 分区发现的时间间隔,单位为毫秒。 | 否 |
sink.message-router | key-hash | 写消息到 Pulsar 分区的路由方式。支持 key-hash 、 round-robin 、自定义 MessageRouter 实现类的引用路径。 |
否 |
sink.semantic | at-least-once | Sink 写出消息的保障级别。支持 at-least-once 、exactly-once 、none 选项。 |
否 |
properties | empty | Pulsar 可选的配置集,格式为 properties.key='value' 。有关详细信息,参见streamAPI配置参数。 |
否 |
key.format | null | Pulsar 消息的键序列化格式。支持 raw 、avro 、json 等格式。 |
否 |
key.fields | null | 序列化键时需要使用的 SQL 定义字段。如有多个字段,使用半角逗号(,)连接。 | 否 |
key.fields-prefix | null | 为键格式的所有字段定义一个自定义前缀,以避免名称与值格式的字段冲突。默认情况下,前缀为空。如果定义了自定义前缀,则 Table 模式和 'key.fields' 都将使用带前缀的名称。构造密钥格式的数据类型时,前缀将被删除,并且密钥格式内使用非前缀名称。 |
否 |
format或value.format | null | Pulsar 消息正文的序列化格式。支持 json 、avro 等格式。有关详细信息,参见 Flink 格式 |
是 |
value.fields-include | ALL | Pulsar 消息正文包含的字段策略。支持 ALL 和 EXCEPT_KEY 选项。 |
否 |
generic | true | 是否flink原生的表。如果不是原生的表,会使用当前的库名+'/'+表名作为topic名称;否则topic名称为表名 | 否 |
Pulsar 消息的元数据配置
METADATA
标志用于读写 Pulsar 消息中的元数据,如下所示
说明
R/W 列定义了元数据字段是否可读(R)和/或可写(W)。只读列必须声明为 VIRTUAL,以便在 INSERT INTO 操作中排除它们。**
元数据 | 数据类型 | 描述 | R/W |
---|---|---|---|
topic | STRING NOT NULL | Pulsar 消息所在的 topic 的名称。 | R |
messageId | BYTES NOT NULL | Pulsar 消息 ID。 | R |
sequenceId | BIGINT NOT NULL | Pulsar 消息的序列号。 | R |
publishTime | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Pulsar 消息的发布时间。 | R |
eventTime | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Pulsar 消息的生成时间。 | R/W |
properties | MAP |
Pulsar 消息的扩展信息。 | R/W |
streamAPI配置参数
注意:properties.开头的这些参数需要使用中划线来命名,参数pulsar官方文档中的参数需要将驼峰命名改为中划线命名。如pulsar官方reader文档中参数subscriptionRolePrefix,在flink任务中需要改为properties.pulsar.reader.subscription-role-prefix
该参数对应StreamAPI中的FlinkPulsarSource、FlinkPulsarSink构造参数中的Properties对象,Table模式下的配置properties参数。在 DDL 语句中,用户可以在参数前添加'properties.'来使用下述配置:
'properties.pulsar.reader.subscription-role-prefix' = 'pulsar-flink-',
'properties.pulsar.producer.send-timeout-ms' = '30000',
参数 | 默认值 | 描述 | 生效范围 |
---|---|---|---|
topic | null | Pulsar Topic。 | source |
topics | null | 使用半角逗号(,)连接的多个 Pulsar Topic。 | source |
topicspattern | null | 使用 Java 正则匹配多的多个 pulsar Topic。 | source |
partition.discovery.interval-millis | -1 | 自动发现增减 Topic,单位为毫秒。取值为-1,则表示禁用该功能。 | source |
clientcachesize | 100 | Pulsar 客户端的缓存数量。 | source、sink |
auth-plugin-classname | null | Pulsar 客户端的鉴权类。 | source、sink |
auth-params | null | Pulsar 客户端的鉴权参数。 | source、sink |
flushoncheckpoint | true | 在 Flink snapshotState 时,向 Pulsar Topic 中写入消息。 | sink |
failonwrite | false | Sink 出错时,继续确认消息。 | sink |
polltimeoutms | 120000 | 等待获取下一条消息的超时时间,单位为毫秒。 | source |
pulsar.reader.fail-on-data-loss | true | 数据丢失时,是否失败。 | source |
pulsar.reader.use-earliest-when-data-loss | false | 数据丢失时,使用earliest重置offset。 | source |
commitmaxretries | 3 | 向 Pulsar 消息偏移 offset 时,最大重试次数。 | source |
send-delay-millisecond | 0 | 延迟消息发送(毫秒),仅限于TableApi,StreamApi请参考PulsarSerializationSchema.setDeliverAtExtractor |
Sink |
scan.startup.mode | latest | 消费消息的位置。支持 earliest 和 latest 选项。 |
source |
enable-key-hash-range | false | 开启 Pulsar Key-Shared 订阅模式。 | source |
pulsar.reader.* | Pulsar reader 的详细配置。参数pulsar官方文档中的参数需要将驼峰命名改为中划线命名。如pulsar官方reader文档中参数subscriptionRolePrefix,在flink任务中需要改为properties.pulsar.reader.subscription-role-prefix。有关详细信息,参见 Pulsar Reader。 | source | |
pulsar.reader.subscription-role-prefix | flink-pulsar- | 未指定订阅者时,自动创建订阅者名称的前缀。 | source |
pulsar.reader.receiver-queue-size | 1000 | 接收队列大小。 | source |
pulsar.producer.* | Pulsar producer 的详细配置。参数pulsar官方文档中的参数需要将驼峰命名改为中划线命名。sendTimeoutMs,在flink任务中需要改为properties.pulsar.producer.send-timeout-ms。有关详细信息,参见 Pulsar Producer。 | Sink | |
pulsar.producer.send-timeout-ms | 30000 | 发送消息时的超时时间,单位为毫秒。 | Sink |
pulsar.producer.block-if-queue-full | false | Producer 写入消息的队列满时,支持阻塞方法,而不是抛出异常。 | Sink |
pulsar.reader.*
和 pulsar.producer.*
定义配置 Pulsar 行为的详细信息。星号(*)可以替换为 Pulsar 中的配置名,参数pulsar官方文档中的参数需要将驼峰命名改为中划线命名。如pulsar官方reader文档中参数subscriptionRolePrefix,在flink任务中需要改为properties.pulsar.reader.subscription-role-prefix。有关详细信息,参见 Pulsar Reader和 Pulsar Producer。
认证配置
需要设置 properties.auth-plugin-classname、properties.auth-params 参数。
CREATE TABLE pulsar (
`physical_1` STRING,
`physical_2` INT,
`eventTime` TIMESTAMP(3) METADATA,
`properties` MAP<STRING, STRING> METADATA ,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL,
`key` STRING ,
`physical_3` BOOLEAN
) WITH (
'connector' = 'pulsar',
'topic' = 'persistent://public/default/topic82547611',
'key.format' = 'raw',
'key.fields' = 'key',
'value.format' = 'avro',
'service-url' = 'pulsar://localhost:6650',
'admin-url' = 'http://localhost:8080',
'scan.startup.mode' = 'earliest',
'properties.auth-plugin-classname' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
'properties.auth-params' = 'token:xxxxxxxxxx',
)
有关认证的详细信息,参见 Pulsar Security。
Upsert Pulsar
Flink 社区用户对 Upsert 模式消息队列有很高的需求,主要原因有三个:
- 将 Pulsar Topic 解释为一个 changelog 流,它将带有键的记录解释为 upsert 事件;
- 作为实时管道的一部分,将多个流连接起来进行充实,并将结果存储在 Pulsar Topic 中,以便进一步的计算。但结果可能包含更新事件。
- 作为实时管道的一部分,聚合数据流并将结果存储在 Pulsar Topic 中,以便进一步计算。但是结果可能包含更新事件。
基于这些需求,我们也实现了对 Upsert Pulsar 的支持。该功能支持用户以 upsert 的方式从 Pulsar Topic 中读取数据和向 Pulsar Topic 中写入数据。
在 SQL DDL 定义中,用户将 connector
设置为 upsert-pulsar
,即可使用 Upsert Pulsar 连接器。
在配置方面,必须指定 Table 的主键,且不能使用 key.fields
、key.fields-prefix
。
作为 source,Upsert Pulsar 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,如果存在这个 key(,数据记录中的 value 是同一键的最后一个值的 UPDATE。如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录是 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同键的已存在行都会被覆盖。另外,值为空的消息将会被视作为 DELETE 消息。
作为 sink,Upsert Pulsar 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Pulsar 消息写入,并将 DELETE 数据作为 value 为空的 Pulsar 消息写入(表示对应键的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
Flink-1.14
简述
支持pulsar的读写
示例
CREATE TABLE user_log (
user_id VARCHAR,
item_id INT,
is_new BOOLEAN,
test_tiny TINYINT,
test_small SMALLINT,
test_big BIGINT,
test_float FLOAT,
test_double DOUBLE,
test_null VARCHAR,
topic STRING METADATA
) WITH (
'connector' = 'pulsar',
'topic' = 'allType',
'admin-url' = 'http://localhost:8080',
'service-url' = 'pulsar://localhost:6650',
'value.format' = 'json',
-- 'scan.startup.mode' = 'latest'
'scan.startup.mode' = 'earliest'
);
参数配置
参数 | 默认值 | 描述 | 必填 | |
---|---|---|---|---|
connector | null | 使用的连接器,可选择pulsar和upsert-pulsar。 | 是 | |
topic | null | 输入或输出的 Topic。如果有多个 Topic,使用半角逗号 (, 连接。与 topic-pattern 参数互斥。 |
否 | |
topic-pattern | null | 使用正则获得匹配的 Topic。与topic 参数互斥。 |
否 | |
service-url | null | Pulsar broker 的服务地址。如:pulsar://localhost:6650 | 是 | |
admin-url | null | Pulsar Admin 的服务地址。如:http://localhost:8080 | 是 | |
scan.startup.mode | latest | Source 的启动模式。支持 earliest 、latest 、external-subscription 、timestamp 、specific-offsets 选项。earliest :从最早可用的位置开始消费;latest :从最新可用的位置开始消费;external-subscription :从订阅组的消费位置开始消费;timestamp :从指定的消息发布时间的位置开始消费;specific-offsets :从指定的offset开始消费 |
否 | 否 |
scan.startup.sub-name | null | 当使用 external-subscription 参数时,必须设置该参数。 |
否 | |
scan.startup.timestamp-millis | null | 消费消息的时间offset,该时间为消息发布时间,单位毫秒。当使用 timestamp 参数时,必须设置该参数。 |
否 | |
scan.startup.specific-offsets | null | 指定消费消息的offset, 格式:LedgerId | EntryId|PartitionIndex|Topic(full);LedgerId|EntryId|PartitionIndex|Topic(full), e.g. 42|1012|0|persistent://public/default/topic1;44|1011|1|persistent://public/default/topic1 或当只有一个topic时:LedgerId|EntryId|PartitionIndex。当使用 specific-offsets 参数时,必须设置该参数。 |
否 | |
partition.discovery.interval-millis | null | 分区发现的时间间隔,单位为毫秒。 | 否 | |
sink.message-router | key-hash | 写消息到 Pulsar 分区的路由方式。支持 key-hash 、 round-robin 、自定义 MessageRouter 实现类的引用路径。 |
否 | |
sink.semantic | at-least-once | Sink 写出消息的保障级别。支持 at-least-once 、exactly-once 、none 选项。 |
否 | |
properties | empty | Pulsar 可选的配置集,格式为 properties.key='value' 。有关详细信息,参见streamAPI配置参数。 |
否 | |
key.format | null | Pulsar 消息的键序列化格式。支持 raw 、avro 、json 等格式。 |
否 | |
key.fields | null | 序列化键时需要使用的 SQL 定义字段。如有多个字段,使用半角逗号(,)连接。 | 否 | |
key.fields-prefix | null | 为键格式的所有字段定义一个自定义前缀,以避免名称与值格式的字段冲突。默认情况下,前缀为空。如果定义了自定义前缀,则 Table 模式和 'key.fields' 都将使用带前缀的名称。构造密钥格式的数据类型时,前缀将被删除,并且密钥格式内使用非前缀名称。 |
否 | |
format或value.format | null | Pulsar 消息正文的序列化格式。支持 json 、avro 等格式。有关详细信息,参见 Flink 格式 |
是 | |
value.fields-include | ALL | Pulsar 消息正文包含的字段策略。支持 ALL 和 EXCEPT_KEY 选项。 |
否 | |
generic | true | 是否flink原生的表。如果不是原生的表,会使用当前的库名+'/'+表名作为topic名称;否则topic名称为表名 | 否 |
Pulsar 消息的元数据配置
METADATA
标志用于读写 Pulsar 消息中的元数据,如下所示
说明
R/W 列定义了元数据字段是否可读(R)和/或可写(W)。只读列必须声明为 VIRTUAL,以便在 INSERT INTO 操作中排除它们。**
元数据 | 数据类型 | 描述 | R/W |
---|---|---|---|
topic | STRING NOT NULL | Pulsar 消息所在的 topic 的名称。 | R |
messageId | BYTES NOT NULL | Pulsar 消息 ID。 | R |
sequenceId | BIGINT NOT NULL | Pulsar 消息的序列号。 | R |
publishTime | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Pulsar 消息的发布时间。 | R |
eventTime | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Pulsar 消息的生成时间。 | R/W |
properties | MAP |
Pulsar 消息的扩展信息。 | R/W |
streamAPI配置参数
该参数对应StreamAPI中的FlinkPulsarSource、FlinkPulsarSink构造参数中的Properties对象,Table模式下的配置properties参数。在 DDL 语句中,用户可以在参数前添加'properties.'来使用下述配置:
'properties.pulsar.reader.subscriptionRolePrefix' = 'pulsar-flink-',
'properties.pulsar.producer.sendTimeoutMs' = '30000',
参数 | 默认值 | 描述 | 生效范围 |
---|---|---|---|
topic | null | Pulsar Topic。 | source |
topics | null | 使用半角逗号(,)连接的多个 Pulsar Topic。 | source |
topicspattern | null | 使用 Java 正则匹配多的多个 pulsar Topic。 | source |
partition.discovery.interval-millis | -1 | 自动发现增减 Topic,单位为毫秒。取值为-1,则表示禁用该功能。 | source |
clientcachesize | 100 | Pulsar 客户端的缓存数量。 | source、sink |
auth-plugin-classname | null | Pulsar 客户端的鉴权类。 | source、sink |
auth-params | null | Pulsar 客户端的鉴权参数。 | source、sink |
flushoncheckpoint | true | 在 Flink snapshotState 时,向 Pulsar Topic 中写入消息。 | sink |
failonwrite | false | Sink 出错时,继续确认消息。 | sink |
polltimeoutms | 120000 | 等待获取下一条消息的超时时间,单位为毫秒。 | source |
pulsar.reader.fail-on-data-loss | true | 数据丢失时,是否失败。 | source |
pulsar.reader.use-earliest-when-data-loss | false | 数据丢失时,使用earliest重置offset。 | source |
commitmaxretries | 3 | 向 Pulsar 消息偏移 offset 时,最大重试次数。 | source |
send-delay-millisecond | 0 | 延迟消息发送(毫秒),仅限于TableApi,StreamApi请参考PulsarSerializationSchema.setDeliverAtExtractor |
Sink |
scan.startup.mode | latest | 消费消息的位置。支持 earliest 和 latest 选项。 |
source |
enable-key-hash-range | false | 开启 Pulsar Key-Shared 订阅模式。 | source |
pulsar.reader.* | Pulsar reader 的详细配置。有关详细信息,参见 Pulsar Reader。 | source | |
pulsar.reader.subscription-role-prefix | flink-pulsar- | 未指定订阅者时,自动创建订阅者名称的前缀。 | source |
pulsar.reader.receiverQueueSize | 1000 | 接收队列大小。 | source |
pulsar.producer.* | Pulsar producer 的详细配置。有关详细信息,参见 Pulsar Producer。 | Sink | |
pulsar.producer.sendTimeoutMs | 30000 | 发送消息时的超时时间,单位为毫秒。 | Sink |
pulsar.producer.blockIfQueueFull | false | Producer 写入消息的队列满时,支持阻塞方法,而不是抛出异常。 | Sink |
pulsar.reader.*
和 pulsar.producer.*
定义配置 Pulsar 行为的详细信息。星号(*)可以替换为 Pulsar 中的配置名,有关详细信息,参见 Pulsar Reader和 Pulsar Producer。
认证配置
需要设置 properties.auth-plugin-classname、properties.auth-params 参数。
CREATE TABLE pulsar (
`physical_1` STRING,
`physical_2` INT,
`eventTime` TIMESTAMP(3) METADATA,
`properties` MAP<STRING, STRING> METADATA ,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL,
`key` STRING ,
`physical_3` BOOLEAN
) WITH (
'connector' = 'pulsar',
'topic' = 'persistent://public/default/topic82547611',
'key.format' = 'raw',
'key.fields' = 'key',
'value.format' = 'avro',
'service-url' = 'pulsar://localhost:6650',
'admin-url' = 'http://localhost:8080',
'scan.startup.mode' = 'earliest',
'properties.auth-plugin-classname' = 'org.apache.pulsar.client.impl.auth.AuthenticationToken',
'properties.auth-params' = 'token:xxxxxxxxxx',
)
有关认证的详细信息,参见 Pulsar Security。
Upsert Pulsar
Flink 社区用户对 Upsert 模式消息队列有很高的需求,主要原因有三个:
- 将 Pulsar Topic 解释为一个 changelog 流,它将带有键的记录解释为 upsert 事件;
- 作为实时管道的一部分,将多个流连接起来进行充实,并将结果存储在 Pulsar Topic 中,以便进一步的计算。但结果可能包含更新事件。
- 作为实时管道的一部分,聚合数据流并将结果存储在 Pulsar Topic 中,以便进一步计算。但是结果可能包含更新事件。
基于这些需求,我们也实现了对 Upsert Pulsar 的支持。该功能支持用户以 upsert 的方式从 Pulsar Topic 中读取数据和向 Pulsar Topic 中写入数据。
在 SQL DDL 定义中,用户将 connector
设置为 upsert-pulsar
,即可使用 Upsert Pulsar 连接器。
在配置方面,必须指定 Table 的主键,且不能使用 key.fields
、key.fields-prefix
。
作为 source,Upsert Pulsar 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,如果存在这个 key(,数据记录中的 value 是同一键的最后一个值的 UPDATE。如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录是 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同键的已存在行都会被覆盖。另外,值为空的消息将会被视作为 DELETE 消息。
作为 sink,Upsert Pulsar 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Pulsar 消息写入,并将 DELETE 数据作为 value 为空的 Pulsar 消息写入(表示对应键的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
以上内容对您是否有帮助?