混编
更新时间: 2023-04-24 20:24:05
阅读 127
混编
现已支持ddl写法与metahub写法混合在同一个任务里,因此同样一个任务可以有四种写法:
- ddl source -> ddl sink
- ddl source -> metahub sink
- metahub source -> metahub sink
- metahub source -> ddl sink
1. ddl source -> metahub sink
CREATE TABLE kafka_source (
userid string,
text string,
ts TIMESTAMP(3),
ts_str string,
`action` string,
appver string
) WITH (
'connector' = 'kafka',
'topic' = 'wt-kafka-magina',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'sloth-test2.dg.163.org:9092',
'format' = 'json',
'properties.group.id' = 'test2'
);
-- set 'kafka_source.topic' = 'wt-kafka-magina';
SET 'hdfs_test1.path' = 'hdfs://bdms-test/user/sloth/lyb_hdfs_metahub';
SET 'hdfs_test1.is.related.mammunt' = 'false';
SET 'hdfs_test1.format' = 'parquet';
set 'hdfs_test1.connector' = 'filesystem';
SET 'hdfs_test1.krb.keytab' = 'sloth.keytab';
SET 'hdfs_test1.krb.principal' = 'sloth/dev@BDMS.163.COM';
set 'hdfs_test1.krb.conf' = 'krb5.conf';
set 'hdfs_test1.is.file.name.encrypt' = 'true';
set 'hdfs_test1.auth.method' = 'kerberos';
INSERT INTO sloth_hdfs_krb.mem.hdfs_test1
SELECT v2.userid,
v2.action,
v2.appver,
v2.ts_str,
cast('111' as double) as logtime
FROM kafka_source v2;
2. metahub source -> ddl sink
set 'source1.topic'='lyb-source';
set 'source1.value.format' = 'json';
CREATE TABLE kafka_sink (
userid string,
text string,
ts TIMESTAMP(3),
ts_str string,
`action` string,
appver string
) WITH (
'connector' = 'kafka',
'topic' = 'lyb-sink',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'sloth-test2.dg.163.org:9092',
'format' = 'json',
'properties.group.id' = 'test2'
);
INSERT INTO kafka_sink
SELECT v2.userid,
v2.action,
v2.appver,
v2.ts_str
FROM wt_kafka_test.source1 v2;
文档反馈
以上内容对您是否有帮助?