Debezium Format
Last updated: 2023-04-24 20:25:04
Debezium is a CDC (Changelog Data Capture) tool that can capture MySQL binlog data for real-time systems. Debezium messages are serialized with JSON or Avro by default; Flink can consume them with the 'debezium-json' and 'debezium-avro-confluent' formats.
Typical use cases:
- Synchronizing incremental data from a database to other systems
- Auditing
- Real-time materialized views on a database
- Temporal joins against the change history of a database table
Flink RowKind
The debezium-json format interprets each Debezium change event into one or two Flink changelog rows with the following RowKind values:
- INSERT (+I)
- DELETE (-D)
- UPDATE_BEFORE (-U)
- UPDATE_AFTER (+U)
Debezium message format
The example below shows a Debezium JSON message for an update of a MySQL products table. The op field "u" denotes an update, so the debezium-json format emits an UPDATE_BEFORE (-U) row built from the before values and an UPDATE_AFTER (+U) row built from the after values.
{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "source": {...},
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null
}
Example
CREATE TABLE topic_products (
  -- schema is exactly the same as the MySQL products table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  -- use the 'debezium-json' format to parse Debezium JSON messages;
  -- use 'debezium-avro-confluent' if the Debezium messages are Avro-encoded
  'format' = 'debezium-json'
);
SET 'topic_products.format'='debezium-json';
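Once registered, topic_products behaves like a continuously updated view of the MySQL products table. As a minimal illustrative sketch (the aggregation below is not part of the original example), a continuous query over it keeps a real-time materialized view in sync with the upstream changes:

-- Flink applies the +I/-U/+U/-D changelog rows decoded from the Debezium
-- messages, so the aggregate always reflects the latest state of the table.
SELECT name, SUM(weight) AS total_weight
FROM topic_products
GROUP BY name;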
Debezium Metadata
The following metadata keys can be declared as virtual (read-only) columns in the DDL, as shown in the sketch after the table.
Key | Data Type | Description |
---|---|---|
schema | STRING NULL | The JSON string describing the schema of the payload |
ingestion-timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NULL | The timestamp at which the connector processed the event; corresponds to the ts_ms field |
source.timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NULL | The timestamp at which the source system created the event; corresponds to the source.ts_ms field |
source.database | STRING NULL | The originating database; corresponds to the source.db field |
source.schema | STRING NULL | The originating database schema; corresponds to the source.schema field |
source.table | STRING NULL | The originating table; corresponds to the source.table field |
source.properties | MAP<STRING, STRING> NULL | Properties of the source; corresponds to the source field |
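A minimal sketch of such a DDL, assuming the Kafka connector is used (which prefixes format metadata keys with 'value.'); the table and column names are illustrative:

CREATE TABLE topic_products_with_meta (
  -- virtual metadata columns exposed by the debezium-json format
  origin_ts TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  -- physical columns, same as the MySQL products table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json'
);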
Format Options
Option | Required | Default | Type | Description |
---|---|---|---|---|
format | required | (none) | String | Specify what format to use: 'debezium-json', or 'debezium-avro-confluent' for Avro-encoded Debezium messages |
debezium-avro-confluent.basic-auth.credentials-source | optional | (none) | String | Basic auth credentials source for Schema Registry |
debezium-avro-confluent.basic-auth.user-info | optional | (none) | String | Basic auth user info for Schema Registry |
debezium-avro-confluent.bearer-auth.credentials-source | optional | (none) | String | Bearer auth credentials source for Schema Registry |
debezium-avro-confluent.bearer-auth.token | optional | (none) | String | Bearer auth token for Schema Registry |
debezium-avro-confluent.ssl.keystore.location | optional | (none) | String | Location / file of the SSL keystore |
debezium-avro-confluent.ssl.keystore.password | optional | (none) | String | Password for the SSL keystore |
debezium-avro-confluent.ssl.truststore.location | optional | (none) | String | Location / file of the SSL truststore |
debezium-avro-confluent.ssl.truststore.password | optional | (none) | String | Password for the SSL truststore |
debezium-avro-confluent.schema-registry.subject | optional | (none) | String | The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, the 'kafka' and 'upsert-kafka' connectors use '<topic_name>-value' or '<topic_name>-key' as the subject name when this format is used as the value or key format |
debezium-avro-confluent.schema-registry.url | required for 'debezium-avro-confluent' | (none) | String | The URL of the Confluent Schema Registry used to fetch/register schemas |
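If the Debezium messages are Avro-encoded, a sketch of the corresponding DDL (the Schema Registry URL below is a placeholder) switches the format and points it at the registry:

CREATE TABLE topic_products_avro (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-avro-confluent',
  -- required when using 'debezium-avro-confluent'; replace with your registry address
  'debezium-avro-confluent.schema-registry.url' = 'http://localhost:8081'
);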