Mixed syntax

DDL-style and metahub-style table definitions can now be mixed in the same task, so a single task can be written in any of four combinations (the two mixed combinations are walked through in examples 1 and 2 below; the two pure ones are sketched after this list):

  • ddl source -> ddl sink
  • ddl source -> metahub sink
  • metahub source -> metahub sink
  • metahub source -> ddl sink
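
For reference, the two pure combinations can be written with the tables defined in the examples below; a minimal sketch, assuming the metahub tables' schemas match the columns selected in those examples:

-- ddl source -> ddl sink
INSERT INTO kafka_sink
SELECT userid, `action`, appver, ts_str FROM kafka_source;

-- metahub source -> metahub sink
INSERT INTO sloth_hdfs_krb.mem.hdfs_test1
SELECT userid, `action`, appver, ts_str, CAST('111' AS DOUBLE) AS logtime
FROM wt_kafka_test.source1;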

1. ddl source -> metahub sink

CREATE TABLE kafka_source (
    userid string,
    text string,
    ts TIMESTAMP(3),
    ts_str string,
    `action` string,
    appver string
) WITH (
  'connector' = 'kafka',
  'topic' = 'wt-kafka-magina',
  'scan.startup.mode' = 'latest-offset',
  'properties.bootstrap.servers' = 'sloth-test2.dg.163.org:9092',
  'format' = 'json',
  'properties.group.id' = 'test2'
);

-- SET 'kafka_source.topic' = 'wt-kafka-magina';

SET 'hdfs_test1.path' = 'hdfs://bdms-test/user/sloth/lyb_hdfs_metahub';
SET 'hdfs_test1.is.related.mammunt' = 'false';
SET 'hdfs_test1.format' = 'parquet';
SET 'hdfs_test1.connector' = 'filesystem';
SET 'hdfs_test1.krb.keytab' = 'sloth.keytab';
SET 'hdfs_test1.krb.principal' = 'sloth/dev@BDMS.163.COM';
SET 'hdfs_test1.krb.conf' = 'krb5.conf';
SET 'hdfs_test1.is.file.name.encrypt' = 'true';
SET 'hdfs_test1.auth.method' = 'kerberos';

INSERT INTO sloth_hdfs_krb.mem.hdfs_test1
SELECT v2.userid,
       v2.`action`,
       v2.appver,
       v2.ts_str,
       CAST('111' AS DOUBLE) AS logtime
FROM kafka_source v2;
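
The SET options are keyed by the table's leaf name: every option prefixed with hdfs_test1 configures the metahub sink sloth_hdfs_krb.mem.hdfs_test1. The commented-out SET near the top of the example suggests the same override mechanism applies to DDL tables; a minimal sketch under that assumption (the topic and group values here are hypothetical):

-- Assumption: SET '<table>.<option>' also overrides a DDL table's WITH options.
SET 'kafka_source.topic' = 'wt-kafka-magina-2';
SET 'kafka_source.properties.group.id' = 'test3';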

2. metahub source -> ddl sink

SET 'source1.topic' = 'lyb-source';
SET 'source1.value.format' = 'json';

CREATE TABLE kafka_sink (
    userid string,
    `action` string,
    appver string,
    ts_str string
) WITH (
  'connector' = 'kafka',
  'topic' = 'lyb-sink',
  'properties.bootstrap.servers' = 'sloth-test2.dg.163.org:9092',
  'format' = 'json'
);

INSERT INTO kafka_sink
SELECT v2.userid,
       v2.`action`,
       v2.appver,
       v2.ts_str
FROM wt_kafka_test.source1 v2;
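
As in example 1, the SET statements target the metahub table by its leaf name, so the source1 prefix configures wt_kafka_test.source1. A minimal sketch of one more override, assuming any Kafka connector option key can be set the same way (scan.startup.mode here is an assumption, not taken from the example):

-- Assumption: metahub SET overrides accept arbitrary connector option keys.
SET 'source1.scan.startup.mode' = 'latest-offset';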