在流表管理中,消息中间件作为实时数仓构建过程中的核心媒介,由于其序列化方式不同,对应数据的解析结果会存在较大差异。本文将围绕通过消息中间件构建流表进行展开说明。

序列化方式

目前,实时计算平台支持jsoncsvavrocanal-jsondebezium-jsonmaxwell-json6种序列化方式,下表列出了各序列化方式支持的消息队列数据源类型和配置时的特殊规则。

序列化方式 支持的数据源类型 主键规则 特殊字段类型
JSON Kafka、Pulsar、RocketMQ 不可设置主键 Map 的 Key 必须是 String
CSV Kafka、Pulsar 不可设置主键 不支持 Map 字段类型;只支持一层嵌套字段
Avro Kafka、Pulsar 不可设置主键 - Map 的 Key 必须是 String;
- Timestamp 精度需小于等于3;
- 字段名称1-128字符,只能包含字母、数字、下划线,以字母开头;
Canal-JSON Kafka、Pulsar 可设置多个主键 Map 的 Key 必须是 String
Debezium-JSON Kafka、Pulsar 可设置多个主键 Map 的 Key 必须是 String
Maxwell-JSON Kafka、Pulsar 可设置多个主键 Map 的 Key 必须是 String

数据类型映射关系

以Kafka为例,Kafka 将消息键值以二进制进行存储,因此 Kafka 并不存在 schema 或数据类型。Kafka 消息使用格式配置进行序列化和反序列化,例如 json,csv,avro等。 因此,数据类型映射取决于使用的格式。可以参阅以下表格或Apache Flink Documentation 以获取更多细节。

1. JSON

目前 JSON Schema 将会自动从 Table Schema 之中自动推导得到。不支持显式地定义 JSON Schema。在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。

下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

Flink SQL 类型 JSON 类型
CHAR / VARCHAR / STRING string
BOOLEAN boolean
BINARY / VARBINARY string with encoding: base64
DECIMAL number
TINYINT number
SMALLINT number
INT number
BIGINT number
FLOAT number
DOUBLE number
DATE string with format: date
TIME string with format: time
TIMESTAMP string with format: date-time
TIMESTAMP_WITH_LOCAL_TIME_ZONE string with format: date-time (with UTC time zone)
INTERVAL number
ARRAY array
MAP / MULTISET object
ROW object

2. CSV

目前 CSV 的 Schema 都是从 Table Schema 推断而来的。暂不支持显式地定义 CSV Schema 。 Flink 的 CSV Format 数据使用 jackson databind API 去解析 CSV 字符串。

下面的表格列出了flink数据和CSV数据的对应关系。

Flink SQL 类型 CSV 类型
CHAR / VARCHAR / STRING string
BOOLEAN boolean
BINARY / VARBINARY string with encoding: base64
DECIMAL number
TINYINT number
SMALLINT number
INT number
BIGINT number
FLOAT number
DOUBLE number
DATE string with format: date
TIME string with format: time
TIMESTAMP string with format: date-time
INTERVAL number
ARRAY array
ROW object

3. Avro

目前,Avro Schema 通常是从 Table Schema 中推导而来。尚不支持显式定义 Avro schema。因此,下表列出了从 Flink 类型到 Avro 类型的类型映射。

Flink SQL 类型 Avro 类型 Avro 逻辑类型
CHAR / VARCHAR / STRING string
BOOLEAN boolean
BINARY / VARBINARY bytes
DECIMAL fixed decimal
TINYINT int
SMALLINT int
INT int
BIGINT long
FLOAT float
DOUBLE double
DATE int date
TIME int time-millis
TIMESTAMP long timestamp-millis
ARRAY array
MAP (key 必须是 string/char/varchar 类型) map
MULTISET (元素必须是 string/char/varchar 类型) map
ROW record

4. Canal-JSON

目前,Canal Format 使用 JSON Format 进行序列化和反序列化,请参阅上方 JSON 序列化方式说明。

5. Debezium-JSON

目前,Canal Format 使用 JSON Format 进行序列化和反序列化,请参阅上方 JSON 序列化方式说明。

6. Maxwell-JSON

目前,Maxwell Format 使用 JSON Format 进行序列化和反序列化,请参阅上方 JSON 序列化方式说明。