在开发过程中,我们需要通过Flink SQL代码将实际存在的物理表,以Flink Table的方式进行映射来获得流表,进一步对流表进行逻辑加工和计算最终得到结果,再输出到物理表使数据落地。

流表管理操作 - 图1

为提高任务开发效率,避免同一张物理表在不同任务中需要反复映射,EasyStream 实时计算通过构建 “逻辑流库-逻辑流表”的方式,对 Flink Table 进行统一管理,实现便捷快速的开发体验。

操作步骤

EasyStream 实时计算通过 [数据源.]数据库.表 三元组的方式识别并定位到具体的物理表。流表本身作为物理表的映射,并不归属于任何数据库下,因此我们需要为流表创建一个逻辑的数据库,用于管理并使用流表。

1. 创建逻辑库

  1. 使用实时管理员及以上角色登录平台,选择 实时开发
  2. 选择左侧边栏 实时数仓 ,打开数仓选择页面。
  3. 单击 流表管理操作 - 图2 流表管理,进入流表管理页面。
  4. 单击 流表管理操作 - 图3 创建库 , 进行逻辑库创建。

流表管理操作 - 图4

说明: 库名项目内唯一,建议根据 业务主题域实时数仓分层 进行库名定义。

2. 创建流表

  1. 在流表管理页面,单击目标逻辑库,表单页面左上角显示对应库名表示切换成功。
  2. 单击 创建表 按键,进行表创建。
  3. 单击 保存 ,完成流表注册。

文档以Kafka类型流表创建作为样例进行说明。

表单参数 说明
数据库 当前流表归属的逻辑库,默认置灰不可修改。用户可通过切换逻辑库重新创建表来更改数据库的值。
表名 流表名称,任务开发时将直接使用此名称。
描述 流表描述,通常用于描述流表的业务含义。
数据源类型 支持选择不同类型的数据源,所选类型会限制下方 数据源 选项内容。
数据源 选择对应类型下已登记且当前项目有权限的数据源名称,平台将自动拉取对应链接信息。
Topic 手动输入需要映射的Kafka Topic名称,存在多个数据结构相同的Topic时可通过 ; 进行分隔。
说明: 通常多个Topic注册一个流表使用在流表场景。
序列化方式 Kafka Topic内数据的编译方式,支持Json、csv、avro、string、ua、ndc、canal-json、debezium-json、maxwell-json 等方式。
说明:流表序列化方式需要与Topic数据实际序列化方式保持一致。
字段信息获取方式 支持 DDL解析数据解析从数据库获取 三种方式。
所有流表数据源类型均支持自动解析的方式。
Kafka、RocketMQ、Pulsar数据源类型且序列化方式为 json、canal-json、debezium-json、maxwell-json的数据支持数据解析的方式。
MySQL、Oracle、Elasticsearch、HBase、Kudu 数据源类型支持从数据库获取的方式。
字段信息 支持用户手动设置字段名与类型,当字段类型为array、map、row类型时支持设置嵌套字段,
说明:目前嵌套层数最多为5层,具体映射规则请参考 流表序列化及映射规则章节。
配置 支持自定义配置Flink高级参数,由于不同引擎版本参数不同,在使用过程中请注意对应引擎版本。
DDL解析说明

在创建流表时,支持用户输入 Flink Table 的 DDL 语句,平台通过语句解析获取字段和配置信息,来快速创建流表。 在流表登记的表单中已包含 DDL 中的部分参数,且部分特别参数如 group id 不适合在流表登记时统一配置,故下列参数在 DDL 解析时不会被解析到配置列表中,如需登记请手动配置。

DDL解析忽略的配置项 含义
connector Flink 所连接的外部系统的类型。
properties.bootstrap.servers Kafka 消费者/生产者的启动服务器列表。
properties.group.id Kafka 消费者群组的唯一标识符。
topic Kafka 消息队列的主题名称。
format 外部系统的数据格式。
name.server.address 消息队列服务器的地址。
group 消费者群组的唯一标识符。
service-url 服务 URL。
admin-url 管理 URL。
url Flink 所连接的外部系统的 URL。
table-name 数据库中的表名。
username 连接外部系统时使用的用户名。
password 连接外部系统时使用的密码。
hosts 集群中节点的主机名或 IP 地址列表。
index Elasticsearch 中的索引名称。
zookeeper.quorum HBase 集群的 ZooKeeper 地址。
zookeeper.znode.parent HBase 在 ZooKeeper 中的父节点路径。
masters 集群的主节点列表。
primary.keys 表的主键。
数据解析说明

仅支持自动解析序列化方式为 json、canal-json、debezium-json、maxwell-json、ogg-json 的数据,具体说明规则如下:

  1. canal-jsonmaxwell-json 数据仅解析 data 字段中的内容。

    {
    "data":[
        {
            "id":22,
            "name":"opt",
            "age":123,
            "address":"北京市"
        }
    ],
    "type":"INSERT"
    }

    以上样例数据, 解析结果为4个字段 id、name、age、addressdata、type字段不展示。

  2. debezium-json 数据根据 op 字段不同解析不同字段中的内容。

    1. op 字段为“c” 时,解析 after 字段中的内容,如样例数据:

      {
      "before":null,
      "after":{
       "id":22,
       "name":"opt1",
       "age":null
      },
      "op":"c"
      }

      解析结果为3个字段 id、name、agebefore、after、op 字段均不展示。

    2. op 字段为“d”时,解析 before 字段中的内容,如样例数据:

      {
      "before":{
       "id":22,
       "name":"opt1",
       "age":null
      },
      "after":null,
      "op":"d"
      }

      解析结果为3个字段id、name、agebefore、after、op 字段均不展示。

    3. op 字段为“u”时,解析 after 字段中的内容,如样例数据:

      {
      "before":{
       "id":22,
       "name":"opt1",
       "age":null
      },
      "after":{
       "id":22,
       "name":"opt1",
       "age":11
      },
      "op":"u" 
      "ts_ms": 1589362330904,
      "transaction": null
      }

      解析结果为3个字段id、name、agebefore、after、op、ts_ms、transaction 字段均不展示。

  3. ogg-json 数据根据 op_type 字段不同解析不同字段中的内容。

    1. op_type 字段为“I”时,解析 after 字段中的内容,如样例数据:

      {
      "after":{
       "id": 111,
       "name": "op1",
       "weight": 5.15
      },
      "op_type": "I"
      }

      解析结果为3个字段 id、name、weightafter、op_type 字段均不展示。

    2. op_type 字段为“D”时,解析 before 字段中的内容,如样例数据:

      {
      "before":{
       "id": 111,
       "name": "op1",
       "weight": 5.18
      },
      "op_type": "D"
      }

      解析结果为3个字段 id、name、weightbefore、op_type 字段均不展示。

    3. op_type 字段为“U”时,解析 after 字段中的内容,如样例数据:

{
    "before":{
        "id": 111,
        "name": "op1",
        "weight": 5.18
    },
    "after":{
        "id": 111,
        "name": "op1",
        "weight": 5.15
    },
    "op_type": "U"
}

解析结果为3个字段 id、name、weightbefore、after、op_type 字段均不展示。

数据解析时字段的映射规则如下:

原字段类型 解析后的字段类型
string string
int、bigint、tinyint、smallint bigint
float、double、decimal decimal(默认精度标度为(10, 0))
boolean boolean
row、map row
array array
time、timestamp、date、byte、varbinary 无法解析 统一定义成string
从数据库获取方式说明

仅支持MySQL、Oracle、Elasticsearch、HBase、Kudu 数据源类型使用此方式获取流表字段信息。此方式需用户选择流表对应的物理表的源、库、表等信息后方可使用,点击此功能按钮后,平台将连接数据源获取对应表的元数据信息(包含字段名称、字段类型、是否为主键),并填充至流表字段信息列表中。获取到的字段类型与流表字段类型的映射关系详见各数据源类型流表介绍页面。

流表支持的字段类型
字段类型 备注
string
int
bigint
tinyint
smallint
float
double
decimal 支持配置精度和标度,默认值为(10,0)
boolean
row 部分数据源类型流表不支持此字段类型,详见各数据源类型流表说明
map value不支持row、map、array、varbinary
部分数据源类型流表不支持此字段类型,详见各数据源类型流表说明
array 部分数据源类型流表不支持此字段类型,详见各数据源类型流表说明
time 默认精度为(0),由于社区实现问题,暂不支持变更
timestamp 支持配置精度,默认值为(3)
date
varbinary 部分数据源类型流表不支持此字段类型,详见各数据源类型流表说明
metadata 支持输入metadata字段配置
仅 Kafka 数据源类型流表支持配置此字段
计算列 支持输入计算列配置
创建公共流表

创建或编辑流表时,流表可用范围选择公共流表,之后按需配置流表公开范围。

注意:如选择指定项目-集群公开,当前所在的项目-集群必须在指定公开范围内,否则流表将无法再次编辑。

流表管理操作 - 图5

  • 功能使用注意事项:
    • 公共流表使用时需按三元组方式使用。
    • 公共流表的使用权限通过数据源使用权限控制,即如需使用已公开的公共流表,则该项目需具备该流表实际对应的数据源的使用权限。
    • 公共流表的编辑权限归属于创建流表时所在的项目-集群的项目负责人、管理员、实时管理员、实时开发、实时运维。其他项目-集群即使在流表公开范围内也无法对流表进行编辑、删除操作。
复制流表时订阅源端表变更

支持用户复制流表时选择订阅源端表变更,当被复制的流表字段信息或配置信息有更新时,在订阅变更的表的详情页有更新提示。

  • 功能使用注意事项:
    • 仅可在通过复制按钮进入新建流表流程时订阅源端表变更,直接新建流表无此功能。
    • 用户复制流表时如未订阅变更或后续取消订阅,不可重新恢复订阅。
  • 功能详细使用步骤:
    • 订阅变更:在流表管理列表中找到需要复制的流表,点击复制按钮进入新建流表页面,在页面底部勾选订阅源端表变更。
      流表管理操作 - 图6
    • 查看变更:在复制后的流表详情页里,复制源端表字段将在源端表有变更时展示有更新icon。
      流表管理操作 - 图7
    • 接收变更:点击有更新icon,选择接收更新,进入流表编辑页面查看变更后的字段信息和配置信息,可基于变更后的内容修改后保存。保存后有更新提示消失。
    • 取消订阅:在复制流表时可以选择取消订阅源端表变更的勾选。已订阅变更的在对应流表详情页面,复制源端表字段旁可取消订阅。
      流表管理操作 - 图8

3. 使用流表

  1. 进入 实时开发 页面,选择任意SQL任务。
  2. 点击开发页面左侧流表管理操作 - 图9 实时数仓,选择数据源类型为 流表
  3. 选择目标库与目标表,可拖动表名称至代码区域,或 直接使用 数据库.表 的方式使用指定流表。

    流表管理操作 - 图10

  4. 公共流表使用时需按 [catalog].[db].[table] 三元组方式使用

流表支持

目前,实时计算平台支持将KafkaPulsarRocketMQMySQLOracleHBASEElasticSearchKudu 8种数据源类型登记为流表。

数据源具体使用及配置项使用说明请参阅SQL开发文档