NDC Format

NDC Format是网易内部自研读取数据库的消息格式,能够获取数据变更详细信息。目前用于NDC任务采用行级订阅,并且序列化方式选用NDC的方式时,在 Easystream 消费处理需要设置该 format 格式进行数据获取。推荐使用 ndc-hub:使用 DDL 方式可直接指定;如使用元数据引入方式,则必须在 SQL 里面进行 SET 设置。

Flink1.10

  • format=ndc

    -- Flink 1.10 source table reading NDC change events from Kafka
    -- using the legacy 'ndc' format (schema derived from the table definition).
    CREATE TABLE source (
      seqno BIGINT,             -- change-event sequence number
      lastEvent BOOLEAN,        -- whether this row is the last event of the change
      `timestamp` BIGINT,       -- event timestamp (backticked: reserved word)
      rowChanges VARCHAR        -- serialized row-change payload
    ) WITH (
      'connector.type' = 'kafka',
      'connector.version' = 'universal',
      'connector.topic' = 'mysql_blogbench',
      'connector.startup-mode' = 'earliest-offset',
      'connector.properties.zookeeper.connect' = '10.122.173.110:2181/dev_test',
      'connector.properties.bootstrap.servers' = '10.122.173.110:9092',
      'connector.properties.group.id' = 'sink_topic_yxx_es_ndc',
      'format.type' = 'ndc',
      'format.derive-schema' = 'true'
    );
  • format=ndc-hub (推荐使用该方式)

    -- Flink 1.10 source table using the recommended ndc-hub path:
    -- ndc-kafka connector + ndc-dbhub format, filtered by table-name regex.
    CREATE TABLE source (
      ndc_seqNo_ BIGINT,        -- NDC sequence number
      timestamp_ BIGINT,        -- event timestamp
      ndc_type_ STRING,         -- change type (insert/update/delete)
      id INT,                   -- business column: current value
      id_before_ INT            -- business column: value before the change
    ) WITH (
      'connector.type' = 'ndc-kafka',
      'format.type' = 'ndc-dbhub',
      'format.filter' = '.*testTable',
      'ndc.project' = 'Mammut-test_bdms',
      'ndc.source' = 'mysql165',
      'ndc.source.table' = 'test.qa_all_field_mysql',
      'group.id' = 'hh'
    );

Flink1.12(推荐)

format=ndc-hub

  • DDL任务
    -- Flink 1.12 DDL source table: plain kafka connector with the
    -- ndc-dbhub format. Fixes vs. the earlier draft:
    --   * removed the trailing comma before ');' (syntax error in Flink SQL);
    --   * '*table' is not a valid regex (leading '*') — use '.*table',
    --     matching the '.*testTable' / '.*t_pallet_task' examples.
    CREATE TABLE source (
      ndc_seqNo_ BIGINT,        -- NDC sequence number
      timestamp_ BIGINT,        -- event timestamp
      ndc_type_ STRING,         -- change type (insert/update/delete)
      id INT,                   -- business column: current value
      id_before_ INT            -- business column: value before the change
    ) WITH (
      'connector' = 'kafka',
      'properties.bootstrap.servers' = '',  -- fill in the Kafka broker list
      'format' = 'ndc-dbhub',
      'properties.group.id' = 'hh',
      'ndc-dbhub.filter' = '.*table'        -- regex filter on table names
    );
  • metahub任务
    -- Serialization format for NDC sources is always ndc-dbhub.
    -- (Added the statement-terminating semicolons missing from the first
    -- two SET statements; all sibling statements already had them.)
    SET '源表名称.format' = 'ndc-dbhub';
    SET '源表名称.connector.type' = 'ndc-kafka';
    -- Only the two SET statements above are required for the metadata (metahub)
    -- approach; the options below override the source table's registered
    -- defaults and may be left unset unless actually needed.
    -- Regex filter selecting which table's change events to consume from Kafka.
    SET 'kafka_pallet_task.ndc-dbhub.filter' = '.*t_pallet_task';
    -- Consumer group id: any value, as long as it does not clash with another group.
    SET 'kafka_pallet_task.properties.group.id' = 't_pallet_task_alca_pro1';
    -- Start consuming from the latest Kafka offset.
    SET 'kafka_pallet_task.scan.startup.mode' = 'latest-offset';
    SET 'kafka_pallet_task.properties.zookeeper.connect' = '****';
    SET 'kafka_pallet_task.properties.bootstrap.servers' = '****';
    SET 'kafka_pallet_task.primary.keys' = 'id';
    CREATE VIEW view_pallet_task AS SELECT * FROM kafka_pallet_task;