Ogg Format

Supported in version 1.14 only.

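Ogg is the serialization format that GoldenGate, Oracle's commercial CDC tool, produces when it writes change data to Kafka. Data in this format can be read and written through the Kafka connector.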

Sample records

INSERT
{"table":"HELLO_HELLO_1.POC_TEST1","op_type":"I","op_ts":"2022-12-30 16:34:44.000097","current_ts":"2022-12-30T16:34:47.913000","pos":"00000000050008785432","after":{"ID":1,"NAME":"name1","AGE":1}}
UPDATE
{"table":"HELLO_HELLO_1.POC_TEST1","op_type":"U","op_ts":"2022-12-30 16:35:11.000101","current_ts":"2022-12-30T16:35:16.948000","pos":"00000000050008785722","before":{"ID":1,"NAME":"name1","AGE":1},"after":{"ID":1,"NAME":"name1_new","AGE":1}}
DELETE
{"table":"HELLO_HELLO_1.POC_TEST1","op_type":"D","op_ts":"2022-12-30 16:35:42.000091","current_ts":"2022-12-30T16:35:46.978000","pos":"00000000050008785833","before":{"ID":1,"NAME":"name1_new","AGE":1}}

Example

set 'table.exec.source.cdc-events-duplicate' = 'false';  -- this setting is required
create table source (
    ID  bigint,
    NAME string,
    AGE    int,
    PRIMARY KEY(ID) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'mytopic',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
    'format' = 'ogg-json'
);
create table sink (
    id  bigint,
    name string,
    age    int
) WITH (
  'connector' = 'print'
);
insert into sink select * from source;
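
The format can also be written through the Kafka connector, as noted at the top of this page. The following is a minimal sketch of that direction, assuming the `source` table defined above already exists. The table name `kafka_sink` and the topic `mytopic_out` are hypothetical placeholders, not part of the original example.

```sql
-- Hypothetical sink table: its name and topic are placeholders chosen for illustration.
create table kafka_sink (
    ID bigint,
    NAME string,
    AGE int
) WITH (
    'connector' = 'kafka',
    'topic' = 'mytopic_out',
    'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
    'format' = 'ogg-json'
);
-- Forward the change events read from the source table, re-encoded as ogg-json.
insert into kafka_sink select * from source;
```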

Format options

Option Required Default Type Description
format required (none) String Specify which format to use; here it should be 'ogg-json'.
ogg-json.ignore-parse-errors optional false Boolean Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors.
ogg-json.timestamp-format.standard optional 'SQL' String Specify the input and output timestamp format. Currently supported values are 'SQL' and 'ISO-8601':
  • Option 'SQL' parses input timestamps in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123', and outputs timestamps in the same format.
  • Option 'ISO-8601' parses input timestamps in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123', and outputs timestamps in the same format.
ogg-json.map-null-key.mode optional 'FAIL' String Specify the handling mode when serializing null keys for map data. Currently supported values are 'FAIL', 'DROP' and 'LITERAL':
  • Option 'FAIL' throws an exception when a map with a null key is encountered.
  • Option 'DROP' drops null-key entries from map data.
  • Option 'LITERAL' replaces the null key with a string literal. The string literal is defined by the 'ogg-json.map-null-key.literal' option.
ogg-json.map-null-key.literal optional 'null' String Specify the string literal that replaces a null key when 'ogg-json.map-null-key.mode' is 'LITERAL'.
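
As an illustration of how these options are set, the sketch below declares a table with every optional setting spelled out in the WITH clause. The table name `source_with_options` is hypothetical, and the topic and broker address are placeholders. The chosen values are only examples, not recommendations.

```sql
-- Hypothetical table name; topic and broker address are placeholders.
create table source_with_options (
    ID bigint,
    NAME string,
    AGE int,
    PRIMARY KEY(ID) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'mytopic',
    'scan.startup.mode' = 'latest-offset',
    'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
    'format' = 'ogg-json',
    -- Skip fields and rows that fail to parse instead of failing the job.
    'ogg-json.ignore-parse-errors' = 'true',
    -- Expect timestamps such as '2020-12-30T12:13:14.123' instead of the default 'SQL' form.
    'ogg-json.timestamp-format.standard' = 'ISO-8601',
    -- When serializing map data, replace a null key with the literal below.
    'ogg-json.map-null-key.mode' = 'LITERAL',
    'ogg-json.map-null-key.literal' = 'null'
);
```

The two map-null-key options only matter when map data is serialized, so they have no effect on a table that is only read from.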