本章主要通过维表 Join 的案例的准备工作来介绍如何进行数据源登记和流表创建。 为实现维表 Join,我们首先需要创建本地 MySQL 表。

创建维表和输出表

本案例需在本地 MySQL 数据库(数据库名:poc)中创建好两张表,一张是名为 goods_info 的维表,一张是名为 goods_join_mysql_sink 的输出表。建表语句如下:

---维表DDL
CREATE TABLE `goods_info` (
  `itemID` varchar(100) NOT NULL,
  `itemName` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`itemID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
---输出表DDL
CREATE TABLE `goods_join_mysql_sink` (
  `itemID` varchar(100) DEFAULT NULL,
  `itemType` varchar(100) DEFAULT NULL,
  `onSellTime` timestamp NULL DEFAULT NULL,
  `price` double DEFAULT NULL,
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `onSellTimeOrigin` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00',
  `itemName` varchar(100) DEFAULT NULL,
  `maxPrice` double DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

登记数据源

本案例需要在数仓页面登记两个数据源,一个 Kafka,一个 MySQL,同时在数仓流表管理的页面创建一张流表,数据源登记和流表登记的详细说明请参见 数仓管理 章节。

登记 Kafka 数据源

入口:数仓 - 数据源管理 - 登记数据源

数据源类型选择 kafka,在配置中填入 Broker 地址。

新建Kafka数据源

登记 MySQL 数据源

入口:数仓 - 数据源管理 - 登记数据源 数据源类型选择 MySQL,在配置中填入密码、用户名和访问地址。

新建MySQL数据源

其他类型的数据源登记请参考数据源登记章节。

登记流表

登记流表需要先新建一个数据库。当前仅项目负责人、管理员、实时管理员角色用户具备创建流表库权限。

新建数据库入口:实时开发→流表管理→数据库右上角。

新建数据库

新建数据库成功后就可以开始新建流表。其中流表的topic需要在登记的kafka中创建好。

新建流表入口:实时开发→流表管理→选择数据库后在页面右上角点击新建流表。

新建流表

流表创建完成后就可以在 SQL 任务中通过 [数据源].数据库.表 三元组的方式直接使用,无需通过 Flink DDL 方式调用。