数据传输OpenAPI
此手册用于记录大数据开发与管理平台中,数据传输产品所有对外开放的OpenAPI,阅读此手册,你将了解数据传输产品开放的OpenAPI能力与调用方法。
一、 数据传输公共参数
1.1 响应格式
| 名称 | 类型 | 描述 |
|---|---|---|
| code | Number | 响应码 |
| reqId | String | 请求ID |
| cost | Number | 耗时,单位:ms |
| msg | String | 响应消息 |
| result | Object | 响应结果 |
1.2 响应码
数据传输以4位中的首位区分功能模块,第二位区分业务子模块,后面两位递增,如不足则顺延。
1.2.1 1000~1999 数据源校验异常码
| 响应码 | 说明 |
|---|---|
| 1001 | 数据源被引用 |
| 1002 | 数据源URL非法平台模块查询异常 |
| 1003 | 数据源名称已存在 |
| 1004 | 数据源已存在(url,user) |
| 1005 | 数据源不存在 |
| 1006 | 数据源不支持读 |
| 1007 | 数据源不支持写 |
| 1101 | 表不存在 |
| 1102 | 同义词不是表 |
| 1103 | 同义词没有读权限 |
| 1200 | SQL解析异常 |
| 1201 | hql不能包含db |
| 1202 | hql语法错误 |
| 1203 | sql的表名解析异常 |
1.2.2 2000~2999 离线任务异常码
| 响应码 | 说明 |
|---|---|
| 2001 | 任务已被引用 |
| 2002 | 任务名称已存在 |
| 2003 | 任务不存在 |
| 2004 | 任务已提交 |
| 2005 | 任务不存在实例, 请先运行任务 |
| 2006 | 任务Reader写库异常 |
| 2007 | 任务Writer写库异常 |
| 2201 | 校验脱敏规则是否存在 |
1.2.3 3000~3999 实时任务异常码
1.2.4 4000~5999 其他子模块业务异常码
| 响应码 | 说明 |
|---|---|
| 4000 | 统一错误跳转web页面,任务权限校验失败code |
| 4001 | 统一错误浮窗提示 |
1.2.5 6000~7999 预留业务异常码
1.2.6 8000~9999 调用其他子产品异常码
| 响应码 | 说明 |
|---|---|
| 8001 | 单点认证失败 |
| 8002 | ACC登录异常 |
| 8100 | 元数据中心异常 |
| 8101 | 任务血缘发送异常 |
| 8202 | Azkaban请求错误 |
| 8203 | Azkaban响应错误 |
| 8204 | AZKABAN连接错误 |
| 8400 | 离线开发平台异常 |
| 8401 | 离线开发平台的任务名称已存在 |
| 8402 | 离线任务来源表名称重复 |
| 8403 | 离线开发任务与数据同步节点同名,造成环路异常 |
| 8600 | 安全中心异常 |
| 8700 | sloth异常 |
| 8701 | sloth校验语法异常 |
1.3 枚举列表
1.3.1 DataSourceTypeEnum
数据源类型枚举
| 枚举值 | 类型 | 枚举code | 名称 |
|---|---|---|---|
| mysql | String | 1 | mysql |
| hive | String | 2 | Hive |
| ddb | String | 3 | 网易自研数据库DDB |
| ddbqs | String | 4 | 网易自研数据库DDB的qs版本 |
| oracle | String | 5 | Oracle |
| hbase | String | 6 | Hbase |
| kudu | String | 7 | Kudu |
| kafka | String | 8 | Kafka |
| rocketmq | String | 9 | RocketMq |
| mongodb | String | 10 | MongoDB |
| es | String | 11 | elasticsearch |
| api | String | 12 | Http api数据源 |
| hdfs | String | 13 | HDFS |
| greenplum | String | 14 | Greenplum |
| db2 | String | 15 | DB2 |
| hana | String | 16 | Hana |
| dm | String | 17 | 达梦数据库 |
| sqlserver | String | 18 | Sqlserver |
| postgresql | String | 19 | PostgreSQL |
| ftp | String | 20 | Ftp |
| clickhouse | String | 21 | ClickHouse |
| tidb | String | 22 | TiDB |
| doris | String | 23 | Doris |
| vertica | String | 24 | Vertica |
| cassandra | String | 25 | Cassandra |
| phoenix | String | 26 | phoenix |
| localfile | String | 27 | 本地文件数据源 |
| maxcompute | String | 28 | 阿里maxcompute |
| redis | String | 29 | Redis |
1.3.2 HiveType
| 枚举 | 类型 | 描述 |
|---|---|---|
| mammut | String | 中台内部hive,枚举为空也为此值 |
| hiveExt | String | 外部hive数据源 |
1.3.3 TableNameType
| 枚举 | 类型 | 描述 |
|---|---|---|
| normal | String | 库表选择 |
| regular | String | 正则匹配 |
| synonym | String | 同义词 |
1.3.4 DatasourceMode
| 枚举 | 类型 | 描述 |
|---|---|---|
| logical | String | 逻辑数据源 |
| physical | String | 物理数据源 |
1.4 参数实体
1.4.1 Reader
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| conf | Map<String, String> | 读取高级配置 | |
| setting |
Map<String,Object> |
读取功能参数 | 由于数据源太多,且特性各异,见第三章功能参数说明 |
| datasources | List<Datasource> |
数据源信息 | 不支持多库表的数据源,size=1 Mysql,Oracle,Sqlserver支持多库表 |
1.4.2 Writer
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| conf | List<Map<String, String>> | 写入高级配置 | |
| setting | Map<string,Object> | 写入功能参数 | 由于数据源太多,且特性各异,见第三章读写功能参数说明 |
| datasource | Datasource | 数据源信息 | 具体详见数据源库表参数说明 |
1.4.3 Datasource
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| datasourceId | Long | 数据源ID | 元数据中心登记参数 |
| db | String | 数据库名称 | |
| table |
List<String> |
表名称 | 不支持多库表的数据源,size=1,Mysql,Oracle,Sqlserver支持多库表 去向数据源时,size=1 |
| tableNameType | TableNameType | 表名称规则类型 | 见TableNameType枚举 |
| hiveType | HiveType | hive的类型 | 见HiveType枚举 只有hive数据源支持 |
| datasourceMode | DatasourceMode | 枚举,逻辑数据源,物理数据源 | 无 |
1.4.4 Handler
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| map | List<ColumnMap> | 列映射列表 | 无 |
1.4.5 ColumnMap
这里包含了很多参数,有些参数是特定场景下使用,具体见[4.2 ColumnMap的构造]
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| columnSelectValue | String | 只用于Hbase中 | 见Hbase实践说明 |
| oldName | String | 来源表字段名称 | |
| oldSourceType | String | 来源表字段类型 | 值="$CUSTOM_EXPR$"时,oldName为自定义表示 非结构化数据源,Hbase,HDFS,FTP,使用此类型 |
| oldType |
String |
来源表字段类型 | 用于实时任务 |
| comment |
String | 来源字段注释 |
|
| oldExpression | String | 来源表字段自定义表达式 | |
| newColumnSelectValue | String | 只用于Hbase中 | 见Hbase实践说明 |
| newName | String | 去向表字段名称 | |
| newSourceType | String | 去向表字段类型 | 只用于web展示 |
| newType | String | 去向表字段类型 | 实时任务 |
| newComment | String | 去向字段注释 | 无 |
1.5 响应实体
二、OpenAPI列表
2.1 OpenAPI总览
| 模块 | 名称 | 支持版本 | 状态 | 请求方法 | 请求路径 |
|---|---|---|---|---|---|
| 离线任务管理 | 任务创建 | v2.7.0 | 已发布 | POST | /task/v2/add |
| 实时任务管理 | 任务启动 | v3.10.0 | POST | /stream/v2/job/exec | |
| 实时任务管理 | 任务停止 | v3.10.0 | POST | /stream/v2/job/stop |
2.2 离线任务管理OpenAPI
2.2.1 任务创建
POST /api/openapi/task/v2/add
产品版本:v2.5.0
描述:创建任务
入参
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| product | String | 项目名称 | |
| clusterId | String | 集群Id | |
| user | String | 用户负责人邮箱 | |
| name |
String | 任务名称 | 256字符长度 mysql可识别字符 |
| taskId |
String | 任务Id |
非必须参数,不填由server生成 <64字符 整型字符串 |
| description | String | 任务描述 | 1024字符 |
| conf | Map<String,String> | 高级配置 | 任务级别的高级配置 |
| readerType | DataSourceTypeEnum | 读取数据源 | |
| reader | Reader | 读取实体 | |
| writerType | DataSourceTypeEnum | 写入数据源 | |
| writer | Writer | 写入实体 | |
| paramSetIds | List<Long> |
参数组 | 离线开发平台登记参数组,传输使用参数组ID |
| handlers | List<Handler> | 列处理器 | openAPI 只支持列映射处理器 size=1 见Handler说明 |
| columns |
List<String> |
同名列映射处理 server端根据List<String> 和去向端的字段,自动同名映射生成handlers |
与handlers 只可存其一,以columns 优先 参见最佳实践4.1的说明 |
| setting | Map | 任务功能参数 | 见3.1节功能参数说明 |
出参: String 任务ID
示例 Mysql2Mysql任务:
入参示例:
{
"email": "tangjiafu@corp.netease.com",
"product": "data\_transform",
"clusterId": "dev4",
"conf": {},
"name": "参数校验mysql2mysql\_1",
"description": "",
"readerType": "mysql",
"reader": {
"conf": {},
"datasources": \[
{
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
}
\],
"setting": {
"split": "id",
"ifCondition": \[\],
"forbiddenSplit": false,
"enableStrReplace": true,
"replacedStr": "ss",
"conditions": "1=1",
"transformType": "common",
"originalStrs": "\\\\n,\\\\r,\\\\01",
"partitionNum": 100
}
},
"writerType": "mysql",
"writer": {
"datasources": {
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
},
"conf": {},
"setting": {
"postSQL": \[\],
"datasourceId": 2727,
"insertType": "into",
"preSQL": \[
"TRUNCATE ndi.user;"
\]
}
},
"handlers": \[
{
"add": \[\],
"type": "columnHandler",
"map": \[
{
"newType": "bigint",
"newComment": "主键",
"oldType": "bigint",
"newSourceType": "BIGINT(19, 0)",
"newName": "id",
"oldSourceType": "BIGINT(19, 0)",
"oldName": "id",
"comment": "主键"
},
{
"newType": "string",
"newComment": "姓名",
"oldType": "string",
"newSourceType": "VARCHAR(255)",
"newName": "name",
"oldSourceType": "VARCHAR(255)",
"oldName": "name",
"comment": "姓名"
},
{
"newType": "string",
"newComment": "手机号码",
"oldType": "string",
"newSourceType": "VARCHAR(16)",
"newName": "phone",
"oldSourceType": "VARCHAR(16)",
"oldName": "phone",
"comment": "手机号码"
},
{
"newType": "timestamp",
"newComment": "时间1",
"oldType": "timestamp",
"newSourceType": "TIMESTAMP(0)",
"newName": "time\_1",
"oldSourceType": "TIMESTAMP(0)",
"oldName": "time\_1",
"comment": "时间1"
}
\]
}
\]
}
出参:
| 参数 | 参数类型 | 描述 |
|---|---|---|
| code | Number | 响应码 |
| reqId | String | 请求ID |
| cost | Number | 耗时,单位:ms |
| msg | String | 响应消息 |
| result | String | 任务Id |
1649397381105018为成功创建任务的ID
{
"code": 200,
"message": "success",
"result": "1649397381105018",
"reqId": "0773215919a348d48c435df103e606fe",
"cost": 550
}
2.3 实时任务管理
2.3.1 启动任务
POST /api/openapi/stream/v2/job/exec
产品版本:v3.10.0
描述:启动任务
入参
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| user | User | 用户信息 | |
| request | JobRunConfigReqDTO | 请求信息 | 无 |
User
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| product | String | 项目名称 | |
| clusterId | String | 集群id | |
| user | String | 用户负责人邮箱 | 无 |
JobRunConfigReqDTO
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| taskId | String | 任务id | |
| runConfig | RunConfig | 运行参数 | |
| checkPointConfig | CheckPointConfig | cp参数 | |
| restartStrategy | RestartStrategy | 重启策略 | |
| advancedConfig | List<Pair> | 高级参数 | |
| customConfig | List<Pair> | 自定义参数 | 无 |
RunConfig
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| yarnQueue | Integer | 实例id | |
| slots | Integer | slot数量 | |
| tmMemory | Integer | TM内存,到位MB | |
| jmMemory | Integer | JM内存,单位MB | |
| parallelism | Integer | 并行度 | 无 |
CheckPointConfig
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| cpInterval | Integer | cp间隔时间,单位:秒 | |
| cpMode | String | 模式,可选值:EXACTLY_ONCE,AT_LEAST_ONCE | |
| cpTimeout | Integer | 超时时间,单位:秒 | |
| cpSwitch | Boolean | 是否开启checkpoint | |
| checkPointPath | String | cp或sp启动时,状态文件的路径 | |
| checkPointType | String | 启动位点类型,可选值:cp, sp, none, custom | cp: 从cp启动; sp:从sp启动 none:直接启动 custom:自定义位点启动 |
RestartStrategy
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| strategy | String | 重启策略,可选值:failure-rate, fixed-delay | failure-rate: 失败率重启, fixed-delay: 固定间隔重启 |
| restartFailedCount | Integer | 失败重启次数 | |
| restartTimeInterval | Integer | 重试时间间隔, 单位:秒 | 无 |
Pair
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| key | String | 属性的键 | |
| value | String | 属性的值 | 无 |
出参:
| 参数 | 参数类型 | 描述 |
|---|---|---|
| code | Number | 响应码 |
| reqId | String | 请求ID |
| cost | Number | 耗时,单位:ms |
| msg | String | 响应消息 |
| result | RunJobRespDTO | 停止作业响应 |
JobRunRespDTO
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| execId | Integer | 实例id | 无 |
2.3.2 停止任务
POST /api/openapi/stream/v2/job/stop
产品版本:v3.10.0
描述:启动任务
入参
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| user | User | 用户信息 | |
| request | StopJobReqDTO | 请求信息 | 无 |
User
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| product | String | 项目名称 | |
| clusterId | String | 集群Id | |
| user | String | 用户负责人邮箱 | 无 |
StopJobReqDTO
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| taskId | String | 任务Id | |
| savepoint | Boolean | 是否保存savepoint | 无 |
出参:
| 参数 | 参数类型 | 描述 |
|---|---|---|
| code | Number | 响应码 |
| reqId | String | 请求ID |
| cost | Number | 耗时,单位:ms |
| msg | String | 响应消息 |
| result | StopJobRespDTO | 停止作业响应 |
StopJobRespDTO
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| stopSuccess | Boolean | 是否停止成功 | 无 |
三、数据源参数说明
当前的setting根据功能各有差异,有些功能需要多个参数组合使用,主要分为特殊字符替换替换,并发读取,流量控制等;
3.1节列举了一些任务级别的功能参数。
3.2节列举Reader和Writer setting里的一些常规参数字典,另外有一些专用于固定一个数据源的,具体查看数据源。
tips:
非必选参数应该为null或者不填,比如没有分区,比不是"partition":[]
读写支持
| 数据源 | 读 | 写 |
|---|---|---|
| Mysql | Y | Y |
| SQLserver | Y | Y |
| Oracle | Y | Y |
| PostgreSQL | Y | Y |
| TiDB | Y | Y |
| Hive | Y | Y |
| Greenplum | Y | Y |
| Doris | Y | Y |
| Hbase | Y | Y |
| ElasticSearch | Y | Y |
| API | Y | |
| ClickHouse | Y | Y |
| FTP | Y | Y |
| HDFS | Y | Y |
| Kafka | Y | |
| Kudu | Y | Y |
| Mongo | Y | Y |
| StarRocks | Y | Y |
3.1 Task功能参数说明
task功能参数指针对该任务生效的参数组合,大多数任务全面支持。
3.1.1 流量控制
{
// 省略了其他任务参数
"setting": {
"flowControl": {
"enable": false,
"type": "",
"value": 0
}
}
}
上述参数,在task-json中的setting里面增加flowControl键表示流量控制,指为json,参数说明如下:
| 参数 | 是否必须 | 数据类型 | 默认值 | 描述 |
|---|---|---|---|---|
| enable | 否 | 布尔 | false | 是否开启流量控制,开启时,type,value必须 |
| type | 否 | 字符串 | null | 流量控制类型枚举: size:流量控制按照字节数控制,单位MB count: 流量控制按照行数控制 |
| value | 否 | 整数 | null | 流量控制限速值 |
3.1.2 脏数据管理参数
脏数据管理是为了解决数据一旦出现错误,任务即会中断的场景,从而使得容许一定数据量的脏数据,并把脏数据存储到另外一张表中
{
"setting": {
"saveMessyData": false,
"saveConfig": {
"datasource": {
"name": "",
"type": "mysql",
"datasourceId": 0,
"db": "",
"table": \[
\]
}
},
"tolerateMessyData": false,
"maxRejectedCount": 0,
"failedThreshold": 0.00
}
}
| 参数 | 是否必须 | 数据类型 | 默认值 | 描述 |
|---|---|---|---|---|
| saveMessyData | 是 | 布尔 | false | 脏数据保存开关 |
| tolerateMessyData | 是 | 字符串 | true | 脏数据容忍开关 |
| maxRejectedCount | 否 | 整数 | 0 | 表示子任务的脏数据最大容忍条数。如果填写0,则表示不允许脏数据存在。 |
| failedThreshold | 否 | 浮点数 | 0.0 | 表示子任务的脏数据最大容忍条数。如果填写0,则表示不允许脏数据存在。 |
| saveConfig | 否 |
Object | null | 当开启脏数据容忍是,必须参数 脏数据保存配置参数配置 |
| datasource | 是 | Object | null | 脏数据数据源配置 saveMessyData !=null时为必须参数 |
| name | 是 | 字符串 | null | 数据源名称 |
| type | 是 | 字符串 | null | 数据源类型 |
| datasourceId | 是 | 整型 | null | 数据源id |
| db | 是 | 字符串 | null | 数据源数据库名称 |
| table | 是 | 字符串列表 | null | 数据源表名, size=1,多个表只取第一个 |
3.1.3 来源表结构变化策略
{
"setting": {
"alterTableStrategy": {
"lastTable": \[
\],
"columnAlterStrategy": 1
}
}
}
alterTableStrategy参数如下
| 参数 | 是否必须 | 数据类型 | 默认值 | 描述 |
|---|---|---|---|---|
| lastTable | 否 | 字符串列表 | 空列表 | 上一次修改任务(创建任务)来源表的表结构列名列表 |
| columnAlterStrategy | 否 | 整型枚举 | 1 | 表结构策略 1:忽略表结构变更策略 2:变更表结构并变更映射关系 3:抛异常 |
3.1.4 脱敏支持
{
"setting": {
"maskScanSetting":{
"scanResult":\[
{
"maskName":"邮编遮盖脱敏",
"column":"name"
},
{
"maskName":"银行卡遮盖脱敏",
"column":"age"
}
\]
}
}
}
maskScanSetting参数
| 参数 | 是否必须 | 数据类型 | 默认值 | 描述 |
|---|---|---|---|---|
| scanResult | 必须 | List<ScanResultDTO> | 无 | 手动指定脱敏列名及其脱敏规则 |
ScanResultDTO参数如下
| 参数 | 是否必须 | 数据类型 | 默认值 | 描述 |
|---|---|---|---|---|
| maskName | 必须 | String | 无 | 脱敏规则名称 |
| column | 必须 | String | 无 | 脱敏列名 |
3.2 读写功能参数说明
读写功能参数指针对读取或者写入时的功能参数设置
3.1.1 一般参数字典
一般参数字典指大多数数据源包含这些参数,在此说明,供开发者查阅
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| customSql | String | SQL模式任务的sql | 仅mysql,sqlserver,oracle,hana支持 |
| conditions | string |
数据过滤 where之后的sql |
和流水型任务参数互斥 |
| transformType | String | 传输类型 | 见枚举 |
| insertType | String | 写入规则类型 | 根据数据源而定有不同枚举 |
| postSQL | List<String> | 写入开始前SQL | 至多5条sql |
| preSQL | List<String> | 写入结束后SQL | 至多5条sql |
3.1.2 特殊字符替换
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| enableStrReplace | boolean | 开启字符串替换 | |
| originalStrs | String | 原始字符串 | |
| replacedStr | String | 替换字符串 | 无 |
3.1.3 并发读取
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| forbiddenSplit | boolean | 是否禁用并发读取 | 与是否开启含义相反 |
| split | String | 切分键(字段) | 要求为主键 类型为 Integer 或TimeStamp |
| partitionNum | String | 并发度 | 无 |
3.1.4 流水型任务
流水型任务与非组合参数中数据过滤参数(conditions)互斥
| 参数 | 参数类型 | 描述 | 约束与补充 |
|---|---|---|---|
| initialValue | String | 流水型起始值 | |
| column | String | 流水型字段名称 | |
| columnSourceType | String | 流水型字段类型 | 用于web页面展示 用户sqlserver的特殊处理,值为timestamp时,使用转换函数 |
| columnDataType | Integer | 流水型字段类型,任务运行时使用 参考jdbc的枚举code值 java.sql.Types |
支持整型和timestamp spark类型的枚举值 |
3.3 具体数据源特性参数
3.2.1 Mysql
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"ifCondition": {
"relatedArray": {},
"componentType": {}
},
"conditions": "",
"customSql": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"split": "",
"partitionNum": "",
"forbiddenSplit": true,
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": "",
"transformType": "",
"ignoreConditions": true
}
}
参数说明
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| 特殊字符替换(3个参数) enableStrReplace originalStrs replacedStr |
否 | 查看前文参数字典 | |
| 并发读取(3个参数) forbiddenSplit split partitionNum |
否 | 查看前文参数字典 | |
| 流水型任务(3个参数) initialValue column columnSourceType columnDataType |
否 | 查看前文参数字典 | |
| customSql | 否 | String | sql模式的sql |
| conditions | 否 | String | where 条件 |
| transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[
""
\],
"preSQL": \[
""
\]
}
}
参数说明
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| insertType | 否 | 枚举 | 写入规则类型(对于mysql语法) into插入 ignore忽略主键冲突 overwrite覆写 insertonduplicatekeyupdate 插入时主键冲突更新 |
| postSQL | 否 | List<String> | 写入后执行sql |
| preSQL | 否 | List<String> | 写入结束后SQL |
任务Mysql2Mysql
{
"product": "data\_transform",
"clusterId": "dev4",
"user": "tangjiafu@corp.netease.com",
"conf": {
"1": "2"
},
"name": "参数校验mysql2mysql",
"description": "",
"readerType": "mysql",
"reader": {
"conf": {
"key1": "value1",
"key2": "value2"
},
"datasources": \[
{
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
}
\],
"setting": {
"initialValue": 0,
"split": "id",
"ifCondition": \[\],
"forbiddenSplit": false,
"column": "",
"enableStrReplace": true,
"replacedStr": "ss",
"conditions": "1=1",
"transformType": "common",
"originalStrs": "\\\\n,\\\\r,\\\\01",
"partitionNum": 100
}
},
"writerType": "mysql",
"writer": {
"conf": {},
"datasource": {
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
},
"setting": {
"insertType": "into",
"preSQL": \[
"TRUNCATE ndi.user;"
\],
"postSQL": \[\]
}
},
"handlers": \[
{
"add": \[\],
"type": "columnHandler",
"map": \[
{
"newComment": "主键",
"newSourceType": "BIGINT(19, 0)",
"newName": "id",
"oldSourceType": "BIGINT(19, 0)",
"oldName": "id",
"comment": "主键"
},
{
"newComment": "姓名",
"newSourceType": "VARCHAR(255)",
"newName": "name",
"oldSourceType": "VARCHAR(255)",
"oldName": "name",
"comment": "姓名"
}
\]
}
\]
}
3.2.2 SQLserver
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"ifCondition": {
"relatedArray": {},
"componentType": {}
},
"conditions": "",
"customSql": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"split": "",
"partitionNum": "",
"forbiddenSplit": true,
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": "",
"transformType": "",
"ignoreConditions": true
}
}
参数说明
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| 特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
| 并发读取(3个参数) | 否 | 查看前文参数字典 | |
| 流水型任务(3个参数) | 否 | 查看前文参数字典 | |
| customSql | 否 | String | sql模式的sql |
| conditions | 否 | String | where 条件 |
| transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[""\],
"preSQL": \[""\]
}
}
参数说明
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| insertType | 否 | 枚举 | 写入规则类型 into插入 |
| postSQL | 否 | List<String> | 写入后执行sql |
| preSQL | 否 | List<String> | 写入结束后SQL |
3.2.3 Oracle
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"ifCondition": {
"relatedArray": {},
"componentType": {}
},
"conditions": "",
"customSql": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"split": "",
"partitionNum": "",
"forbiddenSplit": true,
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": "",
"transformType": "",
"ignoreConditions": true
}
}
参数说明
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| 特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
| 并发读取(3个参数) | 否 | 查看前文参数字典 | |
| 流水型任务(3个参数) | 否 | 查看前文参数字典 | |
| customSql | 否 | String | sql模式的sql |
| conditions | 否 | String | where 条件 |
| transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[""\],
"preSQL": \[""\],
}
}
参数说明
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| insertType | 否 | 枚举 | 写入规则类型 into |
| postSQL | 否 | List<String> | 写入后执行sql |
| preSQL | 否 | List<String> | 写入结束后SQL |
3.2.4 PostgreSQL
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"ifCondition": {
"relatedArray": {},
"componentType": {}
},
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"split": "",
"partitionNum": "",
"forbiddenSplit": true,
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": "",
"transformType": "",
"ignoreConditions": true
}
}
参数说明
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| 特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
| 并发读取(3个参数) | 否 | 查看前文参数字典 | |
| 流水型任务(3个参数) | 否 | 查看前文参数字典 | |
| customSql | 否 | String | sql模式的sql |
| transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[""\],
"preSQL": \[""\]
}
}
参数说明
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| insertType | 否 | 枚举 | 写入规则类型 into插入 |
| postSQL | 否 | List<String> | 写入后执行sql |
| preSQL | 否 | List<String> | 写入结束后SQL |
3.2.5 TiDB
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[""\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"ifCondition": {
"relatedArray": {},
"componentType": {}
},
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"split": "",
"partitionNum": "",
"forbiddenSplit": true,
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": "",
"transformType": "",
"ignoreConditions": true
}
}
参数说明
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| 特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
| 并发读取(3个参数) | 否 | 查看前文参数字典 | |
| 流水型任务(3个参数) | 否 | 查看前文参数字典 | |
| customSql | 否 | String | sql模式的sql |
| transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[""\],
"preSQL": \[""\],
}
}
参数说明
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| insertType | 否 | 枚举 | 写入规则类型 into upsert主键冲突是覆写 ignore主键冲突时忽略 |
| postSQL | 否 | List<String> | 写入后执行sql |
| preSQL | 否 | List<String> | 写入结束后SQL |
3.2.6 Hive
Reader Template
{
"datasources": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
"hiveType": ""
},
"conf": {},
"setting": {
"conditions": "",
"readMode": "",
"partitions": \[
{
"value": ""
}
\],
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| 特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
| readMode | 是 | 枚举 | hive 读取方式 copy:hadoop distcp文件拷贝 impala:impala读取 jdbc: hiveserver2读取 spark: spark读取 |
| partitions | 否 | 对象 | 分区信息 readMode=copy时支持 |
| conditions | 否 | String | 过滤条件 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
"hiveType": ""
},
"setting": {
"insertType": "",
"partitionList": \[
{
"key": "",
"partitionType": "",
"value": "",
"valueType": ""
}
\]
}
}| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| insertType | 是 | 枚举 | overwrite 先删除表中数据,再重新写入 into 在表中追加数据 |
| partitions | 否 | Partition | 分区信息 readMode=copy时支持 |
| conditions | 否 | String | 过滤条件 |
Partition
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| key | 是 | String | 分区key |
| partitionType | 是 | 枚举 | static 静态分区 dynamic动态分区 |
| value | 是 | 分区值表达式 | 动态分区时,为列值 静态分区为常量值,支持azkaban表达式 |
3.2.7 Greenplum
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[""\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"conditions": "",
"partitionColumn": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
| 参数 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| 特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
| partitionColumn | 否 | String | greenplum-spark-connector用于分区字段参数 表中整型字段 |
| conditions | 否 | String | 过滤条件 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
"hiveType": ""
},
"setting": {
"insertType": "",
"postSQL": \[
""
\],
"preSQL": \[
""
\]
}
}
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| insertType | 是 | 枚举 | truncate清空表数据写入 append overwrite |
| postSQL | 否 | 见前文参数字典 | |
| preSQL | 否 | 见前文参数字典 |
3.2.8 Doris
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
| 参数名 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| 特殊字符替换 (三个参数) |
否 | 见参数字典描述 enableStrReplace originalStrs replacedStr |
|
| conditions | 否 | String | 过滤条件(where,分区参数) |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[""\],
"preSQL": \[""\],
"maxFilterRatio": "",
"writeMode": ""
}
}
参数描述
| 参数 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| postSQL | 否 | 见参数字典 | |
| preSQL | 否 | 见参数字典 | |
| insertType | 是 | 枚举 | |
| writeMode | 是 | 枚举 | 写入方式 brokerload streamload |
| loadInterval | 否 | 整型 | 导入间隔 writeMode=streamload时,为必填参数 |
| maxFilterRatio | 否 | 浮点数字符串 | 最大容忍比例 writeMode=brokerload时,为必填参数 |
3.2.9 Hbase
Reader Template
{
"datasources": {
"datasourceId": 0,
"db": "",
"table": "",
"version": ""
},
"conf": {},
"setting": {
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"startRowKey": "",
"endRowKey": ""
}
}
| 参数名 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| 特殊字符替换 (三个参数) |
否 | 见参数字典描述 enableStrReplace originalStrs replacedStr |
|
| conditions | 否 | String | 过滤条件(where,分区参数) |
| startRowKey | 否 | String | hbase 起始rowKey |
| endRowKey | 否 | String | hbase 终止rowKey |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": ""
},
"setting": {
"writeType": "",
"dfsNameNodeRpcAddress": "",
"dfsNameServices": ""
}
}
参数描述
| 参数 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| writeType | 是 | 枚举 | 写入方式 bulkLoad put |
| dfsNameNodeRpcAddress | 否 | String | HFile所在HDFS的schema writeType=bulkLoad启用 |
| dfsNameServices | 否 | String | nameNode rpc地址,多个以逗号分隔 writeType=bulkLoad启用 |
3.2.10 ElasticSearch
Reader Template
{
"datasources": {
"datasourceId": 0,
"db": "",
"table": "",
"version": ""
},
"conf": {},
"setting": {
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
| 参数名 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| 特殊字符替换(三个参数) | 否 | 见参数字典描述 |
|
| conditions | 否 | String | 过滤条件(where,分区参数) |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": ""
},
"setting": {
"insertType": "",
"batchSize": 0,
"mappingId": "",
"indexType" :""
}
}
参数描述
| 参数 | 是否必须 | 参数类型 | 语义和约束 |
|---|---|---|---|
| batchSize | 是 | Long | batchSize es批量写入数据量 |
| mappingId | 是 | String | es 主键字段 |
| insertType | 是 | 枚举 | index (default) 根据ES主键字段新增数据,如果已经存在就替换并重建索引 create 根据ES主键字段添加新的数据,如果数据已经存在就报错 update 根据ES主键字段更新数据,如果数据不存在就报错 upsert 根据ES主键字段数据存在就更新,不存在就新增 |
| indexType | 是 | 枚举 | 索引类型: dynamic静态索引 static动态索引 |
3.2.11 API
Reader Template
"reader": {
"conf": {},
"datasources": \[
{
"datasourceId": 3220,
"db": "default",
"table": \[
"/hello"
\]
}
\],
"setting": {
"headers": \[
{
"value": "header",
"key": "header"
}
\],
"method": "post",
"enableStrReplace": false,
"paging": {
"total": "id",
"type": "offset"
},
"body": "{\\n\\"body\\":\\"body\\"\\n}",
"transformType": "common",
"originalStrs": "",
"enablePaging": "true",
"replacedStr": ""
}
}
参数描述
特殊字符替换参数:enableStrReplace、originalStrs、replacedStr
| 参数名 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| body | N | String | post请求体 |
| params | N | String | get请求体,拼到url中 |
| headers | N | String | http header |
| method | Y | 枚举 | HTTP方法,枚举 get、post |
| enablePaging | Y | bool | 是否启用分页参数 |
| paging | N | 对象 | 分页参数 |
| total | N | String | 分页字段名称 具体使用jsonPath从返回Response中获取该值 |
| type | N | 枚举 | 分页类型,枚举 page、offset page启用两个参数进行分页:pageNum、pageSize offset启用两个参数进行分页:offset、limit |
3.2.12 ClickHouse
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"flowControl": "",
"transformType": "common",
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"forbiddenSplit": true,
"split": "",
"partitionNum": "",
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": ""
}
}
参数描述
| 参数类型 | 是否必须 | 参数类型 | 语义与约束 |
|---|---|---|---|
| 特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
| 并发读取(3个参数) | 否 | 查看前文参数字典 | |
| 流水型任务(3个参数) | 否 | 查看前文参数字典 | |
| customSql | 否 | String | sql模式的sql |
| conditions | 否 | String | where 条件 |
| transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": ""
},
"setting": {
"insertType": "",
"postSQL": \[
""
\],
"preSQL": \[
""
\]
}
}
参数描述
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| insertType | 是 | 写入模式,枚举 into |
| postSQL | 否 | 写入前sql |
| preSQL | 否 | 写入后sql |
3.2.13 FTP
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": ""
}
\],
"conf": {},
"setting": {
"fileFilter": true,
"filteredPath": {
"type": "",
"paths": \[
{
"fileName": "",
"fullPath": "",
"isFile": true,
"size": 0,
"timestamp": "",
"fileSize": 0,
"fileId": 0
}
\],
"conditions": \[
{
"type": "",
"value": "",
"operator": ""
}
\]
},
"docSplit": true,
"format": "",
"fieldSeparator": "",
"skipLines": "",
"sheetName": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"enableRowCheck": true,
"checkFilePath": "",
"checkContentSeparator": "",
"checkContentRowPosition": ""
}
}
参数描述
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| fileFilter | Y | 文件筛选,true:全部文件,false:部分文件 |
| filteredPath | N | 文件过滤条件,fileFilter=false时必须 |
| docSplit | 是 | true:结构化文件传输 false:非结构化文件传输 |
| format | N | 文件格式 |
| fieldSeparator | N | 列分隔符 |
| skipLines | N | 跳过行数 |
| sheetName | N | 传输Excel时sheet名称 |
字符串替换功能参数
| enableStrReplace | N | 是否开启字符替换 |
|---|---|---|
| originalStrs | N | 原始str |
| replacedStr | N | 替换str |
行数校验功能参数
| enableRowCheck | N | 是否开启行数校验 |
|---|---|---|
| checkFilePath | N | 校验文件路径 |
| checkContentSeparator | N | 校验文件分隔符 |
| checkContentRowPosition | N | 校验数据的行数位置 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\]
},
"setting": {
"format": "",
"insertType": "",
"fileNameType": "",
"fileName": "",
"preAction": \[
{
"actionType": "",
"actionPath": "",
"actionNewPath": ""
}
\],
"postAction": \[
{
"actionType": "",
"actionPath": "",
"actionNewPath": ""
}
\],
"writeFlagFile": true
}
}
参数描述
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| format | 否 | 文件格式,枚举 |
| docSplit | 是 | true:结构化文件传输 false:非结构化文件传输 |
| insertType | 是 | 同名文件处理规则 into 覆盖 overwrite |
| fileNameType | 是 | 目标文件名类型 自定义custom 系统生成system |
| preAction | 否 | 前置处理,不处理actionType=none |
| postAction | 否 | 后置处理,不处理actionType=none |
前置处理、后置处理
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| actionType | 是 | 枚举 不处理、删除、重命名 none、rm、rename |
| actionPath | 否 | 操作文件路径 |
| actionNewPath | 否 | 重名名后的文件全路径 |
3.2.14 HDFS
Reader Template
{
"datasources": \[
{
"table": \[
""
\],
}
\],
"conf": {},
"setting": {
"conditions": "",
"docSplit": "",
"fieldSeparator": "",
"format": "",
"skipLines": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| fileFilter | Y | 文件筛选,true:全部文件,false:部分文件 |
| filteredPath | N | 文件过滤条件,fileFilter=false时必须 |
| docSplit | 是 | true:结构化文件传输 false:非结构化文件传输 |
| format | N | 文件格式 |
| fieldSeparator | N | 列分隔符 |
| skipLines | N | 跳过行数 |
| sheetName | N | 传输Excel时sheet名称 |
Writer Template
{
"conf": {},
"datasource": {
"table": "",
},
"setting": {
"compressionCodec": "",
"fileName": "",
"fileNameType": "",
"format": "",
"insertType": "",
"writeFlagFile": true
}
}
参数描述
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| format | 否 | 文件格式,枚举 |
| docSplit | 是 | true:结构化文件传输 false:非结构化文件传输 |
| insertType | 是 | 同名文件处理规则 into 覆盖 overwrite |
| fileNameType | 是 | 目标文件名类型 自定义custom 系统生成system |
| preAction | 否 | 前置处理,不处理actionType=none |
| postAction | 否 | 后置处理,不处理actionType=none |
前置处理、后置处理
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| actionType | 是 | 枚举 不处理、删除、重命名 none、rm、rename |
| actionPath | 否 | 操作文件路径 |
| actionNewPath | 否 | 重名名后的文件全路径 |
3.2.15 Kafka
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
},
"setting": {
"key": "",
"producerConf": \[
{
"key": "",
"value": ""
}
\]
}
}
参数描述
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| key | 是 | 写入kafka时,数据形式如下 {"key":key,"value":json} |
| producerConf | 否 | kafka的连接配置项 |
3.2.16 Kudu
Reader Template
{
"datasources": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
},
"conf": {},
"setting": {
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| conditions | 否 | 过滤条件(where,分区参数) |
字符串替换功能参数
| enableStrReplace | N | 是否开启字符替换 |
|---|---|---|
| originalStrs | N | 原始str |
| replacedStr | N | 替换str |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "default",
},
"setting": {
"insertType": ""
}
}
参数描述
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| insertType | 是 | 写入方式,枚举: into插入 overwrite覆写 |
3.2.17 Mongo
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
}
\],
"conf": {},
"setting": {
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
参数描述
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| conditions | 否 | 过滤条件(where,分区参数) |
字符串替换功能参数
| enableStrReplace | N | 是否开启字符替换 |
|---|---|---|
| originalStrs | N | 原始str |
| replacedStr | N | 替换str |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": ""
},
"setting": {
"insertType": ""
}
}
参数描述
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| insertType | 是 | 写入方式,枚举: append追加 overwrite覆写 |
3.2.18 StartRocks
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
| 参数名 | 是否必须 | 语义与约束 |
|---|---|---|
| conditions | Y | 过滤条件(where,分区参数) |
字符串替换功能参数
| enableStrReplace | Y | 是否开启字符替换 |
|---|---|---|
| originalStrs | N | 原始str |
| replacedStr | N | 替换str |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
"demo"
\],
"version": "",
"tableNameType": "",
},
"setting": {
"loadInterval": 0,
"maxFilterRatio": "0.0",
"insertType": "into",
"brokerLoadBrokers":"",
"postSQL": \[\],
"preSQL": \[\]
"writeMode": "streamload"
}
}
参数描述
| 参数名 | 是否必须 | 类型 | 语义与约束 |
|---|---|---|---|
| postSQL | 否 | 见参数字典 | |
| preSQL | 否 | 见参数字典 | |
| writeMode | 是 | 枚举 | 写入方式 brokerload streamload |
| loadInterval | 否 | 整型 | 导入间隔 writeMode=streamload时,为必填参数 |
| maxFilterRatio | 否 | 浮点数字符串 | 最大容忍比例 writeMode=brokerload时,为必填参数 |
| insertType | 否 | 写入类型,枚举: into |
|
| brokerLoadBrokers | 否 | String | broker名称列表,以逗号分割 |
四、最佳实践
4.1 创建Mysql2Mysql任务
阅读此章节,请了解[OpenCreateTask]相关参数
4.1.1 构造Task层级参数
如下所示,一个不含有reader,writer以及 handlers/columns的json,但是readerType,writerType已经确定,表明该任务是Mysql2Mysql的任务;
{
"product": "data\_transform",
"clusterId": "dev4",
"user": "tangjiafu@corp.netease.com",
"name": "参数校验mysql2mysql\_2",
"description": "",
"readerType": "mysql",
"reader": {
"conf": {
// 读取高级配置项,键值对
},
"datasources": \[
// 读取数据源列表,只有部分数据源支持多个来源,参考数据源参数说明
\],
"setting": {
// 根据数据源而定,查阅Reader Template
}
},
"writerType": "mysql",
"writer": {
"conf": {
// 写入高级配置项
},
"datasource": {
// 写入数据源参数
},
"setting": {
// 写入配置项,根据数据源而定,查阅Writer Template
}
},
"handlers": \[
{
}
\],
"setting": {
"messyDataConfig": {
"saveMessyData": false,
// saveMessyData= false 表示,不需要
"saveConfig": {
"datasource": {
"name": "",
"type": "",
"datasourceId": 0,
"db": "",
"table": \[
""
\]
}
},
"tolerateMessyData": false,
"maxRejectedCount": 0,
"failedThreshold": 0.00
},
"alterTableStrategy": {
"lastTable": \[\],
"columnAlterStrategy": 1
},
// 流量控制,查阅流量控制参数
"flowControl": {
"enable": false,
"type": "",
"value": 0
}
}
}
4.1.2 构造reader层级
4.1.3 构造reader的datasources
查询1.4.3DataSource,根据元数据中心所登记的数据源信息,构造如下JSON参数
根据1.4.3DataSource参数描述,Mysql是支持多库表的,如果有多个来源库表,可以填写多个table和多个DataSources
{
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
}
4.1.4 构造reader的setting
查询3.2.1 知道Mysql支持哪些特性参数,非常容易可以构造如下参数,
值得注意的是,替换字符串的参数,需要注意是否转义,数据传输代码由java进行编码,不进行转义和反转义操作,这里的json中的"\\n",接收后是\n符号;
{
// 数据过滤
"conditions": "1=1",
// 传输类型为普通类型
"transformType": "common",
// 并发读取
"split": "id",
"forbiddenSplit": false,
"partitionNum": 100,
// 特殊字符替换
"enableStrReplace": true,
"replacedStr": "ss",
"originalStrs": "\\\\n,\\\\r,\\\\01"
}
4.1.5 构造writer
同理,可以构造writer,到目前为止构造出来的Task如下
{
"writer": {
"datasources": {
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
},
"conf": {},
"setting": {
"postSQL": \[\],
"datasourceId": 2727,
"insertType": "into",
"preSQL": \[
"TRUNCATE ndi.user;"
\]
}
}
}
4.1.6 构造handlers
由 1.4.1 可以知道 handers和columns为互斥参数,以columns为优先,[columns参数见4.3节]
handlers的构造,参照1.4.5和1.4.6去构造 columnHandler,columnHandler中的map表示列映射详情,具体参考1.4.6 ColumnMap进行映射:
Hbase由于其schema复杂性,进行了特殊处理,且暂时不支持columns进行列名映射,参见实践[ColumnMap],实践ColumnMap还提供了简化参数的方式。
{
"map": \[
{
"newComment": "主键",
"newSourceType": "BIGINT(19, 0)",
"newName": "id",
"oldSourceType": "BIGINT(19, 0)",
"oldName": "id",
"comment": "主键"
},
{
"newComment": "姓名",
"newSourceType": "VARCHAR(255)",
"newName": "name",
"oldSourceType": "VARCHAR(255)",
"oldName": "name",
"comment": "姓名"
}
\]
}
4.2 ColumnMap的构造
ColumnHandler包含列映射构造,其核心为ColumnMap的构造,对于结构化数据源
只需要两个字段:newName和oldName,即reader的列名和writer的列名
比如一个Mysql2Mysql的任务 columnMap最终构造如下
"map": \[
{
"newName": "id",
"oldName": "id"
},
{
"newName": "name",
"oldName": "name"
},
{
"newName": "age",
"oldName": "age"
},
{
"newName": "remark",
"oldName": "remark"
},
{
"newName": "createAt",
"oldName": "createAt"
}
\]
但对于非结构化数据源,则需要定义reader的类型,而去向则不需要,因为去向如果是非结构化,写入到文件中,都会转为String类型;
Hbase因为其特殊性,包含了另外两个参数
在 columnSelectiveValue不为空时,有如下逻辑
if (StringUtils.equalsIgnoreCase(columnSelectiveValue, "rowkey")) {
oldName = "rowkey"
} else {
if (StringUtils.isNotEmpty(oldName)) {
oldName = oldName + ":" + mapColumnData.getOrElse("oldExpression", "")
}
}
上述逻辑,文字概括:对于rowkey列,columnSelectiveValue=rowkey
对于非rowkey列,oldName为列簇,oldExpression为列名
mysql2Hbase的ColumnMap
"map": [
{
"oldType": "INT",
"columnSelectValue": "rowKey",
"newName": "id",
"oldName": "0",
},
{
"oldType": "STRING",
"columnSelectValue": "0",
"newName": "name",
"oldName": "0",
"oldExpression": "name",
}
]
4.3 字段同名映射 columns参数
handers和columns为互斥参数,以columns为优先,columns只能进行同名映射。
columns表示需要导入的列,具体业务:
数据传输获取Reader和Writer的schema,进行同名映射,如果写入数据源是一个nonSchema的数据源,则只通过Reader的schema进行映射,为目标端构造schema
nonSchema数据源:EleasticSearch,MongoDB,Kafka,RocketMQ,Hbase,FTP,HDFS
{
"id",
"name"
}
上述columns,如果来源和去向都含有id,name字段,则构建两个字段的映射。
4.4 关于高级配置
高级配置分为三类,任务级别配置,来源配置(读取),去向配置(写入)。
ndi.spark,
ndi. 则表示为任务级别参数
spark常用的几个配置项
// executor内存
ndi.spark.spark-argument.executor-memory
// dirver内存
ndi.spark.spark-argument.driver-memory
// hive动态分区最大值
ndi.spark.spark-conf.spark.hadoop.hive.exec.max.dynamic.partitions
// 是否计算hive的统计信息写入到metastore
ndi.analyzeTableStatistics
spark-argument表示为spark-submit命令启动的参数,spark-conf则会添加上
--conf
4.5 任务创建实践
4.5.1 mysql2mysql
{
"product": "data\_transform",
"clusterId": "dev4",
"user": "tangjiafu@corp.netease.com",
"name": "参数校验mysql2mysql\_2",
"description": "",
"readerType": "mysql",
"reader": {
"conf": {
"key1": "value1",
"key2": "value2"
},
"datasources": \[
{
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
}
\],
"setting": {
"initialValue": 0,
"split": "id",
"ifCondition": \[\],
"forbiddenSplit": false,
"column": "",
"enableStrReplace": true,
"replacedStr": "ss",
"conditions": "1=1",
"transformType": "common",
"originalStrs": "\\\\n,\\\\r,\\\\01",
"partitionNum": 100
}
},
"writerType": "mysql",
"writer": {
"conf": {},
"datasource": {
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
},
"setting": {
"insertType": "into",
"preSQL": \[
"TRUNCATE ndi.user;"
\]
}
},
"handlers": \[
{
"add": \[\],
"type": "columnHandler",
"map": \[
{
"newComment": "主键",
"newSourceType": "BIGINT(19, 0)",
"newName": "id",
"oldSourceType": "BIGINT(19, 0)",
"oldName": "id",
"comment": "主键"
},
{
"newComment": "姓名",
"newSourceType": "VARCHAR(255)",
"newName": "name",
"oldSourceType": "VARCHAR(255)",
"oldName": "name",
"comment": "姓名"
}
\]
}
\],
"setting": {
"messyDataConfig": {
"saveMessyData": false,
"saveConfig": {
"datasource": {
"name": "",
"type": "mysql",
"datasourceId": 0,
"db": "",
"table": \[
"table"
\]
}
},
"tolerateMessyData": false,
"maxRejectedCount": 0,
"failedThreshold": 0.00
},
"alterTableStrategy": {
"lastTable": \[\],
"columnAlterStrategy": 1
},
"flowControl": {
"enable": false,
"type": "",
"value": 0
}
}
}
以上内容对您是否有帮助?