NDC Format
更新时间: 2023-08-01 17:47:21
阅读 77
NDC Format
NDC Format是网易内部自研读取数据库的消息格式,能够获取数据变更详细信息。目前用于NDC任务采用行级订阅,并且序列化方式选用NDC的方式时,在 Easystream 消费处理需要设置该format 格式进行数据获取.。推荐使用ndc-hub.使用DDL 方式直接指定即可,如使用元数据引入方式,必须在sql 里面进行set 设置。
Flink1.10
format=ndc
CREATE TABLE source ( seqno bigint, lastEvent BOOLEAN, `timestamp` bigint, rowChanges varchar ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'mysql_blogbench', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = '10.122.173.110:2181/dev_test', 'connector.properties.bootstrap.servers' = '10.122.173.110:9092', 'connector.properties.group.id' = 'sink_topic_yxx_es_ndc', 'format.type' = 'ndc', 'format.derive-schema' = 'true' );
format=ndc-hub (推荐使用该方式)
CREATE TABLE source ( ndc_seqNo_ BIGINT, timestamp_ BIGINT, ndc_type_ STRING, id INT, id_before_ INT ) WITH( 'connector.type' = 'ndc-kafka', 'format.type' = 'ndc-dbhub', 'format.filter' = '.*testTable', 'ndc.project' = 'Mammut-test_bdms', 'ndc.source' = 'mysql165', 'ndc.source.table' = 'test.qa_all_field_mysql', 'group.id' = 'hh' );
Flink1.12(推荐)
format=ndc-hub
- DDL任务
CREATE TABLE source ( ndc_seqNo_ BIGINT, timestamp_ BIGINT, ndc_type_ STRING, id INT, id_before_ INT ) WITH( 'connector' = 'kafka', 'properties.bootstrap.servers' = '', 'format' = 'ndc-dbhub', 'properties.group.id' = 'hh', 'ndc-dbhub.filter' = '*table', );
- metahub任务
--序列化格式,统一是ndc-dbhub SET '源表名称.format'= 'ndc-dbhub' SET '源表名称.connector.type' = 'ndc-kafka' --以下参数根据实际需要设置即可,元数据方式只需要设置上面两个参数即可,其他参数使用默认的源表注册参数,如无必要可不调整。 --过滤筛选需要消费的kafka的表名 SET 'kafka_pallet_task.ndc-dbhub.filter' = '.*t_pallet_task'; --消费组名字可以随意指定,不和其他消费组重复即可 SET 'kafka_pallet_task.properties.group.id' = 't_pallet_task_alca_pro1'; --kafka最新位置点开始消费 SET 'kafka_pallet_task.scan.startup.mode' = 'latest-offset'; SET 'kafka_pallet_task.properties.zookeeper.connect' = '****'; SET 'kafka_pallet_task.properties.bootstrap.servers' = '****'; SET 'kafka_pallet_task.primary.keys' = 'id'; CREATE view view_pallet_task as select * from kafka_pallet_task ;
文档反馈
以上内容对您是否有帮助?