Confluent Avro Format

仅1.12支持

Avro模式注册表(Avro-confluent)允许你读取通过io.confluent.kafka.serializers.KafkaAvroSerializer序列化的记录,写入可由 io.confluent.kafka.serializers.KafkaAvroDeserializer读取的记录

用例

CREATE TABLE source (
    ID BIGINT,
    USER_NAME STRING,
    USER_AGES INT,
    COL_DECIMAL STRING,
    COL_DATE STRING,
    COL_TIMESTAMP STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'properties.group.id' = 'testGroup',
    'topic' = 'cdckafka.cdctest.sourcedb.myschema.test_1',
    'properties.bootstrap.servers' = 'localhost:9093',
    'key.format' = 'avro-confluent',
    'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
    'value.format' = 'avro-confluent',
    'table.append-only' = 'false',
    'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
    'value.fields-include' = 'EXCEPT_KEY'
)

Format参数

Option Required Default Type Description
format
required (none) String Specify what format to use, here should be ‘avro-confluent’.
avro-confluent.basic-auth.credentials-source
optional (none) String Basic auth credentials source for Schema Registry
avro-confluent.basic-auth.user-info
optional (none) String Basic auth user info for schema registry
avro-confluent.bearer-auth.credentials-source
optional (none) String Bearer auth credentials source for Schema Registry
avro-confluent.bearer-auth.token
optional (none) String Bearer auth token for Schema Registry
avro-confluent.properties
optional (none) Map Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence.
avro-confluent.ssl.keystore.location
optional (none) String Location / File of SSL keystore
avro-confluent.ssl.keystore.password
optional (none) String Password for SSL keystore
avro-confluent.ssl.truststore.location
optional (none) String Location / File of SSL truststore
avro-confluent.ssl.truststore.password
optional (none) String Password for SSL truststore
avro-confluent.subject
optional (none) String The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, ‘kafka’ and ‘upsert-kafka’ connectors use ‘<topic_name>-value’ or ‘<topic_name>-key’ as the default subject name if this format is used as the value or key format. But for other connectors (e.g. ‘filesystem’), the subject option is required when used as sink.
avro-confluent.url
required (none) String The URL of the Confluent Schema Registry to fetch/register schemas.