Supported in 1.12 only.
The Avro Schema Registry format (avro-confluent) lets you read records serialized by io.confluent.kafka.serializers.KafkaAvroSerializer and write records that can be read by io.confluent.kafka.serializers.KafkaAvroDeserializer.
Example
CREATE TABLE source (
  ID BIGINT,
  USER_NAME STRING,
  USER_AGES INT,
  COL_DECIMAL STRING,
  COL_DATE STRING,
  COL_TIMESTAMP STRING,
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.group.id' = 'testGroup',
  'topic' = 'cdckafka.cdctest.sourcedb.myschema.test_1',
  'properties.bootstrap.servers' = 'localhost:9093',
  'key.format' = 'avro-confluent',
  'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'value.format' = 'avro-confluent',
  'table.append-only' = 'false',
  'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'value.fields-include' = 'EXCEPT_KEY'
)
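
The write direction mentioned above can be sketched in the same way. This is a minimal sketch, not a verified configuration: the table name `sink` and the topic `example_output_topic` are hypothetical, and it assumes the `source` table from the example above exists and that the same `value.avro-confluent.schema-registry.url` key form is accepted for sinks in your version.

```sql
-- Hypothetical sink table: records written here are Avro-encoded with the
-- schema registered in the Schema Registry, so that a
-- KafkaAvroDeserializer-based consumer can read them.
CREATE TABLE sink (
  ID BIGINT,
  USER_NAME STRING,
  USER_AGES INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'example_output_topic',                       -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9093',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
);

-- Copy a subset of columns from the source table into the sink.
INSERT INTO sink
SELECT ID, USER_NAME, USER_AGES
FROM source;
```

Whether this `INSERT` is accepted depends on how your environment treats the changelog produced by `source` (the example above sets `'table.append-only' = 'false'`), so treat it as a starting point only.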
| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| | required | (none) | String | Specify what format to use, here should be 'avro-confluent'. |
| avro-confluent.basic-auth.credentials-source | optional | (none) | String | Basic auth credentials source for Schema Registry |
| avro-confluent.basic-auth.user-info | optional | (none) | String | Basic auth user info for Schema Registry |
| avro-confluent.bearer-auth.credentials-source | optional | (none) | String | Bearer auth credentials source for Schema Registry |
| avro-confluent.bearer-auth.token | optional | (none) | String | Bearer auth token for Schema Registry |
| avro-confluent.properties | optional | (none) | Map | Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence. |
| avro-confluent.ssl.keystore.location | optional | (none) | String | Location / File of SSL keystore |
| avro-confluent.ssl.keystore.password | optional | (none) | String | Password for SSL keystore |
| avro-confluent.ssl.truststore.location | optional | (none) | String | Location / File of SSL truststore |
| avro-confluent.ssl.truststore.password | optional | (none) | String | Password for SSL truststore |
| avro-confluent.subject | optional | (none) | String | The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, 'kafka' and 'upsert-kafka' connectors use '<topic_name>-value' or '<topic_name>-key' as the default subject name if this format is used as the value or key format. But for other connectors (e.g. 'filesystem'), the subject option is required when used as sink. |
| avro-confluent.url | required | (none) | String | The URL of the Confluent Schema Registry to fetch/register schemas. |
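
As an illustration of how the authentication and SSL options listed above might be combined, here is a hedged sketch. The table name `secured_source`, the topic, the credentials, and the truststore path are all placeholders, and the option keys are written exactly as they appear in the table. Note that the earlier example in this section uses the `schema-registry.url` key form instead of `avro-confluent.url`, so check which form your version accepts before relying on either.

```sql
-- Hypothetical table reading from a Schema Registry that requires
-- HTTP basic authentication over TLS. All values are placeholders.
CREATE TABLE secured_source (
  ID BIGINT,
  USER_NAME STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'example_topic',                               -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9093',
  'properties.group.id' = 'testGroup',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'https://localhost:8081',
  -- USER_INFO tells the registry client to take credentials from user-info.
  'avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'avro-confluent.basic-auth.user-info' = 'user:password',  -- '<username>:<password>'
  'avro-confluent.ssl.truststore.location' = '/path/to/truststore.jks',
  'avro-confluent.ssl.truststore.password' = 'changeit'
);
```

When the format is configured separately for key and value, as in the `source` example, the same options would presumably carry the `key.` or `value.` prefix.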