Kafka Connectors

简述

  • kafka输入输出,支持upsert方式,见 upsertKafka.md

示例

官方文档

  • ddl
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
-- ts AS PROCTIME(),
ts BIGINT,
ts0 AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')),
WATERMARK FOR ts0 AS ts0 - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'userlog-test',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'sloth-test2.dg.163.org:9092',
'format' = 'json'
);

CREATE TABLE usertest_sink (
  id VARCHAR,
  name varchar
) WITH (
  'connector' = 'kafka',
  'topic' = 'usertest_sink',
  'properties.bootstrap.servers' = 'sloth-test2.dg.163.org:9092',
  'properties.group.id' = 'test',
  'table.append-only' = 'true',
  'sink.partitioner' = 'round-robin',
  'sink.parallelism' = '1',
  'format' = 'json'
);

insert into usertest_sink
select user_id id, '1' name from user_log group by user_id, hop(ts0, interval '10' SECOND, interval '10' SECOND)
;

KAFKA认证配置

以下内容介绍kafka 的几种认证配置方式,详情请参考kafka 官方配置

配置方法

数据源注册时(Metahub) 的方式 ,新增自定义配置,增加如下3组配置,注意user ,password 需要替换为集群自己的账号密码,最后一项配置必须带个分号;

 'properties.security.protocol' = 'SASL_PLAINTEXT',
 'properties.sasl.mechanism' = 'PLAIN',
 'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";'

set方式配置 同时产品也支持在任务sql 中通过 set 参数进行设置,如下:

set  'tablename.properties.security.protocol' = 'SASL_PLAINTEXT',
set  'tablename.properties.sasl.mechanism' = 'PLAIN',
set 'tablename.properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";'
认证方式

首先需要确定kafka 集群使用了那种认证方式,不同的认证方式,需要使用不同的配置 kafka目前共有4种常见的认证方式
SASL/GSSAPI(kerberos):kafka0.9版本推出,即借助kerberos实现用户认证


 'properties.security.protocol' = 'SASL_PLAINTEXT',(or SASL_SSL)
 'properties.sasl.mechanism' = 'GSSAPI', 
 'properties.sasl.kerberos.service.name'='kafka',
 'properties.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
    useKeyTab=true \
    storeKey=true  \
    keyTab="/etc/security/keytabs/kafka_client.keytab" \
    principal="kafka-client-1@EXAMPLE.COM";'

SASL/PLAIN:kafka0.10推出,非常简单,简单得有些鸡肋,不建议生产环境使用,除非对这个功能二次开发

 'properties.security.protocol' = 'SASL_PLAINTEXT',
 'properties.sasl.mechanism' = 'PLAIN',
 'properties.sasl.jaas.config'='org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";'

SASL/SCRAM:kafka0.10推出,全名Salted Challenge Response Authentication Mechanism,为解决SASL/PLAIN的不足而生,缺点可能是某些客户端并不支持这种方式认证登陆(使用比较复杂)

 'properties.security.protocol'='SASL_SSL',
 'properties.sasl.mechanism'='SCRAM-SHA-256' (or SCRAM-SHA-512),
 'properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="alice"  password="alice-secret";'

SASL/OAUTHBEARER:kafka2.0推出,实现较为复杂,目前业内应该较少实践

 'properties.security.protocol'='SASL_SSL' (or SASL_PLAINTEXT if non-production),
 'properties.sasl.mechanism'='OAUTHBEARER',
 'properties.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="alice";'
  • 元数据方式
set 'user_log.scan.startup.mode' = 'earliest-offset';
set 'user_log.properties.group.id' = 'test';

insert into ztes.user_test_sink
select user_id id, '1' name from ztes.user_log
;

参数

参数 是否必填 默认值 备注
connector 必填 必须为:kafka
topic sink必填 kafka 读写的 topic。 允许为一个列表,e.g. ‘topic-1;topic-2’ 。 注意,对 source 表而言,’topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。
topic-pattern 可选 匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,’topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。
properties.bootstrap.servers 必填 逗号分隔的 Kafka broker 列表。
properties.group.id 必填 kafka消费组的id
properties.* 必填 可以设置和传递任意 Kafka 的配置项。后缀名必须匹配在 Kafka 配置文档 中定义的配置键。Flink 将移除 “properties.” 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁用 topic 的自动创建。但是某些配置项不支持进行配置,因为 Flink 会覆盖这些配置,例如 ‘key.deserializer’ 和 ‘value.deserializer’。
format 必填 用来序列化或反序列化 Kafka 消息的格式。 请参阅 formats目录下的配置 以获取更多关于格式的细节和相关配置项。 注意:该配置项和 ‘value.format’ 二者必需其一。
key.format 可选 用来序列化或反序列化 Kafka 消息键(Key)的格式。 请参阅 formats目录下的配置 以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项 ‘key.fields’ 也是必需的。 否则 Kafka 记录将使用空值作为键。
key.fields 可选 表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 ‘field1;field2’。
key.fields-prefix 可选 为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
value.format 可选 用来序列化或反序列化 Kafka 消息键(Key)的格式。 请参阅 formats目录下的配置 以获取更多关于格式的细节和相关配置项。 注意:该配置项和 ‘format’ 二者必需其一。
value.fields-include 可选 ALL 可选值:[ALL, EXCEPT_KEY]. 定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 ‘ALL’ 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。
scan.startup.mode 可选 group-offsets Kafka consumer 的启动模式。有效值为:’earliest-offset’,’latest-offset’,’group-offsets’,’timestamp’ 和 ‘specific-offsets’。 请参阅下方 起始消费位点 以获取更多细节。
scan.startup.specific-offsets 可选 在使用 ‘specific-offsets’ 启动模式时为每个 partition 指定 offset,例如 ‘partition:0,offset:42;partition:1,offset:300’。
scan.startup.timestamp-millis 可选 在使用 ‘timestamp’ 启动模式时指定启动的时间戳(单位毫秒)。
scan.topic-partition-discovery.interval 可选 Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。e.g. 3d/day/h/hour/min/s/ms
sink.partitioner 可选 ‘default’ Flink partition 到 Kafka partition 的分区映射关系,可选值有:
- default:使用 Kafka 默认的分区器对消息进行分区。
- fixed:每个 Flink partition 最终对应最多一个 Kafka partition。
- round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。
- 自定义 FlinkKafkaPartitioner 的子类:例如 ‘org.mycompany.MyPartitioner’。
- 请参阅下方 Sink 分区 以获取更多细节。
sink.semantic 可选 at-least-once 定义 Kafka sink 的语义。有效值为 ‘at-least-once’,’exactly-once’ 和 ‘none’。请参阅 一致性保证 以获取更多细节。
sink.parallelism 可选 定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。
parallelism 可选 指定kafka source 并行度

简述

  • kafka输入输出,表字段支持json消息格式

示例:

  • flink kafka connector官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
  • Time、Date、Timestamp类型

    • For example: 2018-01-01 for date, 20:43:59Z for time, and 2018-01-01T20:43:59Z for timestamp.
  • source ``` CREATE TABLE user_log ( user_id BIGINT, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP ) WITH ( ‘connector.type’ = ‘kafka’, ‘connector.version’ = ‘universal’, ‘connector.topic’ = ‘xxx’, ‘connector.startup-mode’ = ‘earliest-offset’, ‘connector.properties.zookeeper.connect’ = ‘xxx’, ‘connector.properties.bootstrap.servers’ = ‘xxx’, ‘connector.properties.group.id’ = ‘xxx’, ‘update-mode’ = ‘append’, ‘format.type’ = ‘json’, ‘format.derive-schema’ = ‘true’, ‘format.fail-on-missing-field’ = ‘false’ );


* sink
```SQL
CREATE TABLE user_age (
  user_id BIGINT,
  item_id VARCHAR,
  age INT,
  behavior VARCHAR,
  ts TIMESTAMP
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.group.id' = 'xxx',
  'update-mode' = 'append',
  'connector.sink-partitioner' = 'round-robin',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

with参数

source参数

  • connector.type
    • ‘kafka’ 必选。
  • connector.version
    • 某kafka版本;必选。universal。版本对应关系见Kafka版本对应关系。
  • connector.topic
    • kafka的topic;必选
  • connector.startup-mode
    • 启动方式,可选。earliest-offset、latest-offset、group-offsets和specific-offsets,当指定为specific-offsets时,还需要配置如下:
      'connector.specific-offsets.0.partition' = '0',
      'connector.specific-offsets.0.offset' = '42',
      'connector.specific-offsets.1.partition' = '1',
      'connector.specific-offsets.1.offset' = '300'
  • connector.properties.*.key
    • 配置kafka的property key,必选。
  • connector.properties.*.value
    • 配置kafka的property value,必选。
      'connector.properties.zookeeper.connect' = 'xxx',
      'connector.properties.bootstrap.servers' = 'xxx',
      'connector.properties.group.id' = 'xxx',
      或者是
      'connector.properties.0.key' = 'zookeeper.connect', -- optional: connector specific properties
      'connector.properties.0.value' = 'localhost:2181',
      'connector.properties.1.key' = 'bootstrap.servers',
      'connector.properties.1.value' = 'localhost:9092',
      'connector.properties.2.key' = 'group.id',
      'connector.properties.2.value' = 'testGroup',
  • flink.partition-discovery.interval-millis
    • 定时检查是否有新分区产生;可选;默认不启用,单位为毫秒。

sink 参数

参数 注释说明 备注
connector.type 类型 必填:kafka
connector.version kafka 版本 必填:universal
connector.topic kafka 的 topic 必填
connector.properties.*.key 配置kafka的property key 必填
connector.properties.*.value 配置kafka的property value 必填: ‘connector.properties.zookeeper.connect’ = ‘xxx’, ‘connector.properties.bootstrap.servers’ = ‘xxx’, ‘connector.properties.group.id’ = ‘xxx’, 或者是 ‘connector.properties.0.key’ = ‘zookeeper.connect’, — optional: connector specific properties ‘connector.properties.0.value’ = ‘localhost:2181’,’connector.properties.1.key’ = ‘bootstrap.servers’, ‘connector.properties.1.value’ = ‘localhost:9092’, ‘connector.properties.2.key’ = ‘group.id’, ‘connector.properties.2.value’ = ‘testGroup’
update-mode 更新模式 必填:目前只支持 append
format.type 输出数据格式 必填。e.g. ‘json’。
format.derive-schema 数据schema是否来源于table schema 选填。e.g. ‘true’。
format.json-schema 使用JSON schema 选填。derive-schema已设置的话,该项不用设置。
connector.sink-partitioner 配置flink数据输出到kafka的partition 选填。e.g. fixed、round-robin、custom。
connector.sink-partitioner-class ‘org.mycompany.MyPartitioner’ sink partitioner为custom才生效 选填

kafka版本对应配置

version Kafka版本
universal 0.8.2.2
universal 0.9.0.1
universal 0.10.2.1
universal 0.11.0.2
universal 0.11+