PB Format
更新时间: 2023-04-24 20:23:50
阅读 182
PB Format(BETA)
PB Format 能读写 proto2/proto3 格式的数据。当前,PB schema 是从 table schema 中自动推导而得的。
适用版本(easystream)
ne-Flink = 1.12.4
plugin > 1.3.1
如何创建一张基于 PB Format 的表
以下是一个利用proto文件编译成的jar包、 Kafka 以及PB Format 构建表的例子。
基于proto文件编译生成java类文件,然后对文件进行打包成jar。
syntax = "proto3";
package pb;
option java_package = "pb";
option java_multiple_files = true;
message Person {
int32 id = 1;//同上
string name = 2;//必须字段,在后面的使用中必须为该段设置值
string email = 3;//可选字段,在后面的使用中可以自由决定是否为该字段设置值
}
创建一张表
create table source (
id int,
name string,
email string
) with (
'connector'= 'kafka',
'topic' = 'pb_der_sink',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'test3111',
'properties.bootstrap.servers' = '10.200.164.48:19092,10.200.164.48:29092,10.200.164.48:39092',
'format' = 'protobuf',
'protobuf.message-class-name' = 'pb.Person'
);
Format 参数
参数 | 是否必须 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
format |
必选 | (none) | String | 声明使用的格式,这里应为'protobuf' 。 |
ignore-parse-errors |
可选 | false | Boolean | 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null 。 |
read-default-values |
选填 | false |
Boolean | 当反序列化中不存在某些字段时,标识是读取默认值还是null ;默认为 false 。如果 proto 语法为 proto3 ,该值将被强制设置为 true ,因为 proto3 的标准是使用默认值。 |
注意
- 表schema中的数据类型必须要与PB的java类型严格匹配,如下:
Pb JavaType | LogicalType |
---|---|
STRING | VARCHAR or CHAR |
ENUM | VARCHAR or CHAR |
BOOLEAN | BOOLEAN |
BYTE_STRING | BINARY |
INT | INT |
LONG | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
MESSAGE | ROW |
REPEATED | ARRAY |
MAP | Map |
默认值
pb type | default value |
---|---|
boolean | false |
int | 0 |
long | 0L |
float | 0.0f |
double | 0.0 |
string | "" |
enum | first enum value |
binary | ByteString.EMPTY |
list | empty list |
map | empty map |
message | message class's default instance |
空值
在序列化过程中,flink row 可能会在 row/map/array 中包含 null 值。但是 protobuf 对 null 值的处理方式不同,我们应该注意它。如果 flink 行包含 null 元素,则此序列化程序不会将该字段写入 protobuf 流中。 如果下游用户读取此流:
- 使用proto2类,用户可以调用proto.hasXXX()方法来知道该字段是否存在于流中。
- 使用 proto3 类,proto.hasXXX() 已被弃用,因此无法知道该字段是否不存在或巧合设置了默认值。
row value | pb value |
---|---|
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
map |
array |
array |
array |
array |
array |
array |
array |
array |
array |
array |
array |
array |
array |
array |
array |
array |
array |
array |
Oneof字段
在序列化过程中,不能保证 oneof 组的 flink 行字段只包含至少一个非空值。所以在序列化中,我们按照 flink schema 的顺序来设置每个字段,所以在同一个 one-of 组中,高位的字段会覆盖然后低位的字段。
枚举类型
pb 的枚举值将转换为字符串,反之亦然,以 pb 中的枚举值定义的名称。
文档反馈
以上内容对您是否有帮助?