Sink 表
更新时间: 2023-04-24 20:25:16
阅读 155
Sink 表
支持的目标端
mysql、kafka、redis、hbase、kudu、hdfs、es等
使用方式
目标表不需要在数仓定义表 schema 信息,只需定义数据源即可
insert into [catalog].[database].[table] select * from source;
数据源类型 | Catalog | database | table |
---|---|---|---|
Kafka | 数据源登记的名称 | mem | 任意 |
MySQL | 数据源登记的名称 | 数据库中真实的 database | 数据库中真实的表名 |
Kudu | 数据源登记的名称 | kudu 表名规范:impala::[database].[tableName] | kudu 表名规范:impala::[database].[tableName] |
HBase | 数据源登记的名称 | HBase 的 namespace | hbase中真实的表名 |
Redis | 数据源登记的名称 | mem | 操作 redis 的命令:RPUSH、LPUSH、SADD、SET、PFADD、PUBLISH、ZADD、ZREM、HSET、INCRBY、DECRBY |
HDFS | 数据源登记的名称 | mem | 任意 |
ES | 数据源登记的名称 | mem | 任意 |
案例
SQL 案例1: source-kafka-sink-kafka
set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';
set 'tempTable.connector.topic'='lz_common_sink';
set 'tempTable.format.type' = 'json';
create view vi as select * from music_useraction_queue_3;
-- 因为tempTable这张流表没有登记到元数据中心,作为一张sink表,可以将database指定为mem,在Set中指定kafka topic。
insert into wt_kafka_test.mem.tempTable
select * from vi;
SQL 案例2: source-kafka-sink-mysql
set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';
-- mysql 有database的概念,所以sloth_wangtao便是mysql中真是的database。mysql表引用规范:[mysqlCatalog].[mysqlDatabase].[mysqlTableName]
INSERT INTO sloth_mysql_test.sloth_wangtao.sink_ua_os_copy
SELECT v2.os, count(0) as os_count
FROM music_useraction_queue_3 as v2
group by v2.os;
SQL 案例3: source-kafka-sink-kudu
-- music_useraction_queue_3表的配置信息
set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';
-- kudu表的配置信息
SET 'sloth_kudu_ua.primary.keys' = 'v_string';
SET 'sloth_kudu_ua.update-mode' = 'upsert';
CREATE VIEW v2 AS
SELECT os, userid, appver,logtime
from music_useraction_queue_3;
-- 因为kudu没有database的概念,所以在表名中区分database。kudu表名规范:impala::[database].[tableName]
INSERT INTO sloth_kudu_yxx.sloth_kudu_db.sloth_kudu_ua
(v_string, v_long )
SELECT v2.userid as v_string, cast(v2.userid as bigint) as v_long FROM v2;
SQL 案例4: source-kafka-sink-hbase
set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';
-- 如果不指定schema,最终hbase column famliy里的字段将会是exp$0,exp$1...
set 'sloth_ua.format.schema' ='ROW<user_id VARCHAR, base_info ROW<deviceid VARCHAR, os varchar, logtime bigint>>';
set 'sloth_ua.connector.version' = '2.2.1';
set 'sloth_ua.connector.table-name' = 'sloth_ua';
-- hbase table 引用的规范:[hbasecatalogname].[namespace].[table]
INSERT INTO sloth_hbase_v_two_test.`default`.sloth_ua
SELECT userid as user_id, ROW(deviceid, os, logtime) as base_info FROM music_useraction_queue_3 ;
SQL 案例5: source-kafka-sink-redis
set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';
-- 使用redis command作为表名,表名:SET。
-- 设置redis db index,默认0.
set 'SET.db.index' ='5';
set 'SET.primary.key' = 'userid';
-- 单位秒
set 'SET.ttl' = '60';
INSERT INTO sloth_redis_test.mem.`SET`
SELECT userid, deviceid FROM music_useraction_queue_3;
SQL案例6: source-kafka-sink-es
set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connector.startup-mode' = 'latest-offset';
SET 'sloth_es_ua.connector.index' = 'sloth_es_ua_dev';
INSERT INTO sloth_es_seven_test.mem.sloth_es_ua
SELECT * FROM music_useraction_queue_3;
文档反馈
以上内容对您是否有帮助?