SQL 语法

本文档大致介绍下SQL相关的一些语法知识,主要着重我们和官方不同的一些东西。

Flink 使用 Apache Calcite 解析 SQL ,它支持标准的 ANSI SQL; 本章节主要关注在我们一些常用的语法以及我们自定义的一些语法上。

另外,我们也兼容了原有的 Flink 1.10 的 DDL 语法,但不支持两者混编。我们通过是否包含 create table 语句来区分 DDL 与元数据任务。

本章节涉及的内容包括 Set 语句使用、关键词转义、表资源定位符、创建临时表、创建 UDF、写入数据等。

Set 语句使用

元数据任务的 Source、Sink、Dim 表已经在数仓进行登记,包含了Table Schema 与 Table Properties。 但是仍有部分 Table Properties 需要在任务中定义。可以通过 Set 语法定义配置信息。

语法

SET '[KEY]' = '[VALUE]';

示例:设置状态超时时间

SET 'sloth.sql.state.idleTime' = '6h';

设置表的属性

SET '[TABLE_NAME].[PROP_KEY]' = '[PROP_VALUE]';

示例:设置 source 表 kafka 表的 group.id

SET 'userlog.connections.group.id' = 'userlog_consumergroup';

特别注意

同一个任务里不要使用两个同名的表,为了实现的简单,SET 语法里是按表名来确认三元组的。如果两张同名的表会有无法预知的问题。

关键词转义

FLINK SQL 使用标准的 ANSI SQL 语法,本身还有大量的关键字,关键字列表参考官方文档 关键词转义 中的 Reserved Keywords 章节,当我们使用字段名或者表名和关键字冲突时需要进行转义,转义方法是在在关键字两边添加”`”符号,如 SQL:

SELECT
  logtime AS `timestamp`,
  'music-mall-visual' AS appName,
  'albumpageview' AS dataSource,
  MAP['albumId', props ['id']] as `data`
FROM
  ods_queue_8

当中的 timestamp 和 data 因为和关键字冲突,所以都进行了转义操作。

表资源定位符

和一般数据库采用 [DB].[TABLE] 来定位一张表不同,我们采用 [CATALOG].[DB].[TABLE] 来定位一张表,Catalog 用来区分不同的数据源,其中 Catalog 是可选项,当用户不选时会使用默认的 Catalog。目前 Easystream 系统默认的 Catalog 为流表的 Catalog (最新版的猛犸的元数据中心为每个产品生成了一个虚拟的 catalog),在数仓模块的元数据登记中登记的流表都属于这个 Catalog。

例1:读取 ipaly_ods 库中的流表 ods_iplay_ua_log

SELECT * FROM ipaly_ods.ods_iplay_ua_log;

等同于

SELECT * FROM [defaultcatalog].ipaly_ods.ods_iplay_ua_log;

例2:写入 kudu 表

SET 'ads_ab_user_group_detail.connections.group.id' = 'ab_user_group';
INSERT INTO `kudu_online`.`kudu_internal`.`ab_user_group_hour_rt`
SELECT
  dt,
  os,
  ab_id,
  `hour`,
  group_id,
  group_type,
  userid
FROM ads_abtest.ads_ab_user_group_detail
;

创建临时表

语法:

CREATE VIEW [TABLE NAME] AS [QUERY STATEMENT];

示例:

CREATE VIEW play_view AS SELECT userid, action, logtime FROM user_action WHERE action = 'play';

创建 UDF

语法:

CREATE FUNCTION [FUNCTION_NAME] AS '[FUNCTION_CLASS]';

示例:

CREATE FUNCTION build_metric AS 'com.netease.da.abtest.rt.udf.MetricTableFunc';

写入数据

语法:

INSERT INTO [CATALOG].[DB].[TABLE] [QUERY STATEMENT]

示例:写入结果数据到 kudu

SET 'ads_ab_user_group_detail.connections.group.id' = 'ab_user_group';
INSERT INTO `kudu_online`.`kudu_internal`.`ab_user_group_hour_rt`
SELECT
  dt,
  os,
  ab_id,
  `hour`,
  group_id,
  group_type,
  userid
FROM ads_abtest.ads_ab_user_group_detail
;