本文将为您介绍 EasyStream 中如何创建 Kafka 类型流表。

操作步骤

  1. 在流表管理页面,单击目标逻辑库,表单页面左上角显示对应库名表示切换成功。
  2. 单击 创建表 按键,进行表创建。
  3. 选择 数据源类型Kafka
  4. 完成剩余表单内容,创建 Kafka 流表。

流表配置

配置名称 是否必填 配置生效类型 数据类型 默认值 参数说明
topic 目标表必填; 源表必须设置'topic'或't opic-pattern' 源表、目标表 String - 当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 'topic-1;topic-2'。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。
topic-pattern 可选 源表 String - 匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。
properties.group.id 必填 源表 String - Kafka source 的消费组 id
scan.startup.mode 可选 源表 String group-offsets Kafka consumer 的启动模式。有效值为:'earliest-offset','latest-offset','group-offsets','timestamp' 和 'specific-offsets'
format 可选 源表、目标表 String - 用来序列化或反序列化 Kafka 消息的格式。注意:该配置项和 'value.format' 二者必需其一。
key.format - 源表、目标表 String - 用来序列化和反序列化 Kafka 消息键(Key)的格式。 注意:如果定义了键格式,则配置项 'key.fields' 也是必需的。
key.fields 可选 源表、目标表 String - 表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 'field1;field2'。
key.fields-prefix 可选 源表、目标表 String - 为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 'key.fields' 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 'value.fields-include' 配置为 'EXCEPT_KEY'。
value.format 可选 源表、目标表 String - 序列化和反序列化 Kafka 消息体时使用的格式。注意:该配置项和 'format' 二者必需其一。
value.fields-include 可选 源表、目标表 枚举类型 ALL 定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 'ALL' 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。可选值:[ALL, EXCEPT_KEY]
scan.startup.specific-offsets 可选 源表 String - 在使用 'specific-offsets' 启动模式时为每个 partition 指定 offset,例如 'partition:0,offset:42;partition:1,offset:300'。
scan.startup.timestamp-millis 可选 源表 Long - 在使用 'timestamp' 启动模式时指定启动的时间戳(单位毫秒)。
scan.topic-partition-discovery.interval 可选 源表 Duration - Consumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。
sink.partitioner 可选 目标表 String default Flink partition 到 Kafka partition 的分区映射关系,可选值有: default:使用 Kafka 默认的分区器对消息进行分区。 fixed:每个 Flink partition 最终对应最多一个 Kafka partition。 round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。 自定义 FlinkKafkaPartitioner的子类:例如 'org.mycompany.MyPartitioner'。
sink.parallelism 可选 目标表 Integer - 定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。
sink.semantic 可选 目标表 String - 定义kafka的sink语义。有效的枚举值为['at-least-once', 'exactly-once' 和 'none']
table.append-only 可选 目标表 Boolean TRUE 是否为追加模式
parallelism 可选 源表 Integer - kafka source算子的并行度