PB Format(BETA)

PB Format 能读写 proto2/proto3 格式的数据。当前,PB schema 是从 table schema 中自动推导而得的。

适用版本(easystream)

ne-Flink = 1.12.4

plugin > 1.3.1

如何创建一张基于 PB Format 的表

以下是一个利用proto文件编译成的jar包、 Kafka 以及PB Format 构建表的例子。

基于proto文件编译生成java类文件,然后对文件进行打包成jar。

syntax = "proto3";
package pb;
option java_package = "pb";
option java_multiple_files = true;

message Person {  
  int32 id = 1;//同上  
  string name = 2;//必须字段,在后面的使用中必须为该段设置值  
  string email = 3;//可选字段,在后面的使用中可以自由决定是否为该字段设置值
}

创建一张表

create table source (
    id int,
    name string,
    email string
) with (
    'connector'= 'kafka',
    'topic' = 'pb_der_sink',
    'scan.startup.mode' = 'earliest-offset',
    'properties.group.id' = 'test3111',
    'properties.bootstrap.servers' = '10.200.164.48:19092,10.200.164.48:29092,10.200.164.48:39092',
    'format' = 'protobuf',
    'protobuf.message-class-name' = 'pb.Person'
);

Format 参数

参数 是否必须 默认值 类型 描述
format
必选 (none) String 声明使用的格式,这里应为'protobuf'
ignore-parse-errors
可选 false Boolean 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
read-default-values
选填 false Boolean 当反序列化中不存在某些字段时,标识是读取默认值还是null;默认为 false。如果 proto 语法为 proto3,该值将被强制设置为 true,因为 proto3 的标准是使用默认值。

注意

  • 表schema中的数据类型必须要与PB的java类型严格匹配,如下:
Pb JavaType LogicalType
STRING VARCHAR or CHAR
ENUM VARCHAR or CHAR
BOOLEAN BOOLEAN
BYTE_STRING BINARY
INT INT
LONG BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
MESSAGE ROW
REPEATED ARRAY
MAP Map

默认值

pb type default value
boolean false
int 0
long 0L
float 0.0f
double 0.0
string ""
enum first enum value
binary ByteString.EMPTY
list empty list
map empty map
message message class's default instance

空值

在序列化过程中,flink row 可能会在 row/map/array 中包含 null 值。但是 protobuf 对 null 值的处理方式不同,我们应该注意它。如果 flink 行包含 null 元素,则此序列化程序不会将该字段写入 protobuf 流中。 如果下游用户读取此流:

  1. 使用proto2类,用户可以调用proto.hasXXX()方法来知道该字段是否存在于流中。
  2. 使用 proto3 类,proto.hasXXX() 已被弃用,因此无法知道该字段是否不存在或巧合设置了默认值。
row value pb value
map(<"a", null>) map(("a", ""))
map() map(("", "a"))
map(null, 1) map(0, 1)
map(1, null) map(1, 0)
map(null, 1) map(0, 1)
map(1, null) map(1, 0)
map(null, true) map(false, true)
map(true, null) map(true, false)
map("key", null) map("key", 0)
map("key", null) map("key", 0)
map("key", null) map("key", )
map("key", null) map("key", ByteString.EMPTY)
map("key", null) map("key", MESSAGE.getDefaultInstance())
array(null) array("")
array(null) array(0)
array(null) array(0)
array(null) array(false)
array(null) array(0)
array(null) array(0)
array(null) array()
array(null) array(ByteString.EMPTY)
array(null) array(MESSAGE.getDefaultInstance())

Oneof字段

在序列化过程中,不能保证 oneof 组的 flink 行字段只包含至少一个非空值。所以在序列化中,我们按照 flink schema 的顺序来设置每个字段,所以在同一个 one-of 组中,高位的字段会覆盖然后低位的字段。

枚举类型

pb 的枚举值将转换为字符串,反之亦然,以 pb 中的枚举值定义的名称。