内置connectors


flink 1.11 支持了 3种内置的connectors。

connector 描述 使用场景
‘connector’=’datagen’ 用于生成随机数据的source 常用于测试
‘connector’=’blackhole’ 不做任何处理的 sink 常用于性能测试
‘connector’=’print’ 打印到标准输出流(.out文件)的 sink 常用于调试

在外部 connector 环境还没有 ready 时,用户可以选择 datagen source 和 print sink 快速构建作业熟悉 Flink SQL 对于想要测试 Flink SQL 性能的用户,可以使用 blackhole 作为 sink;对于调试排错场景,print sink 会将计算结果打到标准输出(在“任务页面”Flink web ui 查看Task Managers “Stdout” 选项卡),使得定位问题的成本大大降低。

DataGen

datagen 可以根据指定的数据类型,构造source 表数据,支持指定’number-of-rows’ 构造有界流,不指定默认为无界流。支持的数据类型及表创建的参数请参考connector DataGen DataGen 可以单独作为source 使用,进行flink 的功能熟悉。但更多的时候是搭配CREATE TABLE LIKE 语法使用,Mock 数据,进行开发测试,能够极大的提升开发的效率。

使用方式如下:

定义源表表结构

CREATE TABLE random_source (
       amount int,
       course_code string,
       `event_time` TIMESTAMP(3) ,
       WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'sensor',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

使用dataGen mock 数据:

create table datagen_source 
with(  'connector' = 'datagen',
       'number-of-rows' = '10000') like random_source(EXCLUDING OPTIONS);

如果使用了元数据方式去定义的表结构,like 后指定的表名,使用数据库.表名即可

sql 替换源表为datagen_source
insert xxx select * from datagen_source;

Print

Print sink 会将计算结果打到标准输出(在“任务页面”Flink web ui 查看Task Managers “Stdout” 选项卡)中,在测试过程中,可以不用访问远程数据库 进行查看。支持的数据类型及表创建的参数请参考connector Print 搭配CREATE TABLE LIKE 语法使用能够快速测试数据结果 使用方式如下:

create catalog hive_catalog WITH (
    'type' = 'hive',       
    'default-database'='sloth', 
    'hive-version'='2.1.1', 
    'hive-site'='hive-site.xml', 
    'hdfs-site'='hdfs-site.xml', 
    'core-site'='core-site.xml', 
    'warehouse'='hdfs://bdms-test/user/sloth/hive_db', 
    'krb.keytab'='sloth.keytab', 
    'krb.conf'='krb5.conf',
    'krb.principal'='sloth/dev@BDMS.163.COM',
    'auth.method'='kerberos',
    'sys.db.url'='',
    'sys.db.user'='',
    'sys.db.password'=''
);

使用print :

CREATE TABLE print_table WITH ('connector' = 'print')
LIKE hive_catalog (EXCLUDING ALL)

如果是使用元数据方式注册的表,使用print 可能存在PROCTIME字段类型不匹配的问题,建议使用ddl 方式注册结果表

BlackHole

BlackHole 类似 linux 系统的 /dev/null .其本质是忽略了sink 对性能的影响,测试整个作业消费能力。支持的数据类型及表创建的参数请参考connector BlackHole .BlackHole 和其他两个内置连接器相似,大多情况下使用create table like 语法进行使用。需要注意的是BlackHole和Print 相同只能用在sink 连接器的定义上。

使用BlackHole:

CREATE TABLE blackhole_table WITH ('connector' = 'blackhole')
LIKE sink_table (EXCLUDING ALL)

关于LIKE子句的说明


LIKE 子句可以用于定义表的多个部分,不仅仅是 schema 部分。
可以使用该子句,重用(或改写)指定的连接器配置属性或者可以向外部表添加 watermark 定义,例如可以向Hive 中定义的表添加 watermark 定义。

可以控制合并的表属性如下:
CONSTRAINTS - 主键和唯一键约束
GENERATED - 计算列
OPTIONS - 连接器信息、格式化方式等配置项
PARTITIONS - 表分区信息
WATERMARKS - watermark 定义

并且有三种不同的表属性合并策略:
INCLUDING - 新表包含源表(source table)所有的表属性,如果和源表的表属性重复则会直接失败,例如新表和源表存在相同 key 的属性。
EXCLUDING - 新表不包含源表指定的任何表属性。
OVERWRITING - 新表包含源表的表属性,但如果出现重复项,则会用新表的表属性覆盖源表中的重复表属性,例如,两个表中都存在相同 key 的属性,则会使用当前语句中定义的 key 的属性值。

并且你可以使用 INCLUDING/EXCLUDING ALL 这种声明方式来指定使用怎样的合并策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS,那么代表只有源表的 WATERMARKS 属性才会被包含进新表。
详情请参考LIKE 语句