Canal Format

Canal Format是CDC(Changelog Data Capture)工具,能在实时系统中处理mysql binlog数据。内部默认使用JSON和Protobuf进行序列化。

应用场景:

  1. 从数据库同步增量数据到其他系统
  2. 审计
  3. 数据库的实时物化视图
  4. 与数据库历史数据temporal join
  1. INSERT(+I)
  2. DELETE(-D)
  3. UPDATE_BEFORE(-U)
  4. UPDATE_ARTER(+U)

Canal格式

{
  "data": [
    {
      "id": "111",
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": "5.18"
    }
  ],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {
    "id": "INTEGER",
    "name": "VARCHAR(255)",
    "description": "VARCHAR(512)",
    "weight": "FLOAT"
  },
  "old": [
    {
      "weight": "5.15"
    }
  ],
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "description": 12,
    "weight": 7
  },
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
}

MySQL products 表有4列(id,name,description 和 weight)。上面的 JSON 消息是 products 表上的一个更新事件,表示 id = 111 的行数据上 weight 字段值从5.15变更成为 5.18。假设消息已经同步到了一个 Kafka 主题:products_binlog,那么就可以使用以下DDL来从这个主题消费消息并解析变更事件。

用例

CREATE TABLE topic_products (
  -- schema is totally the same to the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json'  -- using canal-json as the format
)
SET 'topic_products.format'='canal-json';

Format参数

参数 是否必填 默认值 类型 描述
format 必填 none String ‘canal-json’
canal-json.ignore-parse-errors 选填 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。
canal-json.timestamp-format.standard 选填 ‘SQL’ String 指定输入和输出时间戳格式。当前支持的值是 ‘SQL’ 和 ‘ISO-8601’:选项 ‘SQL’ 将解析 “yyyy-MM-dd HH:mm:ss.s{precision}” 格式的输入时间戳,例如 ‘2020-12-30 12:13:14.123’,并以相同格式输出时间戳。选项 ‘ISO-8601’ 将解析 “yyyy-MM-ddTHH:mm:ss.s{precision}” 格式的输入时间戳,例如 ‘2020-12-30T12:13:14.123’,并以相同的格式输出时间戳。
canal-json.map-null-key.mode 选填 ‘FAIL’ String 指定处理 Map 中 key 值为空的方法. 当前支持的值有 ‘FAIL’, ‘DROP’ 和 ‘LITERAL’:Option ‘FAIL’ 将抛出异常,如果遇到 Map 中 key 值为空的数据。Option ‘DROP’ 将丢弃 Map 中 key 值为空的数据项。Option ‘LITERAL’ 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ‘canal-json.map-null-key.literal’ 定义。
canal-json.map-null-key.literal 选填 ‘null’ String 当 ‘canal-json.map-null-key.mode’ 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
canal-json.database.include 选填 none String 仅读取指定数据库的 changelog 记录(通过对比 Canal 记录中的 “database” 元数据字段)
canal-json.table.include 选填 none String 仅读取指定表的 changelog 记录(通过对比 Canal 记录中的 “table” 元数据字段)。