在开发过程中,我们需要通过Flink SQL代码将实际存在的物理表,以Flink Table的方式进行映射来获得流表,进一步对流表进行逻辑加工和计算最终得到结果,再输出到物理表使数据落地。

流表管理 - 图1

为提高任务开发效率,避免同一张物理表在不同任务中需要反复映射,EasyStream 实时计算通过构建 “逻辑流库-逻辑流表”的方式,对 Flink Table 进行统一管理,实现便捷快速的开发体验。

操作步骤

EasyStream 实时计算通过 [数据源.]数据库.表 三元组的方式识别并定位到具体的物理表。流表本身作为物理表的映射,并不归属于任何数据库下,因此我们需要为流表创建一个逻辑的数据库,用于管理并使用流表。

1. 创建逻辑库

  1. 使用实时管理员及以上角色登录平台,选择 实时开发
  2. 选择左侧边栏 实时数仓 ,打开数仓选择页面。
  3. 单击 流表管理 - 图2 流表管理,进入流表管理页面。
  4. 单击 流表管理 - 图3 创建库 , 进行逻辑库创建。

流表管理 - 图4

说明: 库名项目内唯一,建议根据 业务主题域实时数仓分层 进行库名定义。

2. 创建流表

  1. 在流表管理页面,单击目标逻辑库,表单页面左上角显示对应库名表示切换成功。
  2. 单击 创建表 按键,进行表创建。
  3. 单击 保存 ,完成流表注册。

文档以Kafka类型流表创建作为样例进行说明。

表单参数 说明
数据库 当前流表归属的逻辑库,默认置灰不可修改。用户可通过切换逻辑库重新创建表来更改数据库的值。
表名 流表名称,任务开发时将直接使用此名称。
描述 流表描述,通常用于描述流表的业务含义。
数据源类型 支持选择不同类型的数据源,所选类型会限制下方 数据源 选项内容。
数据源 选择对应类型下已登记且当前项目有权限的数据源名称,平台将自动拉取对应链接信息。
Topic 手动输入需要映射的Kafka Topic名称,存在多个数据结构相同的Topic时可通过 ; 进行分隔。
说明: 通常多个Topic注册一个流表使用在流表场景。
序列化方式 Kafka Topic内数据的编译方式,支持Json、csv、avro、string、ua、ndc、canal-json、debezium-json、maxwell-json 等方式。
说明:流表序列化方式需要与Topic数据实际序列化方式保持一致。
字段信息获取方式 支持 DDL解析数据解析从数据库获取 三种方式。
所有流表数据源类型均支持自动解析的方式。
Kafka、RocketMQ、Pulsar数据源类型且序列化方式为 json、canal-json、debezium-json、maxwell-json的数据支持数据解析的方式。
MySQL、Oracle、Elasticsearch、HBase、Kudu 数据源类型支持从数据库获取的方式。
字段信息 支持用户手动设置字段名与类型,当字段类型为array、map、row类型时支持设置嵌套字段,
说明:目前嵌套层数最多为5层,具体映射规则请参考 流表映射规则
配置 支持自定义配置Flink高级参数,由于不同引擎版本参数不同,在使用过程中请注意对应引擎版本。
DDL解析说明

在创建流表时,支持用户输入 Flink Table 的 DDL 语句,平台通过语句解析获取字段和配置信息,来快速创建流表。 在流表登记的表单中已包含 DDL 中的部分参数,且部分特别参数如 group id 不适合在流表登记时统一配置,故下列参数在 DDL 解析时不会被解析到配置列表中,如需登记请手动配置。

DDL解析忽略的配置项 含义
connector Flink 所连接的外部系统的类型。
properties.bootstrap.servers Kafka 消费者/生产者的启动服务器列表。
properties.group.id Kafka 消费者群组的唯一标识符。
topic Kafka 消息队列的主题名称。
format 外部系统的数据格式。
name.server.address 消息队列服务器的地址。
group 消费者群组的唯一标识符。
service-url 服务 URL。
admin-url 管理 URL。
url Flink 所连接的外部系统的 URL。
table-name 数据库中的表名。
username 连接外部系统时使用的用户名。
password 连接外部系统时使用的密码。
hosts 集群中节点的主机名或 IP 地址列表。
index Elasticsearch 中的索引名称。
zookeeper.quorum HBase 集群的 ZooKeeper 地址。
zookeeper.znode.parent HBase 在 ZooKeeper 中的父节点路径。
masters 集群的主节点列表。
primary.keys 表的主键。
数据解析说明

仅支持自动解析序列化方式为 json、canal-json、debezium-json、maxwell-json 的数据,具体说明规则如下:

  1. canal-jsonmaxwell-json 数据仅解析 data 字段中的内容。

    {
    "data":[
        {
            "id":22,
            "name":"opt",
            "age":123,
            "address":"北京市"
        }
    ],
    "type":"INSERT"
    }

    以上样例数据, 解析结果为4个字段 id、name、age、addressdata、type字段不展示。

  2. debezium-json 数据根据 op 字段不同解析不同字段中的内容。

    1. op 字段为“c” 时,解析 after 字段中的内容,如样例数据:

      {
      "before":null,
      "after":{
       "id":22,
       "name":"opt1",
       "age":null
      },
      "op":"c"
      }

      解析结果为3个字段 id、name、agebefore、after、op 字段均不展示。

    2. op 字段为“d”时,解析 before 字段中的内容,如样例数据:

      {
      "before":{
       "id":22,
       "name":"opt1",
       "age":null
      },
      "after":null,
      "op":"d"
      }

      解析结果为3个字段id、name、agebefore、after、op 字段均不展示。

    3. op 字段为“u”时,解析 before 字段中的内容,如样例数据:

      {
      "before":{
       "id":22,
       "name":"opt1",
       "age":null
      },
      "after":{
       "id":22,
       "name":"opt1",
       "age":11
      },
      "op":"u" 
      "ts_ms": 1589362330904,
      "transaction": null
      }

      解析结果为3个字段id、name、agebefore、after、op、ts_ms、transaction 字段均不展示。

数据解析时字段的映射规则如下:

原字段类型 解析后的字段类型
string string
int、bigint、tinyint、smallint bigint
float、double、decimal decimal
row、map row
array array
time、timestamp、date、byte、varbinary 无法解析 统一定义成string
从数据库获取方式说明

仅支持MySQL、Oracle、Elasticsearch、HBase、Kudu 数据源类型使用此方式获取流表字段信息。此方式需用户选择流表对应的物理表的源、库、表等信息后方可使用,点击此功能按钮后,平台将连接数据源获取对应表的元数据信息(包含字段名称、字段类型、是否为主键),并填充至流表字段信息列表中。获取到的字段类型与流表字段类型的映射关系详见各数据源类型流表介绍页面。

3. 使用流表

  1. 进入 实时开发 页面,选择任意SQL任务。
  2. 点击开发页面左侧流表管理 - 图5 实时数仓,选择数据源类型为 流表
  3. 选择目标库与目标表,可拖动表名称至代码区域,或 直接使用 数据库.表 的方式使用指定流表。

    流表管理 - 图6

流表支持

目前,实时计算平台支持将KafkaPulsarRocketMQMySQLOracleHBASEElasticSearchKudu 8种数据源类型登记为流表。

数据源具体使用及配置项使用说明请参阅SQL开发文档