此手册用于记录大数据开发与管理平台中,数据传输产品所有对外开放的OpenAPI,阅读此手册,你将了解数据传输产品开放的OpenAPI能力与调用方法。

一、 数据传输公共参数

1.1 响应格式

名称 类型 描述
code Number 响应码
reqId String 请求ID
cost Number 耗时,单位:ms
msg String 响应消息
result Object 响应结果

1.2 响应码

数据传输以4位中的首位区分功能模块,第二位区分业务子模块,后面两位递增,如不足则顺延。

1.2.1 1000~1999 数据源校验异常码
响应码 说明
1001 数据源被引用
1002 数据源URL非法平台模块查询异常
1003 数据源名称已存在
1004 数据源已存在(url,user)
1005 数据源不存在
1006 数据源不支持读
1007 数据源不支持写
1101 表不存在
1102 同义词不是表
1103 同义词没有读权限
1200 SQL解析异常
1201 hql不能包含db
1202 hql语法错误
1203 sql的表名解析异常
1.2.2 2000~2999 离线任务异常码
响应码 说明
2001 任务已被引用
2002 任务名称已存在
2003 任务不存在
2004 任务已提交
2005 任务不存在实例, 请先运行任务
2006 任务Reader写库异常
2007 任务Writer写库异常
2201 校验脱敏规则是否存在
1.2.3 3000~3999 实时任务异常码
1.2.4 4000~5999 其他子模块业务异常码
响应码 说明
4000 统一错误跳转web页面,任务权限校验失败code
4001 统一错误浮窗提示
1.2.5 6000~7999 预留业务异常码
1.2.6 8000~9999 调用其他子产品异常码
响应码 说明
8001 单点认证失败
8002 ACC登录异常
8100 元数据中心异常
8101 任务血缘发送异常
8202 Azkaban请求错误
8203 Azkaban响应错误
8204 AZKABAN连接错误
8400 离线开发平台异常
8401 离线开发平台的任务名称已存在
8402 离线任务来源表名称重复
8403 离线开发任务与数据同步节点同名,造成环路异常
8600 安全中心异常
8700 sloth异常
8701 sloth校验语法异常

1.3 枚举列表

1.3.1 DataSourceTypeEnum

数据源类型枚举

枚举值 类型 枚举code 名称
mysql String 1 mysql
hive String 2 Hive
ddb String 3 网易自研数据库DDB
ddbqs String 4 网易自研数据库DDB的qs版本
oracle String 5 Oracle
hbase String 6 Hbase
kudu String 7 Kudu
kafka String 8 Kafka
rocketmq String 9 RocketMq
mongodb String 10 MongoDB
es String 11 elasticsearch
api String 12 Http api数据源
hdfs String 13 HDFS
greenplum String 14 Greenplum
db2 String 15 DB2
hana String 16 Hana
dm String 17 达梦数据库
sqlserver String 18 Sqlserver
postgresql String 19 PostgreSQL
ftp String 20 Ftp
clickhouse String 21 ClickHouse
tidb String 22 TiDB
doris String 23 Doris
vertica String 24 Vertica
cassandra String 25 Cassandra
phoenix String 26 phoenix
localfile String 27 本地文件数据源
maxcompute String 28 阿里maxcompute
redis String 29 Redis
1.3.2 HiveType
枚举 类型 描述
mammut String 中台内部hive,枚举为空也为此值
hiveExt String 外部hive数据源
1.3.3 TableNameType
枚举 类型 描述
normal String 库表选择
regular String 正则匹配
synonym String 同义词
1.3.4 DatasourceMode
枚举 类型 描述
logical String 逻辑数据源
physical String 物理数据源

1.4 参数实体

1.4.1 Reader
参数 参数类型 描述 约束与补充
conf Map<String, String> 读取高级配置
setting
Map<String,Object>
读取功能参数 由于数据源太多,且特性各异,见第三章功能参数说明
datasources List<Datasource>
数据源信息 不支持多库表的数据源,size=1
Mysql,Oracle,Sqlserver支持多库表
1.4.2 Writer
参数 参数类型 描述 约束与补充
conf List<Map<String, String>> 写入高级配置
setting Map<string,Object> 写入功能参数 由于数据源太多,且特性各异,见第三章读写功能参数说明
datasource Datasource 数据源信息
具体详见数据源库表参数说明
1.4.3 Datasource
参数 参数类型 描述 约束与补充
datasourceId Long 数据源ID 元数据中心登记参数
db String 数据库名称
table
List<String>
表名称 不支持多库表的数据源,size=1,Mysql,Oracle,Sqlserver支持多库表
去向数据源时,size=1
tableNameType TableNameType 表名称规则类型 见TableNameType枚举
hiveType HiveType hive的类型 见HiveType枚举
只有hive数据源支持
datasourceMode DatasourceMode 枚举,逻辑数据源,物理数据源
1.4.4 Handler
参数 参数类型 描述 约束与补充
map List<ColumnMap> 列映射列表
1.4.5 ColumnMap

这里包含了很多参数,有些参数是特定场景下使用,具体见[4.2 ColumnMap的构造]

参数 参数类型 描述 约束与补充
columnSelectValue String 只用于Hbase中 见Hbase实践说明
oldName String 来源表字段名称
oldSourceType String 来源表字段类型 值="$CUSTOM_EXPR$"时,oldName为自定义表示
非结构化数据源,Hbase,HDFS,FTP,使用此类型
oldType
String
来源表字段类型 用于实时任务
comment
String 来源字段注释
oldExpression String 来源表字段自定义表达式
newColumnSelectValue String 只用于Hbase中 见Hbase实践说明
newName String 去向表字段名称
newSourceType String 去向表字段类型 只用于web展示
newType String 去向表字段类型 实时任务
newComment String 去向字段注释

1.5 响应实体

二、OpenAPI列表

2.1 OpenAPI总览

模块 名称 支持版本 状态 请求方法 请求路径
离线任务管理 任务创建 v2.7.0 已发布 POST /task/v2/add
实时任务管理 任务启动 v3.10.0 POST /stream/v2/job/exec
实时任务管理 任务停止 v3.10.0 POST /stream/v2/job/stop

2.2 离线任务管理OpenAPI

2.2.1 任务创建

POST /api/openapi/task/v2/add

产品版本:v2.5.0

描述:创建任务

入参

参数 参数类型 描述 约束与补充
product String 项目名称
clusterId String 集群Id
user String 用户负责人邮箱
name
String 任务名称 256字符长度
mysql可识别字符
taskId
String 任务Id
非必须参数,不填由server生成
<64字符
整型字符串
description String 任务描述 1024字符
conf Map<String,String> 高级配置 任务级别的高级配置
readerType DataSourceTypeEnum 读取数据源
reader Reader 读取实体
writerType DataSourceTypeEnum 写入数据源
writer Writer 写入实体
paramSetIds List<Long>
参数组 离线开发平台登记参数组,传输使用参数组ID
handlers List<Handler> 列处理器 openAPI 只支持列映射处理器
size=1
见Handler说明
columns
List<String>
同名列映射处理
server端根据List<String> 和去向端的字段,自动同名映射生成handlers
与handlers 只可存其一,以columns 优先
参见最佳实践4.1的说明
setting Map 任务功能参数 见3.1节功能参数说明

出参: String 任务ID

示例 Mysql2Mysql任务:

入参示例:


{

    "email": "tangjiafu@corp.netease.com",

    "product": "data\_transform",

    "clusterId": "dev4",

    "conf": {},

    "name": "参数校验mysql2mysql\_1",

    "description": "",

    "readerType": "mysql",

    "reader": {

        "conf": {},

        "datasources": \[

            {

                "datasourceId": 2727,

                "db": "ndi",

                "tableNameType": "normal",

                "table": \[

                    "user"

                \]

            }

        \],

        "setting": {

            "split": "id",

            "ifCondition": \[\],

            "forbiddenSplit": false,

            "enableStrReplace": true,

            "replacedStr": "ss",

            "conditions": "1=1",

            "transformType": "common",

            "originalStrs": "\\\\n,\\\\r,\\\\01",

            "partitionNum": 100

        }

    },

    "writerType": "mysql",

    "writer": {

        "datasources": {

            "datasourceId": 2727,

            "db": "ndi",

            "tableNameType": "normal",

            "table": \[

                "user"

            \]

        },

        "conf": {},

        "setting": {

            "postSQL": \[\],

            "datasourceId": 2727,

            "insertType": "into",

            "preSQL": \[

                "TRUNCATE ndi.user;"

            \]

        }

    },

    "handlers": \[

        {

            "add": \[\],

            "type": "columnHandler",

            "map": \[

                {

                    "newType": "bigint",

                    "newComment": "主键",

                    "oldType": "bigint",

                    "newSourceType": "BIGINT(19, 0)",

                    "newName": "id",

                    "oldSourceType": "BIGINT(19, 0)",

                    "oldName": "id",

                    "comment": "主键"

                },

                {

                    "newType": "string",

                    "newComment": "姓名",

                    "oldType": "string",

                    "newSourceType": "VARCHAR(255)",

                    "newName": "name",

                    "oldSourceType": "VARCHAR(255)",

                    "oldName": "name",

                    "comment": "姓名"

                },

                {

                    "newType": "string",

                    "newComment": "手机号码",

                    "oldType": "string",

                    "newSourceType": "VARCHAR(16)",

                    "newName": "phone",

                    "oldSourceType": "VARCHAR(16)",

                    "oldName": "phone",

                    "comment": "手机号码"

                },

                {

                    "newType": "timestamp",

                    "newComment": "时间1",

                    "oldType": "timestamp",

                    "newSourceType": "TIMESTAMP(0)",

                    "newName": "time\_1",

                    "oldSourceType": "TIMESTAMP(0)",

                    "oldName": "time\_1",

                    "comment": "时间1"

                }

            \]

        }

    \]

}

出参:

参数 参数类型 描述
code Number 响应码
reqId String 请求ID
cost Number 耗时,单位:ms
msg String 响应消息
result String 任务Id

1649397381105018为成功创建任务的ID


{

    "code": 200,

    "message": "success",

    "result": "1649397381105018",

    "reqId": "0773215919a348d48c435df103e606fe",

    "cost": 550

}

2.3 实时任务管理

2.3.1 启动任务

POST /api/openapi/stream/v2/job/exec

产品版本:v3.10.0

描述:启动任务

入参

参数 参数类型 描述 约束与补充
user User 用户信息
request JobRunConfigReqDTO 请求信息

User

参数 参数类型 描述 约束与补充
product String 项目名称
clusterId String 集群id
user String 用户负责人邮箱

JobRunConfigReqDTO

参数 参数类型 描述 约束与补充
taskId String 任务id
runConfig RunConfig 运行参数
checkPointConfig CheckPointConfig cp参数
restartStrategy RestartStrategy 重启策略
advancedConfig List<Pair> 高级参数
customConfig List<Pair> 自定义参数

RunConfig

参数 参数类型 描述 约束与补充
yarnQueue Integer 实例id
slots Integer slot数量
tmMemory Integer TM内存,到位MB
jmMemory Integer JM内存,单位MB
parallelism Integer 并行度

CheckPointConfig

参数 参数类型 描述 约束与补充
cpInterval Integer cp间隔时间,单位:秒
cpMode String 模式,可选值:EXACTLY_ONCE,AT_LEAST_ONCE
cpTimeout Integer 超时时间,单位:秒
cpSwitch Boolean 是否开启checkpoint
checkPointPath String cp或sp启动时,状态文件的路径
checkPointType String 启动位点类型,可选值:cp, sp, none, custom cp: 从cp启动;
sp:从sp启动
none:直接启动
custom:自定义位点启动

RestartStrategy

参数 参数类型 描述 约束与补充
strategy String 重启策略,可选值:failure-rate, fixed-delay failure-rate: 失败率重启,
fixed-delay: 固定间隔重启
restartFailedCount Integer 失败重启次数
restartTimeInterval Integer 重试时间间隔, 单位:秒

Pair

参数 参数类型 描述 约束与补充
key String 属性的键
value String 属性的值

出参:

参数 参数类型 描述
code Number 响应码
reqId String 请求ID
cost Number 耗时,单位:ms
msg String 响应消息
result RunJobRespDTO 停止作业响应

JobRunRespDTO

参数 参数类型 描述 约束与补充
execId Integer 实例id
2.3.2 停止任务

POST /api/openapi/stream/v2/job/stop

产品版本:v3.10.0

描述:启动任务

入参

参数 参数类型 描述 约束与补充
user User 用户信息
request StopJobReqDTO 请求信息

User

参数 参数类型 描述 约束与补充
product String 项目名称
clusterId String 集群Id
user String 用户负责人邮箱

StopJobReqDTO

参数 参数类型 描述 约束与补充
taskId String 任务Id
savepoint Boolean 是否保存savepoint

出参:

参数 参数类型 描述
code Number 响应码
reqId String 请求ID
cost Number 耗时,单位:ms
msg String 响应消息
result StopJobRespDTO 停止作业响应

StopJobRespDTO

参数 参数类型 描述 约束与补充
stopSuccess Boolean 是否停止成功

三、数据源参数说明

当前的setting根据功能各有差异,有些功能需要多个参数组合使用,主要分为特殊字符替换替换,并发读取,流量控制等;

3.1节列举了一些任务级别的功能参数。

3.2节列举Reader和Writer setting里的一些常规参数字典,另外有一些专用于固定一个数据源的,具体查看数据源。

tips:

非必选参数应该为null或者不填,比如没有分区,比不是"partition":[]

读写支持

数据源
Mysql Y Y
SQLserver Y Y
Oracle Y Y
PostgreSQL Y Y
TiDB Y Y
Hive Y Y
Greenplum Y Y
Doris Y Y
Hbase Y Y
ElasticSearch Y Y
API Y
ClickHouse Y Y
FTP Y Y
HDFS Y Y
Kafka Y
Kudu Y Y
Mongo Y Y
StarRocks Y Y

3.1 Task功能参数说明

task功能参数指针对该任务生效的参数组合,大多数任务全面支持。

3.1.1 流量控制

{

  // 省略了其他任务参数

  "setting": {

    "flowControl": {

      "enable": false,

      "type": "",

      "value": 0

    }

  }

}

上述参数,在task-json中的setting里面增加flowControl键表示流量控制,指为json,参数说明如下:

参数 是否必须 数据类型 默认值 描述
enable 布尔 false 是否开启流量控制,开启时,type,value必须
type 字符串 null 流量控制类型枚举:
size:流量控制按照字节数控制,单位MB
count: 流量控制按照行数控制
value 整数 null 流量控制限速值
3.1.2 脏数据管理参数

脏数据管理是为了解决数据一旦出现错误,任务即会中断的场景,从而使得容许一定数据量的脏数据,并把脏数据存储到另外一张表中


{

  "setting": {

    "saveMessyData": false,

    "saveConfig": {

      "datasource": {

        "name": "",

        "type": "mysql",

        "datasourceId": 0,

        "db": "",

        "table": \[

        \]

      }

    },

    "tolerateMessyData": false,

    "maxRejectedCount": 0,

    "failedThreshold": 0.00

  }

}
参数 是否必须 数据类型 默认值 描述
saveMessyData 布尔 false 脏数据保存开关
tolerateMessyData 字符串 true 脏数据容忍开关
maxRejectedCount 整数 0 表示子任务的脏数据最大容忍条数。如果填写0,则表示不允许脏数据存在。
failedThreshold 浮点数 0.0 表示子任务的脏数据最大容忍条数。如果填写0,则表示不允许脏数据存在。
saveConfig
Object null 当开启脏数据容忍是,必须参数
脏数据保存配置参数配置
datasource Object null 脏数据数据源配置
saveMessyData !=null时为必须参数
name 字符串 null 数据源名称
type 字符串 null 数据源类型
datasourceId 整型 null 数据源id
db 字符串 null 数据源数据库名称
table 字符串列表 null 数据源表名,
size=1,多个表只取第一个
3.1.3 来源表结构变化策略

{

  "setting": {

    "alterTableStrategy": {

      "lastTable": \[

      \],

      "columnAlterStrategy": 1

    }

  }

}

alterTableStrategy参数如下

参数 是否必须 数据类型 默认值 描述
lastTable 字符串列表 空列表 上一次修改任务(创建任务)来源表的表结构列名列表
columnAlterStrategy 整型枚举 1 表结构策略
1:忽略表结构变更策略
2:变更表结构并变更映射关系
3:抛异常
3.1.4 脱敏支持

{

  "setting": {

     "maskScanSetting":{

            "scanResult":\[

                {

                    "maskName":"邮编遮盖脱敏",

                    "column":"name"

                },

                {

                    "maskName":"银行卡遮盖脱敏",

                    "column":"age"

                }

            \]

        }

  }

}

maskScanSetting参数

参数 是否必须 数据类型 默认值 描述
scanResult 必须 List<ScanResultDTO> 手动指定脱敏列名及其脱敏规则

ScanResultDTO参数如下

参数 是否必须 数据类型 默认值 描述
maskName 必须 String 脱敏规则名称
column 必须 String 脱敏列名

3.2 读写功能参数说明

读写功能参数指针对读取或者写入时的功能参数设置

3.1.1 一般参数字典

一般参数字典指大多数数据源包含这些参数,在此说明,供开发者查阅

参数 参数类型 描述 约束与补充
customSql String SQL模式任务的sql 仅mysql,sqlserver,oracle,hana支持
conditions string
数据过滤
where之后的sql
和流水型任务参数互斥
transformType String 传输类型 见枚举
insertType String 写入规则类型 根据数据源而定有不同枚举
postSQL List<String> 写入开始前SQL 至多5条sql
preSQL List<String> 写入结束后SQL 至多5条sql
3.1.2 特殊字符替换
参数 参数类型 描述 约束与补充
enableStrReplace boolean 开启字符串替换
originalStrs String 原始字符串
replacedStr String 替换字符串
3.1.3 并发读取
参数 参数类型 描述 约束与补充
forbiddenSplit boolean 是否禁用并发读取 与是否开启含义相反
split String 切分键(字段) 要求为主键
类型为 Integer 或TimeStamp
partitionNum String 并发度
3.1.4 流水型任务

流水型任务与非组合参数中数据过滤参数(conditions)互斥

参数 参数类型 描述 约束与补充
initialValue String 流水型起始值
column String 流水型字段名称
columnSourceType String 流水型字段类型 用于web页面展示
用户sqlserver的特殊处理,值为timestamp时,使用转换函数
columnDataType Integer 流水型字段类型,任务运行时使用
参考jdbc的枚举code值
java.sql.Types
支持整型和timestamp
spark类型的枚举值

3.3 具体数据源特性参数

3.2.1 Mysql

Reader Template


{

  "datasources": \[

    {

      "datasourceId": 0,

      "db": "",

      "table": \[

        ""

      \],

      "version": "",

      "tableNameType": "",

      "datasourceMode": ""

    }

  \],

  "conf": {},

  "setting": {

    "ifCondition": {

      "relatedArray": {},

      "componentType": {}

    },

    "conditions": "",

    "customSql": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": "",

    "split": "",

    "partitionNum": "",

    "forbiddenSplit": true,

    "initialValue": "",

    "column": "",

    "columnSourceType": "",

    "columnDataType": "",

    "transformType": "",

    "ignoreConditions": true

  }

}

参数说明

参数类型 是否必须 参数类型 语义与约束
特殊字符替换(3个参数)
enableStrReplace
originalStrs
replacedStr
查看前文参数字典
并发读取(3个参数)
forbiddenSplit
split
partitionNum
查看前文参数字典
流水型任务(3个参数)
initialValue
column
columnSourceType
columnDataType
查看前文参数字典
customSql String sql模式的sql
conditions String where 条件
transformType String 任务类型,枚举
common:一般任务
line:流水型任务

Writer Template


{

  "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": \[

      ""

    \],

    "version": "",

    "tableNameType": "",

    "datasourceMode": ""

  },

  "setting": {

    "insertType": "",

    "postSQL": \[

      ""

    \],

    "preSQL": \[

      ""

    \]

  }

}

参数说明

参数类型 是否必须 参数类型 语义与约束
insertType 枚举 写入规则类型(对于mysql语法)
into插入
ignore忽略主键冲突
overwrite覆写
insertonduplicatekeyupdate 插入时主键冲突更新
postSQL List<String> 写入后执行sql
preSQL List<String> 写入结束后SQL

任务Mysql2Mysql


{

  "product": "data\_transform",

  "clusterId": "dev4",

  "user": "tangjiafu@corp.netease.com",

  "conf": {

    "1": "2"

  },

  "name": "参数校验mysql2mysql",

  "description": "",

  "readerType": "mysql",

  "reader": {

    "conf": {

      "key1": "value1",

      "key2": "value2"

    },

    "datasources": \[

      {

        "datasourceId": 2727,

        "db": "ndi",

        "tableNameType": "normal",

        "table": \[

          "user"

        \]

      }

    \],

    "setting": {

      "initialValue": 0,

      "split": "id",

      "ifCondition": \[\],

      "forbiddenSplit": false,

      "column": "",

      "enableStrReplace": true,

      "replacedStr": "ss",

      "conditions": "1=1",

      "transformType": "common",

      "originalStrs": "\\\\n,\\\\r,\\\\01",

      "partitionNum": 100

    }

  },

  "writerType": "mysql",

  "writer": {

    "conf": {},

    "datasource": {

      "datasourceId": 2727,

      "db": "ndi",

      "tableNameType": "normal",

      "table": \[

        "user"

      \]

    },

    "setting": {

      "insertType": "into",

      "preSQL": \[

        "TRUNCATE ndi.user;"

      \],

      "postSQL": \[\]

    }

  },

  "handlers": \[

    {

      "add": \[\],

      "type": "columnHandler",

      "map": \[

        {

          "newComment": "主键",

          "newSourceType": "BIGINT(19, 0)",

          "newName": "id",

          "oldSourceType": "BIGINT(19, 0)",

          "oldName": "id",

          "comment": "主键"

        },

        {

          "newComment": "姓名",

          "newSourceType": "VARCHAR(255)",

          "newName": "name",

          "oldSourceType": "VARCHAR(255)",

          "oldName": "name",

          "comment": "姓名"

        }

      \]

    }

  \]

}
3.2.2 SQLserver

Reader Template


{

  "datasources": \[

    {

      "datasourceId": 0,

      "db": "",

      "table": \[

        ""

      \],

      "version": "",

      "tableNameType": "",

      "datasourceMode": ""

    }

  \],

  "conf": {},

  "setting": {

    "ifCondition": {

      "relatedArray": {},

      "componentType": {}

    },

    "conditions": "",

    "customSql": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": "",

    "split": "",

    "partitionNum": "",

    "forbiddenSplit": true,

    "initialValue": "",

    "column": "",

    "columnSourceType": "",

    "columnDataType": "",

    "transformType": "",

    "ignoreConditions": true

  }

}

参数说明

参数类型 是否必须 参数类型 语义与约束
特殊字符替换(3个参数) 查看前文参数字典
并发读取(3个参数) 查看前文参数字典
流水型任务(3个参数) 查看前文参数字典
customSql String sql模式的sql
conditions String where 条件
transformType String 任务类型,枚举
common:一般任务
line:流水型任务

Writer Template


{

  "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": \[

      ""

    \],

    "version": "",

    "tableNameType": "",

    "datasourceMode": ""

  },

  "setting": {

    "insertType": "",

     "postSQL": \[""\],

     "preSQL": \[""\]

  }

}

参数说明

参数类型 是否必须 参数类型 语义与约束
insertType 枚举 写入规则类型
into插入
postSQL List<String> 写入后执行sql
preSQL List<String> 写入结束后SQL
3.2.3 Oracle

Reader Template


{

  "datasources": \[

    {

      "datasourceId": 0,

      "db": "",

      "table": \[

        ""

      \],

      "version": "",

      "tableNameType": "",

      "datasourceMode": ""

    }

  \],

  "conf": {},

  "setting": {

    "ifCondition": {

      "relatedArray": {},

      "componentType": {}

    },

    "conditions": "",

    "customSql": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": "",

    "split": "",

    "partitionNum": "",

    "forbiddenSplit": true,

    "initialValue": "",

    "column": "",

    "columnSourceType": "",

    "columnDataType": "",

    "transformType": "",

    "ignoreConditions": true

  }

}

参数说明

参数类型 是否必须 参数类型 语义与约束
特殊字符替换(3个参数) 查看前文参数字典
并发读取(3个参数) 查看前文参数字典
流水型任务(3个参数) 查看前文参数字典
customSql String sql模式的sql
conditions String where 条件
transformType String 任务类型,枚举
common:一般任务
line:流水型任务

Writer Template


{

 "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": \[

      ""

    \],

    "version": "",

    "tableNameType": "",

    "datasourceMode": ""

  },

  "setting": {

    "insertType": "",

     "postSQL": \[""\],

    "preSQL": \[""\],

  }

}

参数说明

参数类型 是否必须 参数类型 语义与约束
insertType 枚举 写入规则类型
into
postSQL List<String> 写入后执行sql
preSQL List<String> 写入结束后SQL

3.2.4 PostgreSQL

Reader Template


{

  "datasources": \[

    {

      "datasourceId": 0,

      "db": "",

      "table": \[

        ""

      \],

      "version": "",

      "tableNameType": "",

      "datasourceMode": ""

    }

  \],

  "conf": {},

  "setting": {

    "ifCondition": {

      "relatedArray": {},

      "componentType": {}

    },

    "conditions": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": "",

    "split": "",

    "partitionNum": "",

    "forbiddenSplit": true,

    "initialValue": "",

    "column": "",

    "columnSourceType": "",

    "columnDataType": "",

    "transformType": "",

    "ignoreConditions": true

  }

}

参数说明

参数类型 是否必须 参数类型 语义与约束
特殊字符替换(3个参数) 查看前文参数字典
并发读取(3个参数) 查看前文参数字典
流水型任务(3个参数) 查看前文参数字典
customSql String sql模式的sql
transformType String 任务类型,枚举
common:一般任务
line:流水型任务

Writer Template


{

  "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": \[

      ""

    \],

    "version": "",

    "tableNameType": "",

    "datasourceMode": ""

  },

  "setting": {

    "insertType": "",

    "postSQL": \[""\],

    "preSQL": \[""\]

  }

}

参数说明

参数类型 是否必须 参数类型 语义与约束
insertType 枚举 写入规则类型
into插入
postSQL List<String> 写入后执行sql
preSQL List<String> 写入结束后SQL
3.2.5 TiDB

Reader Template


{

  "datasources": \[

    {

      "datasourceId": 0,

      "db": "",

      "table": \[""\],

      "version": "",

      "tableNameType": "",

      "datasourceMode": ""

    }

  \],

   "conf": {},

  "setting": {

    "ifCondition": {

      "relatedArray": {},

      "componentType": {}

    },

    "conditions": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": "",

    "split": "",

    "partitionNum": "",

    "forbiddenSplit": true,

    "initialValue": "",

    "column": "",

    "columnSourceType": "",

    "columnDataType": "",

    "transformType": "",

    "ignoreConditions": true

  }

}

参数说明

参数类型 是否必须 参数类型 语义与约束
特殊字符替换(3个参数) 查看前文参数字典
并发读取(3个参数) 查看前文参数字典
流水型任务(3个参数) 查看前文参数字典
customSql String sql模式的sql
transformType String 任务类型,枚举
common:一般任务
line:流水型任务

Writer Template


{

   "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": \[

      ""

    \],

    "version": "",

    "tableNameType": "",

    "datasourceMode": ""

  },

  "setting": {

    "insertType": "",

     "postSQL": \[""\],

     "preSQL": \[""\],

  }

}

参数说明

参数类型 是否必须 参数类型 语义与约束
insertType 枚举 写入规则类型
into
upsert主键冲突是覆写
ignore主键冲突时忽略
postSQL List<String> 写入后执行sql
preSQL List<String> 写入结束后SQL
3.2.6 Hive

Reader Template


{

  "datasources": {

    "datasourceId": 0,

    "db": "",

    "table": "",

    "version": "",

    "hiveType": ""

  },

   "conf": {},

  "setting": {

    "conditions": "",

    "readMode": "",

    "partitions": \[

      {

        "value": ""

      }

    \],

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": ""

  }

}
参数类型 是否必须 参数类型 语义与约束
特殊字符替换(3个参数) 查看前文参数字典
readMode 枚举 hive 读取方式
copy:hadoop distcp文件拷贝
impala:impala读取
jdbc: hiveserver2读取
spark: spark读取
partitions 对象 分区信息
readMode=copy时支持
conditions String 过滤条件

Writer Template


{

  "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": "",

    "version": "",

    "hiveType": ""

  },

  "setting": {

    "insertType": "",

    "partitionList": \[

      {

          "key": "",

          "partitionType": "",

          "value": "",

          "valueType": ""

     }

    \]

  }

}
参数类型 是否必须 参数类型 语义与约束
insertType 枚举 overwrite 先删除表中数据,再重新写入
into 在表中追加数据
partitions Partition 分区信息
readMode=copy时支持
conditions String 过滤条件

Partition

参数类型 是否必须 参数类型 语义与约束
key String 分区key
partitionType 枚举 static 静态分区
dynamic动态分区
value 分区值表达式 动态分区时,为列值
静态分区为常量值,支持azkaban表达式
3.2.7 Greenplum

Reader Template


{

  "datasources": \[

    {

      "datasourceId": 0,

      "db": "",

      "table": \[""\],

      "version": "",

      "tableNameType": "",

      "datasourceMode": ""

    }

  \],

  "conf": {},

  "setting": {

          "conditions": "",

          "partitionColumn": "",

          "enableStrReplace": true,

          "originalStrs": "",

          "replacedStr": ""

    }

}
参数 是否必须 参数类型 语义与约束
特殊字符替换(3个参数) 查看前文参数字典
partitionColumn String greenplum-spark-connector用于分区字段参数
表中整型字段
conditions String 过滤条件

Writer Template


{

  "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": "",

    "version": "",

    "hiveType": ""

  },

  "setting": {

    "insertType": "",

    "postSQL": \[

      ""

    \],

    "preSQL": \[

      ""

    \]

  }

}
参数类型 是否必须 参数类型 语义与约束
insertType 枚举 truncate清空表数据写入
append
overwrite
postSQL 见前文参数字典
preSQL 见前文参数字典

3.2.8 Doris

Reader Template


{

  "datasources": \[

    {

      "datasourceId": 0,

      "db": "",

      "table": \[

        ""

      \],

      "version": "",

      "tableNameType": "",

      "datasourceMode": ""

    }

  \],

  "conf": {},

  "setting": {

    "conditions": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": ""

  }

}

参数描述

参数名 是否必须 参数类型 语义与约束
特殊字符替换
(三个参数)
见参数字典描述
enableStrReplace
originalStrs
replacedStr
conditions String 过滤条件(where,分区参数)

Writer Template


{

  "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": \[

      ""

    \],

    "version": "",

    "tableNameType": "",

    "datasourceMode": ""

  },

  "setting": {

    "insertType": "",

    "postSQL": \[""\],

    "preSQL": \[""\],

    "maxFilterRatio": "",

    "writeMode": ""

  }

}

参数描述

参数 是否必须 参数类型 语义与约束
postSQL 见参数字典
preSQL 见参数字典
insertType 枚举
writeMode 枚举 写入方式
brokerload
streamload
loadInterval 整型 导入间隔
writeMode=streamload时,为必填参数
maxFilterRatio 浮点数字符串 最大容忍比例
writeMode=brokerload时,为必填参数
3.2.9 Hbase

Reader Template


{

  "datasources": {

    "datasourceId": 0,

    "db": "",

    "table": "",

    "version": ""

  },

   "conf": {},

  "setting": {

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": "",

    "startRowKey": "",

    "endRowKey": ""

  }

}
参数名 是否必须 参数类型 语义与约束
特殊字符替换
(三个参数)
见参数字典描述
enableStrReplace
originalStrs
replacedStr
conditions String 过滤条件(where,分区参数)
startRowKey String hbase 起始rowKey
endRowKey String hbase 终止rowKey

Writer Template


{

  "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": "",

    "version": ""

  },

  "setting": {

    "writeType": "",

    "dfsNameNodeRpcAddress": "",

    "dfsNameServices": ""

  }

}

参数描述

参数 是否必须 参数类型 语义与约束
writeType 枚举 写入方式
bulkLoad
put
dfsNameNodeRpcAddress String HFile所在HDFS的schema
writeType=bulkLoad启用
dfsNameServices String nameNode rpc地址,多个以逗号分隔
writeType=bulkLoad启用
3.2.10 ElasticSearch

Reader Template


{

  "datasources": {

    "datasourceId": 0,

    "db": "",

    "table": "",

    "version": ""

  },

  "conf": {},

  "setting": {

    "conditions": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": ""

  }

}

参数描述

参数名 是否必须 参数类型 语义与约束
特殊字符替换(三个参数) 见参数字典描述
conditions String 过滤条件(where,分区参数)

Writer Template


{

  "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": "",

    "version": ""

  },

  "setting": {

    "insertType": "",

    "batchSize": 0,

    "mappingId": "",

    "indexType" :""

  }

}

参数描述

参数 是否必须 参数类型 语义和约束
batchSize Long batchSize
es批量写入数据量
mappingId String es 主键字段
insertType 枚举 index (default)
根据ES主键字段新增数据,如果已经存在就替换并重建索引
create
根据ES主键字段添加新的数据,如果数据已经存在就报错
update
根据ES主键字段更新数据,如果数据不存在就报错
upsert
根据ES主键字段数据存在就更新,不存在就新增
indexType 枚举 索引类型:
dynamic静态索引
static动态索引
3.2.11 API

Reader Template


"reader": {

  "conf": {},

  "datasources": \[

    {

      "datasourceId": 3220,

      "db": "default",

      "table": \[

        "/hello"

      \]

    }

  \],

  "setting": {

    "headers": \[

      {

        "value": "header",

        "key": "header"

      }

    \],

    "method": "post",

    "enableStrReplace": false,

    "paging": {

      "total": "id",

      "type": "offset"

    },

    "body": "{\\n\\"body\\":\\"body\\"\\n}",

    "transformType": "common",

    "originalStrs": "",

    "enablePaging": "true",

    "replacedStr": ""

  }

}

参数描述

特殊字符替换参数:enableStrReplace、originalStrs、replacedStr

参数名 是否必须 参数类型 语义与约束
body N String post请求体
params N String get请求体,拼到url中
headers N String http header
method Y 枚举 HTTP方法,枚举
get、post
enablePaging Y bool 是否启用分页参数
paging N 对象 分页参数
total N String 分页字段名称
具体使用jsonPath从返回Response中获取该值
type N 枚举 分页类型,枚举
page、offset
page启用两个参数进行分页:pageNumpageSize
offset启用两个参数进行分页:offsetlimit
3.2.12 ClickHouse

Reader Template


{

  "datasources": \[

    {

      "datasourceId": 0,

      "db": "",

      "table": \[

        ""

      \],

      "version": "",

      "tableNameType": "",

      "datasourceMode": ""

    }

  \],

"conf": {},

  "setting": {

    "flowControl": "",

    "transformType": "common",

    "conditions": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": "",

    "forbiddenSplit": true,

    "split": "",

    "partitionNum": "",

    "initialValue": "",

    "column": "",

    "columnSourceType": "",

    "columnDataType": ""

  }

}

参数描述

参数类型 是否必须 参数类型 语义与约束
特殊字符替换(3个参数) 查看前文参数字典
并发读取(3个参数) 查看前文参数字典
流水型任务(3个参数) 查看前文参数字典
customSql String sql模式的sql
conditions String where 条件
transformType String 任务类型,枚举
common:一般任务
line:流水型任务

Writer Template


{

  "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": \[

      ""

    \],

    "version": ""

  },

  "setting": {

    "insertType": "",

    "postSQL": \[

      ""

    \],

    "preSQL": \[

      ""

    \]

  }

}

参数描述

参数名 是否必须 语义与约束
insertType 写入模式,枚举
into
postSQL 写入前sql
preSQL 写入后sql
3.2.13 FTP

Reader Template


{

  "datasources": \[

    {

      "datasourceId": 0,

      "db": "",

      "table": \[

        ""

      \],

      "version": ""

    }

  \],

"conf": {},

  "setting": {

    "fileFilter": true,

    "filteredPath": {

      "type": "",

      "paths": \[

        {

          "fileName": "",

          "fullPath": "",

          "isFile": true,

          "size": 0,

          "timestamp": "",

          "fileSize": 0,

          "fileId": 0

        }

      \],

      "conditions": \[

        {

          "type": "",

          "value": "",

          "operator": ""

        }

      \]

    },

    "docSplit": true,

    "format": "",

    "fieldSeparator": "",

    "skipLines": "",

    "sheetName": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": "",

    "enableRowCheck": true,

    "checkFilePath": "",

    "checkContentSeparator": "",

    "checkContentRowPosition": ""

  }

}

参数描述

参数名 是否必须 语义与约束
fileFilter Y 文件筛选,true:全部文件,false:部分文件
filteredPath N 文件过滤条件,fileFilter=false时必须
docSplit true:结构化文件传输
false:非结构化文件传输
format N 文件格式
fieldSeparator N 列分隔符
skipLines N 跳过行数
sheetName N 传输Excel时sheet名称

字符串替换功能参数

enableStrReplace N 是否开启字符替换
originalStrs N 原始str
replacedStr N 替换str

行数校验功能参数

enableRowCheck N 是否开启行数校验
checkFilePath N 校验文件路径
checkContentSeparator N 校验文件分隔符
checkContentRowPosition N 校验数据的行数位置

Writer Template


{

"conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": \[

      ""

    \]

  },

  "setting": {

    "format": "",

    "insertType": "",

    "fileNameType": "",

    "fileName": "",

    "preAction": \[

      {

          "actionType": "",

          "actionPath": "",

          "actionNewPath": ""

      }

    \],

    "postAction": \[

      {

          "actionType": "",

          "actionPath": "",

          "actionNewPath": ""

      }

    \],

    "writeFlagFile": true

  }

}

参数描述

参数名 是否必须 语义与约束
format 文件格式,枚举
docSplit true:结构化文件传输
false:非结构化文件传输
insertType 同名文件处理规则
into
覆盖 overwrite
fileNameType 目标文件名类型
自定义custom
系统生成system
preAction 前置处理,不处理actionType=none
postAction 后置处理,不处理actionType=none

前置处理、后置处理

参数名 是否必须 语义与约束
actionType 枚举
不处理、删除、重命名
none、rm、rename
actionPath 操作文件路径
actionNewPath 重名名后的文件全路径
3.2.14 HDFS

Reader Template


{

  "datasources": \[

    {

      "table": \[

        ""

      \],

    }

  \],

"conf": {},

  "setting": {

    "conditions": "",

    "docSplit": "",

    "fieldSeparator": "",

    "format": "",

    "skipLines": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": ""

  }

}

参数描述

参数名 是否必须 语义与约束
fileFilter Y 文件筛选,true:全部文件,false:部分文件
filteredPath N 文件过滤条件,fileFilter=false时必须
docSplit true:结构化文件传输
false:非结构化文件传输
format N 文件格式
fieldSeparator N 列分隔符
skipLines N 跳过行数
sheetName N 传输Excel时sheet名称

Writer Template


{

"conf": {},

  "datasource": {

    "table": "",

  },

  "setting": {

    "compressionCodec": "",

    "fileName": "",

    "fileNameType": "",

    "format": "",

    "insertType": "",

    "writeFlagFile": true

  }

}

参数描述

参数名 是否必须 语义与约束
format 文件格式,枚举
docSplit true:结构化文件传输
false:非结构化文件传输
insertType 同名文件处理规则
into
覆盖 overwrite
fileNameType 目标文件名类型
自定义custom
系统生成system
preAction 前置处理,不处理actionType=none
postAction 后置处理,不处理actionType=none

前置处理、后置处理

参数名 是否必须 语义与约束
actionType 枚举
不处理、删除、重命名
none、rm、rename
actionPath 操作文件路径
actionNewPath 重名名后的文件全路径
3.2.15 Kafka

Writer Template


{

"conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": "",

    "version": "",

  },

  "setting": {

    "key": "",

    "producerConf": \[

      {

        "key": "",

        "value": ""

      }

    \]

  }

}

参数描述

参数名 是否必须 语义与约束
key 写入kafka时,数据形式如下
{"key":key,"value":json}
producerConf kafka的连接配置项
3.2.16 Kudu

Reader Template


{

  "datasources": {

    "datasourceId": 0,

    "db": "",

    "table": "",

    "version": "",

  },

"conf": {},

  "setting": {

    "conditions": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": ""

  }

}

参数描述

参数名 是否必须 语义与约束
conditions 过滤条件(where,分区参数)

字符串替换功能参数

enableStrReplace N 是否开启字符替换
originalStrs N 原始str
replacedStr N 替换str

Writer Template


{

"conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": "",

    "version": "default",

  },

  "setting": {

    "insertType": ""

  }

}

参数描述

参数名 是否必须 语义与约束
insertType 写入方式,枚举:
into插入
overwrite覆写
3.2.17 Mongo

Reader Template


{

  "datasources": \[

    {

      "datasourceId": 0,

      "db": "",

      "table": "",

      "version": "",

    }

  \],

  "conf": {},

  "setting": {

    "conditions": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": ""

  }

}

参数描述

参数描述

参数名 是否必须 语义与约束
conditions 过滤条件(where,分区参数)

字符串替换功能参数

enableStrReplace N 是否开启字符替换
originalStrs N 原始str
replacedStr N 替换str

Writer Template


{

  "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": "",

    "version": ""

  },

  "setting": {

    "insertType": ""

  }

}

参数描述

参数名 是否必须 语义与约束
insertType 写入方式,枚举:
append追加
overwrite覆写
3.2.18 StartRocks

Reader Template


{

  "datasources": \[

    {

      "datasourceId": 0,

      "db": "",

      "table": \[

        ""

      \],

      "version": "",

      "tableNameType": "",

      "datasourceMode": ""

    }

  \],

  "conf": {},

  "setting": {

    "conditions": "",

    "enableStrReplace": true,

    "originalStrs": "",

    "replacedStr": ""

  }

}

参数描述

参数名 是否必须 语义与约束
conditions Y 过滤条件(where,分区参数)

字符串替换功能参数

enableStrReplace Y 是否开启字符替换
originalStrs N 原始str
replacedStr N 替换str

Writer Template


{

  "conf": {},

  "datasource": {

    "datasourceId": 0,

    "db": "",

    "table": \[

      "demo"

    \],

    "version": "",

    "tableNameType": "",

  },

  "setting": {

    "loadInterval": 0,

    "maxFilterRatio": "0.0",

    "insertType": "into",

    "brokerLoadBrokers":"",

    "postSQL": \[\],

    "preSQL": \[\]

    "writeMode": "streamload"

  }

}

参数描述

参数名 是否必须 类型 语义与约束
postSQL 见参数字典
preSQL 见参数字典
writeMode 枚举 写入方式
brokerload
streamload
loadInterval 整型 导入间隔
writeMode=streamload时,为必填参数
maxFilterRatio 浮点数字符串 最大容忍比例
writeMode=brokerload时,为必填参数
insertType 写入类型,枚举:
into
brokerLoadBrokers String broker名称列表,以逗号分割

四、最佳实践

4.1 创建Mysql2Mysql任务

阅读此章节,请了解[OpenCreateTask]相关参数

4.1.1 构造Task层级参数

如下所示,一个不含有reader,writer以及 handlers/columns的json,但是readerType,writerType已经确定,表明该任务是Mysql2Mysql的任务;


{

  "product": "data\_transform",

  "clusterId": "dev4",

  "user": "tangjiafu@corp.netease.com",

  "name": "参数校验mysql2mysql\_2",

  "description": "",

  "readerType": "mysql",

  "reader": {

    "conf": {

        // 读取高级配置项,键值对

    },

    "datasources": \[

    // 读取数据源列表,只有部分数据源支持多个来源,参考数据源参数说明

    \],

    "setting": {

    // 根据数据源而定,查阅Reader Template

    }

  },

  "writerType": "mysql",

  "writer": {

    "conf": {

     //     写入高级配置项

    },

    "datasource": {

    // 写入数据源参数

    },

    "setting": {

    // 写入配置项,根据数据源而定,查阅Writer Template

    }

  },

  "handlers": \[

    {

    }

  \],

  "setting": {

    "messyDataConfig": {

      "saveMessyData": false,

      // saveMessyData= false 表示,不需要

      "saveConfig": {

        "datasource": {

          "name": "",

          "type": "",

          "datasourceId": 0,

          "db": "",

          "table": \[

            ""

          \]

        }

      },

      "tolerateMessyData": false,

      "maxRejectedCount": 0,

      "failedThreshold": 0.00

    },

    "alterTableStrategy": {

      "lastTable": \[\],

      "columnAlterStrategy": 1

    },

    // 流量控制,查阅流量控制参数

    "flowControl": {

      "enable": false,

      "type": "",

      "value": 0

    }

  }

}
4.1.2 构造reader层级

4.1.3 构造reader的datasources

查询1.4.3DataSource,根据元数据中心所登记的数据源信息,构造如下JSON参数

根据1.4.3DataSource参数描述,Mysql是支持多库表的,如果有多个来源库表,可以填写多个table和多个DataSources


{

    "datasourceId": 2727,

    "db": "ndi",

    "tableNameType": "normal",

    "table": \[

        "user"

    \]

}
4.1.4 构造reader的setting

查询3.2.1 知道Mysql支持哪些特性参数,非常容易可以构造如下参数,

值得注意的是,替换字符串的参数,需要注意是否转义,数据传输代码由java进行编码,不进行转义和反转义操作,这里的json中的"\\n",接收后是\n符号;


{

    // 数据过滤

    "conditions": "1=1",

    // 传输类型为普通类型

    "transformType": "common",

    // 并发读取

    "split": "id",

    "forbiddenSplit": false,

    "partitionNum": 100,

     // 特殊字符替换

    "enableStrReplace": true,

    "replacedStr": "ss",

    "originalStrs": "\\\\n,\\\\r,\\\\01"

}
4.1.5 构造writer

同理,可以构造writer,到目前为止构造出来的Task如下


{

    "writer": {

        "datasources": {

            "datasourceId": 2727,

            "db": "ndi",

            "tableNameType": "normal",

            "table": \[

                "user"

            \]

        },

        "conf": {},

        "setting": {

            "postSQL": \[\],

            "datasourceId": 2727,

            "insertType": "into",

            "preSQL": \[

                "TRUNCATE ndi.user;"

            \]

        }

    }

}
4.1.6 构造handlers

由 1.4.1 可以知道 handers和columns为互斥参数,以columns为优先,[columns参数见4.3节]

handlers的构造,参照1.4.5和1.4.6去构造 columnHandler,columnHandler中的map表示列映射详情,具体参考1.4.6 ColumnMap进行映射:

Hbase由于其schema复杂性,进行了特殊处理,且暂时不支持columns进行列名映射,参见实践[ColumnMap],实践ColumnMap还提供了简化参数的方式。


{

  "map": \[

    {

      "newComment": "主键",

      "newSourceType": "BIGINT(19, 0)",

      "newName": "id",

      "oldSourceType": "BIGINT(19, 0)",

      "oldName": "id",

      "comment": "主键"

    },

    {

      "newComment": "姓名",

      "newSourceType": "VARCHAR(255)",

      "newName": "name",

      "oldSourceType": "VARCHAR(255)",

      "oldName": "name",

      "comment": "姓名"

    }

  \]

}

4.2 ColumnMap的构造

ColumnHandler包含列映射构造,其核心为ColumnMap的构造,对于结构化数据源

只需要两个字段:newName和oldName,即reader的列名和writer的列名

比如一个Mysql2Mysql的任务 columnMap最终构造如下


"map": \[

    {

        "newName": "id",

        "oldName": "id"

    },

    {

        "newName": "name",

        "oldName": "name"

    },

    {

        "newName": "age",

        "oldName": "age"

    },

    {

        "newName": "remark",

        "oldName": "remark"

    },

    {

        "newName": "createAt",

        "oldName": "createAt"

    }

\]

但对于非结构化数据源,则需要定义reader的类型,而去向则不需要,因为去向如果是非结构化,写入到文件中,都会转为String类型;

Hbase因为其特殊性,包含了另外两个参数

在 columnSelectiveValue不为空时,有如下逻辑


if (StringUtils.equalsIgnoreCase(columnSelectiveValue, "rowkey")) {

    oldName = "rowkey"

} else {

    if (StringUtils.isNotEmpty(oldName)) {

      oldName = oldName + ":" + mapColumnData.getOrElse("oldExpression", "")

  }

}

上述逻辑,文字概括:对于rowkey列,columnSelectiveValue=rowkey

对于非rowkey列,oldName为列簇,oldExpression为列名

mysql2Hbase的ColumnMap


 "map": [

        {

          "oldType": "INT",

          "columnSelectValue": "rowKey",

          "newName": "id",

          "oldName": "0",

        },

        {

          "oldType": "STRING",

          "columnSelectValue": "0",

          "newName": "name",

          "oldName": "0",

          "oldExpression": "name",

        }

      ]

4.3 字段同名映射 columns参数

handers和columns为互斥参数,以columns为优先,columns只能进行同名映射。

columns表示需要导入的列,具体业务:

数据传输获取Reader和Writer的schema,进行同名映射,如果写入数据源是一个nonSchema的数据源,则只通过Reader的schema进行映射,为目标端构造schema

nonSchema数据源:EleasticSearch,MongoDB,Kafka,RocketMQ,Hbase,FTP,HDFS


{

"id",

"name"

}

上述columns,如果来源和去向都含有id,name字段,则构建两个字段的映射。

4.4 关于高级配置

高级配置分为三类,任务级别配置,来源配置(读取),去向配置(写入)。

ndi.spark,

ndi. 则表示为任务级别参数

spark常用的几个配置项


// executor内存

ndi.spark.spark-argument.executor-memory

// dirver内存

ndi.spark.spark-argument.driver-memory  

// hive动态分区最大值

ndi.spark.spark-conf.spark.hadoop.hive.exec.max.dynamic.partitions

// 是否计算hive的统计信息写入到metastore

ndi.analyzeTableStatistics

spark-argument表示为spark-submit命令启动的参数,spark-conf则会添加上

--conf

4.5 任务创建实践

4.5.1 mysql2mysql

{

  "product": "data\_transform",

  "clusterId": "dev4",

  "user": "tangjiafu@corp.netease.com",

  "name": "参数校验mysql2mysql\_2",

  "description": "",

  "readerType": "mysql",

  "reader": {

    "conf": {

      "key1": "value1",

      "key2": "value2"

    },

    "datasources": \[

      {

        "datasourceId": 2727,

        "db": "ndi",

        "tableNameType": "normal",

        "table": \[

          "user"

        \]

      }

    \],

    "setting": {

      "initialValue": 0,

      "split": "id",

      "ifCondition": \[\],

      "forbiddenSplit": false,

      "column": "",

      "enableStrReplace": true,

      "replacedStr": "ss",

      "conditions": "1=1",

      "transformType": "common",

      "originalStrs": "\\\\n,\\\\r,\\\\01",

      "partitionNum": 100

    }

  },

  "writerType": "mysql",

  "writer": {

    "conf": {},

    "datasource": {

      "datasourceId": 2727,

      "db": "ndi",

      "tableNameType": "normal",

      "table": \[

        "user"

      \]

    },

    "setting": {

      "insertType": "into",

      "preSQL": \[

        "TRUNCATE ndi.user;"

      \]

    }

  },

  "handlers": \[

    {

      "add": \[\],

      "type": "columnHandler",

      "map": \[

        {

          "newComment": "主键",

          "newSourceType": "BIGINT(19, 0)",

          "newName": "id",

          "oldSourceType": "BIGINT(19, 0)",

          "oldName": "id",

          "comment": "主键"

        },

        {

          "newComment": "姓名",

          "newSourceType": "VARCHAR(255)",

          "newName": "name",

          "oldSourceType": "VARCHAR(255)",

          "oldName": "name",

          "comment": "姓名"

        }

      \]

    }

  \],

  "setting": {

    "messyDataConfig": {

      "saveMessyData": false,

      "saveConfig": {

        "datasource": {

          "name": "",

          "type": "mysql",

          "datasourceId": 0,

          "db": "",

          "table": \[

            "table"

          \]

        }

      },

      "tolerateMessyData": false,

      "maxRejectedCount": 0,

      "failedThreshold": 0.00

    },

    "alterTableStrategy": {

      "lastTable": \[\],

      "columnAlterStrategy": 1

    },

    "flowControl": {

      "enable": false,

      "type": "",

      "value": 0

    }

  }

}