MongoDB SQL 连接器
更新时间: 2024-01-18 15:53:19
阅读 294
MongoDB 连接器
FLINK-1.14
维表
- DDL
-- source: left-hand table of the lookup join, backed by the 'memory' connector.
-- NOTE(review): VARCHAR without a length is VARCHAR(1) in Flink SQL; confirm STRING was not intended.
CREATE TABLE source_collection (
    id       INT,
    name     VARCHAR,
    -- processing-time attribute; the join uses it in FOR SYSTEM_TIME AS OF
    proctime AS PROCTIME()
) WITH (
    'connector'  = 'memory',
    'table.name' = 'source_collection'
);
-- MongoDB-backed dimension table; it is the lookup side of the join below.
CREATE TABLE mongo_source (
    _id BIGINT,
    f1  STRING,
    f2  BOOLEAN,
    f3  BINARY,
    f4  TINYINT,
    f5  SMALLINT,
    f6  INTEGER,
    f7  TIMESTAMP_LTZ(6),
    f8  TIMESTAMP_LTZ(3),
    f9  FLOAT,
    f10 DOUBLE,
    f11 DECIMAL(10, 2),
    f12 MAP<STRING, INTEGER>,
    f13 ROW<k INTEGER>,
    f14 ARRAY<STRING>,
    f15 ARRAY<ROW<k STRING>>
) WITH (
    -- connection
    'connector'  = 'mongodb',
    'uri'        = 'mongodb://user:password@localhost:53334',
    'database'   = 'test_db',
    'collection' = 'dim_collection',
    -- lookup behaviour: LRU cache of at most 1000 rows, 4 retries on failure
    'lookup.cache.type'          = 'lru',
    'lookup.max-retries'         = '4',
    'lookup.cache.max-rows'      = '1000',
    -- TTL is in milliseconds according to the parameter table
    'lookup.cache.ttl'           = '60000',
    'lookup.cache.metric.enable' = 'true'
);
-- Use _id as the sink primary key.
-- NOTE(review): catalog-scoped options are expected to carry the catalog name as prefix;
-- confirm 'sink_mongo' is intended, since the catalog used below is yxx_local_mongodb.
SET 'sink_mongo.primary.keys' = '_id';

-- yxx_local_mongodb is a MongoDB data source that has already been registered.
-- Enrich each source row with the matching dimension row (lookup join on processing time)
-- and write the result to the sink collection.
INSERT INTO yxx_local_mongodb.test_db.sink_target_collection
/*+ OPTIONS(
    'sink.transaction.enable' = 'true',
    'sink.flush.on-checkpoint' = 'false',
    'sink.flush.size' = '1000',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.parallelism' = '1'
) */
SELECT
    S.id,
    S.name,
    D._id,
    D.f1,
    D.f2
FROM source_collection AS S
JOIN mongo_source FOR SYSTEM_TIME AS OF S.proctime AS D
    ON S.id = D._id
;
- 元数据方式
需在猛犸平台登记 MongoDB 数据源
-- source: in-memory table that supplies the rows to be enriched.
-- NOTE(review): a bare VARCHAR is VARCHAR(1) in Flink SQL; verify that STRING was not meant.
CREATE TABLE source_collection (
    id INT,
    name VARCHAR,
    -- computed processing-time column
    proctime AS PROCTIME()
)
WITH (
    'connector' = 'memory',
    'table.name' = 'source_collection'
);
-- View over the registered dimension collection that adds a processing-time column.
CREATE VIEW vi AS
SELECT
    *,
    PROCTIME() AS proctime
FROM yxx_local_mongodb.test_db.dim_collection;
-- Lookup-join the source rows against the view `vi` (as of the source row's processing time)
-- and write the enriched rows to the sink collection of the registered catalog.
-- The OPTIONS hint overrides the sink settings for this statement only.
INSERT INTO yxx_local_mongodb.test_db.sink_target_collection
/*+ OPTIONS(
    'sink.transaction.enable' = 'true',
    'sink.flush.on-checkpoint' = 'false',
    'sink.flush.size' = '1000',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.parallelism' = '1'
) */
SELECT
    S.id,
    S.name,
    D._id,
    D.f1,
    D.f2
FROM source_collection AS S
JOIN vi FOR SYSTEM_TIME AS OF S.proctime AS D
    ON S.id = D._id
;
SINK 表
DDL
-- sink
-- MongoDB sink table keyed by _id. The statement is kept off the comment line so that
-- the `--` marker no longer comments out the whole DDL.
CREATE TABLE sink_target_collection (
    _id  INTEGER,
    name STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    -- connection
    'connector'  = 'mongodb',
    'uri'        = 'mongodb://user:password@localhost:53334',
    'database'   = 'test_db',
    'collection' = 'sink_target_collection',
    -- write behaviour
    'sink.transaction.enable'     = 'true',
    'sink.flush.on-checkpoint'    = 'false',
    'sink.flush.size'             = '1000',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.parallelism'            = '1'
);
元数据方式
-- Use _id as the sink primary key.
SET 'sink_mongo.primary.keys' = '_id';

-- yxx_local_mongodb is a MongoDB data source that has already been registered.
-- Write source rows to the sink collection, renaming id to the _id key column.
-- Only id and name are selected: source_collection, as declared in this document,
-- exposes id, name and proctime, and the sink DDL declares _id and name, so the
-- previously selected `age` column could not be resolved.
INSERT INTO yxx_local_mongodb.test_db.sink_target_collection
/*+ OPTIONS(
    'sink.transaction.enable' = 'true',
    'sink.flush.on-checkpoint' = 'false',
    'sink.flush.size' = '1000',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.parallelism' = '1'
) */
SELECT
    id AS _id,
    name
FROM source_collection;
With 参数 / set 参数
- 针对某个 catalog 设置参数时,需要在下表的参数名前添加 catalog 名称作为前缀,例如: user_dim.connector.version(其中 user_dim 为 catalog 名称)。
参数 | 注释说明 | 备注 |
---|---|---|
connector | source 数据源 | 必填:mongodb |
uri | 数据库链接地址 | mongoDB 链接地址,必填 |
database | source 数据库 | mongoDB database,必填 |
collection | mongoDB collection | mongoDB Collection 必填 |
sink.transaction.enable | 是否开启sink exactly-once 机制 | 选填。默认为false |
sink.flush.on-checkpoint | 是否开启在checkpoint snapshot执行时保存数据 | 选填。默认为false |
sink.flush.size | flush size | 整数,选填。默认为1000 |
sink.bulk-flush.max-actions | bulk flush 最大条数 | 整数,选填。默认为1000 |
sink.parallelism | sink并行度 | 整数,选填。无默认值 |
lookup.cache.type | 作为维表时查询的缓存策略 | 选填。不区分大小写,all/none/lru。默认none |
lookup.max-retries | 作为维表时查询失败的最大重试次数 | 选填。默认3 |
lookup.cache.ttl | 作为维表时查询的缓存ttl | 选填。默认常驻,不会失效,单位毫秒 |
lookup.cache.max-rows | 作为维表时查询的缓存最大行数 | 选填。默认10000 |
lookup.cache.metric.enable | 是否开启缓存监控。 监控指标有: XXX.cache.hit-rate:命中率=hit-count/(hit-count+miss-count); XXX.cache.hit-count:命中数; XXX.cache.miss-count:未命中数,=加载成功数+加载异常数; XXX.cache.average-load-cost:每条记录平均耗时,单位ms,total-load-time/(load-success-count+load-exception-count); XXX.cache.load-success-count:缓存加载成功数; XXX.cache.load-exception-count:缓存加载异常数,外部数据库未匹配到join key; XXX.cache.load-exception-rate:缓存加载异常率,当异常率很高时建议开启缓存空值; XXX.cache.total-load-time:总的缓存加载耗时,单位s; XXX.cache.cache-record-count:缓存的记录数 | 选填。默认false |
文档反馈
以上内容对您是否有帮助?