MongoDB 连接器

维表

  • DDL
-- source
-- In-memory source table; feeds the temporal lookup join examples below.
CREATE TABLE source_collection (
  id INT,
  name VARCHAR,
  -- Processing-time attribute, required by FOR SYSTEM_TIME AS OF joins.
  proctime AS PROCTIME()
) WITH (
  'connector' = 'memory',
  'table.name' = 'source_collection'
);

-- MongoDB dimension (lookup) table; columns f1..f15 demonstrate the
-- full range of supported Flink SQL types for this connector.
CREATE TABLE mongo_source (
  _id BIGINT,
  f1  STRING,
  f2  BOOLEAN,
  f3  BINARY,
  f4  TINYINT,
  f5  SMALLINT,
  f6  INTEGER,
  f7  TIMESTAMP_LTZ(6),
  f8  TIMESTAMP_LTZ(3),
  f9  FLOAT,
  f10 DOUBLE,
  f11 DECIMAL(10, 2),
  f12 MAP<STRING, INTEGER>,
  f13 ROW<k INTEGER>,
  f14 ARRAY<STRING>,
  f15 ARRAY<ROW<k STRING>>
) WITH (
  'connector' = 'mongodb',
  'uri'='mongodb://user:password@localhost:53334',
  'database'='test_db',
  'collection'='dim_collection',
  -- Lookup cache: LRU with bounded size and TTL, metrics enabled.
  'lookup.cache.type'= 'lru',
  'lookup.max-retries'='4',
  'lookup.cache.max-rows'='1000',
  'lookup.cache.ttl'='60000',
  'lookup.cache.metric.enable'='true'
);

-- Declare the sink primary key for the catalog-based write below.
set 'sink_mongo.primary.keys'='_id';

-- yxx_local_mongodb is a registered MongoDB catalog.
insert into yxx_local_mongodb.test_db.sink_target_collection
/*+ OPTIONS(
      'sink.transaction.enable'='true',
      'sink.flush.on-checkpoint'='false',
      'sink.flush.size'='1000',
      'sink.bulk-flush.max-actions'='1000',
      'sink.parallelism'='1') */
SELECT
  S.id,
  S.name,
  D._id,
  D.f1,
  D.f2
FROM source_collection AS S
JOIN mongo_source FOR SYSTEM_TIME AS OF S.proctime AS D
  ON S.id = D._id;
  • 元数据方式

需在猛犸平台登记 MongoDB 数据源

-- source
-- In-memory source table for the metadata-based (catalog) example.
CREATE TABLE source_collection (
  id INT,
  name VARCHAR,
  -- Processing-time attribute, required by FOR SYSTEM_TIME AS OF joins.
  proctime AS PROCTIME()
) WITH (
  'connector' = 'memory',
  'table.name' = 'source_collection'
);

create view vi as select *, proctime() as proctime from  yxx_local_mongodb.test_db.dim_collection;

-- yxx_local_mongodb is a registered MongoDB catalog (metadata-based access).
insert into yxx_local_mongodb.test_db.sink_target_collection
/*+ OPTIONS(
      'sink.transaction.enable'='true',
      'sink.flush.on-checkpoint'='false',
      'sink.flush.size'='1000',
      'sink.bulk-flush.max-actions'='1000',
      'sink.parallelism'='1') */
SELECT
  S.id,
  S.name,
  D._id,
  D.f1,
  D.f2
FROM source_collection AS S
JOIN vi FOR SYSTEM_TIME AS OF S.proctime AS D
  ON S.id = D._id;  -- fix: statement was missing its terminating semicolon

SINK 表

  • DDL

    -- Sink table DDL: writes rows into the sink_target_collection collection.
    CREATE TABLE sink_target_collection (
      _id INT,
      name STRING,
      -- NOT ENFORCED: the key is used for upsert routing, not validated by Flink.
      PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mongodb',
      'uri'='mongodb://user:password@localhost:53334',
      'database'='test_db',
      'collection'='sink_target_collection',
      'sink.transaction.enable'='true',
      'sink.flush.on-checkpoint'='false',
      'sink.flush.size'='1000',
      'sink.bulk-flush.max-actions'='1000',
      'sink.parallelism'='1'
    );
  • 元数据方式

-- Declare the sink primary key for the catalog-based write below.
set 'sink_mongo.primary.keys'='_id';

-- yxx_local_mongodb is a registered MongoDB catalog.
insert into yxx_local_mongodb.test_db.sink_target_collection
/*+ OPTIONS(
      'sink.transaction.enable'='true',
      'sink.flush.on-checkpoint'='false',
      'sink.flush.size'='1000',
      'sink.bulk-flush.max-actions'='1000',
      'sink.parallelism'='1') */
SELECT
  id AS _id,
  name,
  age
FROM source_collection;

With 参数 / set 参数

  • 针对某个catalog设置参数的,需要在下表的参数前添加catalog名称。如上述的: user_dim.connector.version
参数 注释说明 备注
connector source 数据源类型 必填,固定值为 mongodb(与 DDL 中 'connector' 的取值一致)
uri 数据库链接地址 mongoDB 链接地址,必填
database source 数据库 mongoDB database,必填
collection 表对应的 mongoDB Collection 名称 必填
sink.transaction.enable 是否开启sink exactly-once 机制 选填。默认为false
sink.flush.on-checkpoint 是否开启在checkpoint snapshot执行时保存数据 选填。默认为false
sink.flush.size flush size 整数,选填。默认为1000
sink.bulk-flush.max-actions bulk flush 最大条数 整数,选填。默认为1000
sink.parallelism sink并行度 整数,选填。无默认值
lookup.cache.type 作为维表时查询的缓存策略 选填。不区分大小写,all/none/lru。默认none
lookup.max-retries 作为维表时查询的缓存最大失败重试次数 选填。默认3
lookup.cache.ttl 作为维表时查询的缓存ttl 选填。默认常驻,不会失效,单位毫秒
lookup.cache.max-rows 作为维表时查询的缓存最大行数 选填。默认10000
lookup.cache.metric.enable 是否开启缓存监控。
监控指标有:
XXX.cache.hit-rate:命中率=hit-count/(hit-count+miss-count);
XXX.cache.hit-count:命中数;
XXX.cache.miss-count:未命中数,=加载成功数+加载异常数;
XXX.cache.average-load-cost:每条记录平均耗时,单位ms,total-load-time/(load-success-count+load-exception-count);
XXX.cache.load-success-count:缓存加载成功数;
XXX.cache.load-exception-count:缓存加载异常数,外部数据库未匹配到join key;
XXX.cache.load-exception-rate:缓存加载异常率,当异常率很高时建议开启缓存空值;
XXX.cache.total-load-time:总的缓存加载耗时,单位s;
XXX.cache.cache-record-count:缓存的记录数
选填。默认false