Canal Format
更新时间: 2023-12-29 15:26:55
阅读 1556
Canal Format
Canal Format是CDC(Changelog Data Capture)工具,能在实时系统中处理mysql binlog数据。内部默认使用JSON和Protobuf进行序列化。
应用场景:
- 从数据库同步增量数据到其他系统
- 审计
- 数据库的实时物化视图
- 与数据库历史数据temporal join
Flink RowKind
- INSERT(+I)
- DELETE(-D)
- UPDATE_BEFORE(-U)
- UPDATE_ARTER(+U)
Canal格式
{
"data": [
{
"id": "111",
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": "5.18"
}
],
"database": "inventory",
"es": 1589373560000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "INTEGER",
"name": "VARCHAR(255)",
"description": "VARCHAR(512)",
"weight": "FLOAT"
},
"old": [
{
"weight": "5.15"
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"description": 12,
"weight": 7
},
"table": "products",
"ts": 1589373560798,
"type": "UPDATE"
}
MySQL products 表有4列(id,name,description 和 weight)。上面的 JSON 消息是 products 表上的一个更新事件,表示 id = 111 的行数据上 weight 字段值从5.15变更成为 5.18。假设消息已经同步到了一个 Kafka 主题:products_binlog,那么就可以使用以下DDL来从这个主题消费消息并解析变更事件。
用例
CREATE TABLE topic_products (
-- schema is totally the same to the MySQL "products" table
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' -- using canal-json as the format
)
SET 'topic_products.format'='canal-json';
Format参数
参数 | 是否必填 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format | 必填 | none | String | 'canal-json' |
canal-json.ignore-parse-errors | 选填 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 |
canal-json.timestamp-format.standard | 选填 | 'SQL' | String | 指定输入和输出时间戳格式。当前支持的值是 'SQL' 和 'ISO-8601':选项 'SQL' 将解析 "yyyy-MM-dd HH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30 12:13:14.123',并以相同格式输出时间戳。选项 'ISO-8601' 将解析 "yyyy-MM-ddTHH:mm:ss.s{precision}" 格式的输入时间戳,例如 '2020-12-30T12:13:14.123',并以相同的格式输出时间戳。 |
canal-json.map-null-key.mode | 选填 | 'FAIL' | String | 指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL', 'DROP' 和 'LITERAL':Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'canal-json.map-null-key.literal' 定义。 |
canal-json.map-null-key.literal | 选填 | 'null' | String | 当 'canal-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 |
canal-json.database.include | 选填 | none | String | 仅读取指定数据库的 changelog 记录(通过对比 Canal 记录中的 "database" 元数据字段) |
canal-json.table.include | 选填 | none | String | 仅读取指定表的 changelog 记录(通过对比 Canal 记录中的 "table" 元数据字段)。 |
文档反馈
以上内容对您是否有帮助?