流表管理
在开发过程中,我们需要通过Flink SQL代码将实际存在的物理表,以Flink Table的方式进行映射来获得流表,进一步对流表进行逻辑加工和计算最终得到结果,再输出到物理表使数据落地。
为提高任务开发效率,避免同一张物理表在不同任务中需要反复映射,EasyStream 实时计算通过构建 “逻辑流库-逻辑流表”的方式,对 Flink Table 进行统一管理,实现便捷快速的开发体验。
操作步骤
EasyStream 实时计算通过 [数据源.]数据库.表
三元组的方式识别并定位到具体的物理表。流表本身作为物理表的映射,并不归属于任何数据库下,因此我们需要为流表创建一个逻辑的数据库,用于管理并使用流表。
1. 创建逻辑库
- 使用实时管理员及以上角色登录平台,选择 实时开发 。
- 选择左侧边栏 实时数仓 ,打开数仓选择页面。
- 单击 流表管理,进入流表管理页面。
- 单击 创建库 , 进行逻辑库创建。
说明: 库名项目内唯一,建议根据 业务主题域 或 实时数仓分层 进行库名定义。 |
2. 创建流表
- 在流表管理页面,单击目标逻辑库,表单页面左上角显示对应库名表示切换成功。
- 单击 创建表 按键,进行表创建。
- 单击 保存 ,完成流表注册。
文档以Kafka类型流表创建作为样例进行说明。
表单参数 | 说明 | |
---|---|---|
数据库 | 当前流表归属的逻辑库,默认置灰不可修改。用户可通过切换逻辑库重新创建表来更改数据库的值。 | |
表名 | 流表名称,任务开发时将直接使用此名称。 | |
描述 | 流表描述,通常用于描述流表的业务含义。 | |
数据源类型 | 支持选择不同类型的数据源,所选类型会限制下方 数据源 选项内容。 | |
数据源 | 选择对应类型下已登记且当前项目有权限的数据源名称,平台将自动拉取对应链接信息。 | |
Topic | 手动输入需要映射的Kafka Topic名称,存在多个数据结构相同的Topic时可通过 ; 进行分隔。
|
|
序列化方式 | Kafka Topic内数据的编译方式,支持Json、csv、avro、string、ua、ndc、canal-json、debezium-json、maxwell-json 等方式。
|
|
字段信息获取方式 | 支持 DDL解析 和 自动解析 两种方式,仅有Kafka类型且序列化方式为 json、canal-json、debezium-json、maxwell-json的数据支持自动解析。 | |
字段信息 | 支持用户手动设置字段名与类型,当字段类型为array、map、row类型时支持设置嵌套字段,
|
|
配置 | 支持自定义配置Flink高级参数,由于不同引擎版本参数不同,在使用过程中请注意对应引擎版本。 |
DDL解析说明
在创建流表时,支持用户输入 Flink Table 的 DDL 语句,平台通过语句解析获取字段和配置信息,来快速创建流表。 在流表登记的表单中已包含 DDL 中的部分参数,且部分特别参数如 group id 不适合在流表登记时统一配置,故下列参数在 DDL 解析时不会被解析到配置列表中,如需登记请手动配置。
DDL解析忽略的配置项 | 含义 |
---|---|
connector | Flink 所连接的外部系统的类型。 |
properties.bootstrap.servers | Kafka 消费者/生产者的启动服务器列表。 |
properties.group.id | Kafka 消费者群组的唯一标识符。 |
topic | Kafka 消息队列的主题名称。 |
format | 外部系统的数据格式。 |
name.server.address | 消息队列服务器的地址。 |
group | 消费者群组的唯一标识符。 |
service-url | 服务 URL。 |
admin-url | 管理 URL。 |
url | Flink 所连接的外部系统的 URL。 |
table-name | 数据库中的表名。 |
username | 连接外部系统时使用的用户名。 |
password | 连接外部系统时使用的密码。 |
hosts | 集群中节点的主机名或 IP 地址列表。 |
index | Elasticsearch 中的索引名称。 |
zookeeper.quorum | HBase 集群的 ZooKeeper 地址。 |
zookeeper.znode.parent | HBase 在 ZooKeeper 中的父节点路径。 |
masters | 集群的主节点列表。 |
primary.keys | 表的主键。 |
自动解析说明
流表字段信息获取仅支持自动解析序列化方式为 json、canal-json、debezium-json、maxwell-json 的数据,具体说明规则如下:
canal-json 和 maxwell-json 数据仅解析 data 字段中的内容。
{ "data":[ { "id":22, "name":"opt", "age":123, "address":"北京市" } ], "type":"INSERT" }
以上样例数据, 解析结果为4个字段
id、name、age、address
,data、type
字段不展示。debezium-json 数据根据 op 字段不同解析不同字段中的内容。
op 字段为“c” 时,解析 after 字段中的内容,如样例数据:
{ "before":null, "after":{ "id":22, "name":"opt1", "age":null }, "op":"c" }
解析结果为3个字段
id、name、age
,before、after、op
字段均不展示。op 字段为“d”时,解析 before 字段中的内容,如样例数据:
{ "before":{ "id":22, "name":"opt1", "age":null }, "after":null, "op":"d" }
解析结果为3个字段
id、name、age
,before、after、op
字段均不展示。
3. 使用流表
- 进入 实时开发 页面,选择任意SQL任务。
- 点击开发页面左侧 实时数仓,选择数据源类型为 流表。
选择目标库与目标表,可拖动表名称至代码区域,或 直接使用
数据库.表
的方式使用指定流表。
流表支持
目前,实时计算平台支持将Kafka
、Pulsar
、RocketMQ
、MySQL
、Oracle
、HBASE
、ElasticSearch
、Kudu
8种数据源类型登记为流表。
数据源具体使用及配置项使用说明请参阅EasyStream Engine SQL 连接器 。
以上内容对您是否有帮助?