Ogg Format
更新时间: 2023-03-02 16:10:32
阅读 169
0gg Format
仅1.14支持
Oracle cdc商业化工具 GoldenGate 输出到kafka后的序列化格式,可以基于kafka连接器对该数据格式进行读写
数据格式样例
INSERT
{"table":"HELLO_HELLO_1.POC_TEST1","op_type":"I","op_ts":"2022-12-30 16:34:44.000097","current_ts":"2022-12-30T16:34:47.913000","pos":"00000000050008785432","after":{"ID":1,"NAME":"name1","AGE":1}}
UPDATE
{"table":"HELLO_HELLO_1.POC_TEST1","op_type":"U","op_ts":"2022-12-30 16:35:11.000101","current_ts":"2022-12-30T16:35:16.948000","pos":"00000000050008785722","before":{"ID":1,"NAME":"name1","AGE":1},"after":{"ID":1,"NAME":"name1_new","AGE":1}}
DELETE
{"table":"HELLO_HELLO_1.POC_TEST1","op_type":"D","op_ts":"2022-12-30 16:35:42.000091","current_ts":"2022-12-30T16:35:46.978000","pos":"00000000050008785833","before":{"ID":1,"NAME":"name1_new","AGE":1}}
用例
set 'table.exec.source.cdc-events-duplicate' = 'false'; // 该配置需要加上
create table source (
ID bigint,
NAME string,
AGE int,
PRIMARY KEY(ID) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'mytopic',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
'format' = 'ogg-json'
);
create table sink (
id bigint,
name string,
age int
) WITH (
'connector' = 'print'
);
insert into source select * from sink;
Format参数
文档反馈
以上内容对您是否有帮助?