Sink Tables

Supported Sink Targets

MySQL, Kafka, Redis, HBase, Kudu, HDFS, ES, etc.

Usage

A sink table does not need its schema registered in the data warehouse; only the data source needs to be defined.

insert into [catalog].[database].[table] select * from source;

| Data source type | Catalog | database | table |
| --- | --- | --- | --- |
| Kafka | Name under which the data source is registered | mem | Arbitrary |
| MySQL | Name under which the data source is registered | The real database in MySQL | The real table name in that database |
| Kudu | Name under which the data source is registered | The [database] part of the Kudu table name (naming convention: impala::[database].[tableName]) | The [tableName] part of the Kudu table name (naming convention: impala::[database].[tableName]) |
| HBase | Name under which the data source is registered | The HBase namespace | The real table name in HBase |
| Redis | Name under which the data source is registered | mem | The Redis command to execute: RPUSH, LPUSH, SADD, SET, PFADD, PUBLISH, ZADD, ZREM, HSET, INCRBY, DECRBY |
| HDFS | Name under which the data source is registered | mem | Arbitrary |
| ES | Name under which the data source is registered | mem | Arbitrary |
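
For Kafka, HDFS, and ES sinks the database segment is fixed to mem and the table name is arbitrary, so only the three-part reference matters. Below is a minimal sketch for an HDFS sink; hdfs_test is a hypothetical registered HDFS data source, connector-specific SET options are omitted because they are not covered in this section, and music_useraction_queue_3 is the Kafka source table used in the examples below.

-- Hypothetical HDFS sink reference: hdfs_test is an assumed registered HDFS data source;
-- per the table above, database is fixed to mem and the table name is arbitrary
INSERT INTO hdfs_test.mem.any_sink_table
 SELECT * FROM music_useraction_queue_3;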

Examples

SQL Example 1: source-kafka-sink-kafka

-- Kafka source table configuration: consumer group and startup offset
set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';

-- Kafka sink table configuration: target topic and serialization format
set 'tempTable.connector.topic' = 'lz_common_sink';
set 'tempTable.format.type' = 'json';

create view vi as select * from music_useraction_queue_3;

-- Because the stream table tempTable is not registered in the metadata center, it can be used as a sink table by setting its database to mem and specifying the Kafka topic via SET

insert into wt_kafka_test.mem.tempTable
select * from vi;

SQL Example 2: source-kafka-sink-mysql

set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';

-- MySQL has the concept of a database, so sloth_wangtao is the real database in MySQL. MySQL table reference convention: [mysqlCatalog].[mysqlDatabase].[mysqlTableName]

INSERT INTO sloth_mysql_test.sloth_wangtao.sink_ua_os_copy
 SELECT v2.os, count(0) as os_count
 FROM music_useraction_queue_3 as v2
 group by v2.os;

SQL Example 3: source-kafka-sink-kudu

-- Configuration for the music_useraction_queue_3 source table
set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';
-- Configuration for the Kudu sink table: primary key and update mode
SET 'sloth_kudu_ua.primary.keys' = 'v_string';
SET 'sloth_kudu_ua.update-mode' = 'upsert';

CREATE VIEW v2 AS
    SELECT os, userid, appver, logtime
    from music_useraction_queue_3;

-- Because Kudu has no concept of a database, the database is encoded in the table name. Kudu table naming convention: impala::[database].[tableName]
INSERT INTO sloth_kudu_yxx.sloth_kudu_db.sloth_kudu_ua
 (v_string, v_long)
 SELECT v2.userid as v_string, cast(v2.userid as bigint) as v_long FROM v2;

SQL Example 4: source-kafka-sink-hbase

set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';

-- If no schema is specified, the fields in the HBase column family end up named exp$0, exp$1, ...
set 'sloth_ua.format.schema' ='ROW<user_id VARCHAR, base_info ROW<deviceid VARCHAR, os varchar, logtime bigint>>';
set 'sloth_ua.connector.version' = '2.2.1';
set 'sloth_ua.connector.table-name' = 'sloth_ua';

-- HBase table reference convention: [hbaseCatalogName].[namespace].[table]
INSERT INTO sloth_hbase_v_two_test.`default`.sloth_ua
    SELECT userid as user_id, ROW(deviceid, os, logtime) as base_info FROM music_useraction_queue_3;

SQL Example 5: source-kafka-sink-redis

set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';

-- Use the Redis command as the table name; here the table name is SET
-- Redis db index; defaults to 0
set 'SET.db.index' = '5';
set 'SET.primary.key' = 'userid';
-- TTL in seconds
set 'SET.ttl' = '60';

INSERT INTO sloth_redis_test.mem.`SET`
 SELECT userid, deviceid FROM music_useraction_queue_3;
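
The other Redis commands listed in the table above should follow the same pattern, with the command swapped in as the table name. Below is a minimal sketch using RPUSH, by analogy with the SET example; the RPUSH-prefixed SET key is an assumption, not a verified platform option.

-- By analogy with the SET example above; the RPUSH.db.index key is assumed, not verified
set 'RPUSH.db.index' = '5';
INSERT INTO sloth_redis_test.mem.`RPUSH`
 SELECT userid, deviceid FROM music_useraction_queue_3;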

SQL Example 6: source-kafka-sink-es

set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';

-- ES sink table configuration: target index
SET 'sloth_es_ua.connector.index' = 'sloth_es_ua_dev';

INSERT INTO sloth_es_seven_test.mem.sloth_es_ua
 SELECT * FROM music_useraction_queue_3;