Upsert Kafka SQL 连接器
更新时间: 2021-11-23 10:41:41
阅读 1458
Upsert Kafka 连接器
Flink-1.12
- 官方文档
upsert
简述
Upsert Kafka 连接器支持以 upsert 方式从 Kafka topic 中读取数据并将数据写入 Kafka topic。
作为 source,upsert-kafka 连接器生产 changelog 流,其中每条数据记录代表一个更新或删除事件。更准确地说,数据记录中的 value 被解释为同一 key 的最后一个 value 的 UPDATE,如果有这个 key(如果不存在相应的 key,则该更新被视为 INSERT)。用表来类比,changelog 流中的数据记录被解释为 UPSERT,也称为 INSERT/UPDATE,因为任何具有相同 key 的现有行都被覆盖。另外,value 为空的消息将会被视作为 DELETE 消息。
作为 sink,upsert-kafka 连接器可以消费 changelog 流。它会将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。
示例
- 必须设置’table.append-only’ = ‘false’
- 必须指定主键:
- PRIMARY KEY (user_region) NOT ENFORCED
- 元数据的方式:set ‘XXXX.key.fields’ = ‘field1;field2’
CREATE TABLE pageviews_per_region (
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'json',
'value.format' = 'json',
'table.append-only' = 'false'
);
CREATE TABLE sink (
`app` VARCHAR,
uv BIGINT,
pv BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior_join',
'properties.bootstrap.servers' = '10.122.173.110:9292',
'table.append-only' = 'false',
'properties.group.id' = 'test',
'format' = 'json'
);
参数
参数 | 是否必填 | 默认值 | 备注 |
---|---|---|---|
connector | 必填 | 无 | 必须为:kafka |
topic | 必填 | 无 | kafka 读写的 topic。 允许为一个列表,e.g. ‘topic-1;topic-2’ 。 注意,对 source 表而言,’topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。 |
properties.bootstrap.servers | 必填 | 无 | 逗号分隔的 Kafka broker 列表 |
key.format | 必填 | 无 | 用来序列化或反序列化 Kafka 消息键(Key)的格式。 请参阅 formats目录下的配置 以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项 ‘key.fields’ 也是必需的。 否则 Kafka 记录将使用空值作为键。因 avro 格式不支持主键,而 upsert 必须指定主键,所以不支持 avro。 |
key.fields | 可选 | 无 | 表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 ‘field1;field2’。 |
key.fields-prefix | 可选 | 无 | 为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。 |
value.format | 必填 | 无 | 用来序列化或反序列化 Kafka 消息键(Key)的格式。 请参阅 formats目录下的配置 以获取更多关于格式的细节和相关配置项。 注意:该配置项和 ‘format’ 二者必需其一。因 avro 格式不支持主键,而 upsert 必须指定主键,所以不支持 avro。 |
value.fields-include | 可选 | ALL | 可选值:[ALL, EXCEPT_KEY]. 定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 ‘ALL’ 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。 |
sink.parallelism | 可选 | 无 | 定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。 |
parallelism | 可选 | 无 | 指定kafka source 并行度 |
文档反馈
以上内容对您是否有帮助?