数仓建表中定义嵌套JSON字段

数据样例

{
    "funcName":"1001",
    "flink":456,
    "mykey":1.23,
    "isSuccess":true,
    "mytimestamp":1646287937000,
    "mytype":1,
    "mydata":{
        "snapshots":[
            {
                "content_type":"application/x-gzip-compressed-jpeg",
                "myurl":"https://netease.com/snapshots"
            }
        ],
        "audio":[
            {
                "content_type":"audio/wav",
                "myurl":"https://netease.com/audio"
            }
        ]
    },
    "meta":{
        "video_type":"normal"
    }
}

建表示例

当我们通过DDL方式建表时,根据上面JSON串的格式,我们定义表结构如下:

CREATE TABLE kafka_source (
  funcName STRING,
  flink INT,
  mykey DOUBLE,
  isSuccess BOOLEAN,
  mytimestamp BIGINT,
  mytype INT,
  mydata ROW <snapshots ARRAY<ROW<content_type STRING,myurl STRING>>,audio ARRAY<ROW<content_type STRING,myurl STRING>>>,
  meta MAP<STRING,STRING>,
  ts AS TO_TIMESTAMP(FROM_UNIXTIME(mytimestamp/1000, 'yyyy-MM-dd HH:mm:ss')),
  WATERMARK FOR ts AS ts
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  .......
);

在通过数仓建表时,普通字段可以正常创建,对于嵌套类型的字段我们只需要选择对应的嵌套类型,再将嵌套内容添加到描述中即可。如下图所示: FAQ-数仓建表中定义嵌套JSON字段 - 图1

演示任务示例

set 'parse_nested_json.scan.startup.mode' = 'latest-offset';
set 'parse_nested_json.properties.group.id' = 'test';

CREATE TABLE print_table (
  funcName STRING,
  flink INT,
  mykey DOUBLE,
  isSuccess BOOLEAN,
  mytimestamp BIGINT,
  mytype STRING,
  scontent_type STRING,
  amyurl STRING,
  metavalue STRING
) WITH (
'connector' = 'print'
);

INSERT INTO print_table
SELECT 
funcName,
flink,
mykey,
isSuccess,
mytimestamp,
CASE mytype WHEN 1 THEN 'ONE' WHEN 2 THEN 'TWO' END AS mytype,
mydata.snapshots[1].content_type AS scontent_type,
mydata.audio[1].myurl AS amyurl,
meta['video_type'] AS metavalue
FROM demo_test.parse_nested_json;

输出结果 FAQ-数仓建表中定义嵌套JSON字段 - 图2


作者:邓崃翔