Debezium Format

Debezium Format是CDC(Changelog Data Capture)工具,能在实时系统中处理mysql binlog数据。内部默认使用JSON和Avro进行序列化。

应用场景:

  1. 从数据库同步增量数据到其他系统
  2. 审计
  3. 数据库的实时物化视图
  4. 与数据库历史数据temporal join
  1. INSERT(+I)
  2. DELETE(-D)
  3. UPDATE_BEFORE(-U)
  4. UPDATE_ARTER(+U)

Debezium格式

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "source": {...},
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}

用例

CREATE TABLE topic_products (
  -- schema  MySQL  products 表完全相同
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
  -- 使用 'debezium-json' format 来解析 Debezium  JSON 消息
  -- 如果 Debezium  Avro 编码消息,请使用 'debezium-avro-confluent'
 'format' = 'debezium-json'  -- 如果 Debezium  Avro 编码消息,请使用 'debezium-avro-confluent'
)
SET 'topic_products.format'='debezium-json';

Debezium Metadata

下面Key可以作为虚拟字段定义在DDL中

Key 类型 描述
schema STRING NULL 使用Json String表示Schema
ingestion-timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE NULL connector 处理数据的时间,对应ts_ms字段
source.timestamp TIMESTAMP(3) WITH LOCAL TIME ZONE NULL 原系统处理数据的时间,对应source.ts_ms字段
source.database STRING NULL 数据来源于database,对应source.db字段
source.schema STRING NULL 数据来源于schema,对应source.schema字段
source.table STRING NULL 数据来源于table,对应source.table字段
source.properties MAP NULL 数据来源的配置,对应source字段

Format参数

参数 是否必填 默认值 类型 描述
format 必填 none String 'debezium-json'
debezium-avro-confluent.basic-auth.credentials-source 选填 (none) String Basic auth credentials source for Schema Registry
debezium-avro-confluent.basic-auth.user-info 选填 (none) String Basic auth user info for schema registry
debezium-avro-confluent.bearer-auth.credentials-source 选填 (none) String Bearer auth credentials source for Schema Registry
debezium-avro-confluent.bearer-auth.token 选填 (none) String Bearer auth token for Schema Registry
debezium-avro-confluent.ssl.keystore.location 选填 (none) String Location / File of SSL keystore
debezium-avro-confluent.ssl.keystore.password 选填 (none) String Password for SSL keystore
debezium-avro-confluent.ssl.truststore.location 选填 (none) String Location / File of SSL truststore
debezium-avro-confluent.ssl.truststore.password 选填 (none) String Password for SSL truststore
debezium-avro-confluent.schema-registry.subject 选填 (none) String The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, 'kafka' and 'upsert-kafka' connectors use '-value' or '-key' as the default subject name if this format is used as the value or key format. But for other connectors (e.g. 'filesystem'), the subject option is required when used as sink.
debezium-avro-confluent.schema-registry.url required (none) String The URL of the Confluent Schema Registry to fetch/register schemas.