数据传输OpenAPI
此手册用于记录大数据开发与管理平台中,数据传输产品所有对外开放的OpenAPI,阅读此手册,你将了解数据传输产品开放的OpenAPI能力与调用方法。
一、 数据传输公共参数
1.1 响应格式
名称 | 类型 | 描述 |
---|---|---|
code | Number | 响应码 |
reqId | String | 请求ID |
cost | Number | 耗时,单位:ms |
msg | String | 响应消息 |
result | Object | 响应结果 |
1.2 响应码
数据传输以4位中的首位区分功能模块,第二位区分业务子模块,后面两位递增,如不足则顺延。
1.2.1 1000~1999 数据源校验异常码
响应码 | 说明 |
---|---|
1001 | 数据源被引用 |
1002 | 数据源URL非法平台模块查询异常 |
1003 | 数据源名称已存在 |
1004 | 数据源已存在(url,user) |
1005 | 数据源不存在 |
1006 | 数据源不支持读 |
1007 | 数据源不支持写 |
1101 | 表不存在 |
1102 | 同义词不是表 |
1103 | 同义词没有读权限 |
1200 | SQL解析异常 |
1201 | hql不能包含db |
1202 | hql语法错误 |
1203 | sql的表名解析异常 |
1.2.2 2000~2999 离线任务异常码
响应码 | 说明 |
---|---|
2001 | 任务已被引用 |
2002 | 任务名称已存在 |
2003 | 任务不存在 |
2004 | 任务已提交 |
2005 | 任务不存在实例, 请先运行任务 |
2006 | 任务Reader写库异常 |
2007 | 任务Writer写库异常 |
2201 | 校验脱敏规则是否存在 |
1.2.3 3000~3999 实时任务异常码
1.2.4 4000~5999 其他子模块业务异常码
响应码 | 说明 |
---|---|
4000 | 统一错误跳转web页面,任务权限校验失败code |
4001 | 统一错误浮窗提示 |
1.2.5 6000~7999 预留业务异常码
1.2.6 8000~9999 调用其他子产品异常码
响应码 | 说明 |
---|---|
8001 | 单点认证失败 |
8002 | ACC登录异常 |
8100 | 元数据中心异常 |
8101 | 任务血缘发送异常 |
8202 | Azkaban请求错误 |
8203 | Azkaban响应错误 |
8204 | AZKABAN连接错误 |
8400 | 离线开发平台异常 |
8401 | 离线开发平台的任务名称已存在 |
8402 | 离线任务来源表名称重复 |
8403 | 离线开发任务与数据同步节点同名,造成环路异常 |
8600 | 安全中心异常 |
8700 | sloth异常 |
8701 | sloth校验语法异常 |
1.3 枚举列表
1.3.1 DataSourceTypeEnum
数据源类型枚举
枚举值 | 类型 | 枚举code | 名称 |
---|---|---|---|
mysql | String | 1 | mysql |
hive | String | 2 | Hive |
ddb | String | 3 | 网易自研数据库DDB |
ddbqs | String | 4 | 网易自研数据库DDB的qs版本 |
oracle | String | 5 | Oracle |
hbase | String | 6 | Hbase |
kudu | String | 7 | Kudu |
kafka | String | 8 | Kafka |
rocketmq | String | 9 | RocketMq |
mongodb | String | 10 | MongoDB |
es | String | 11 | elasticsearch |
api | String | 12 | Http api数据源 |
hdfs | String | 13 | HDFS |
greenplum | String | 14 | Greenplum |
db2 | String | 15 | DB2 |
hana | String | 16 | Hana |
dm | String | 17 | 达梦数据库 |
sqlserver | String | 18 | Sqlserver |
postgresql | String | 19 | PostgreSQL |
ftp | String | 20 | Ftp |
clickhouse | String | 21 | ClickHouse |
tidb | String | 22 | TiDB |
doris | String | 23 | Doris |
vertica | String | 24 | Vertica |
cassandra | String | 25 | Cassandra |
phoenix | String | 26 | phoenix |
localfile | String | 27 | 本地文件数据源 |
maxcompute | String | 28 | 阿里maxcompute |
redis | String | 29 | Redis |
1.3.2 HiveType
枚举 | 类型 | 描述 |
---|---|---|
mammut | String | 中台内部hive,枚举为空也为此值 |
hiveExt | String | 外部hive数据源 |
1.3.3 TableNameType
枚举 | 类型 | 描述 |
---|---|---|
normal | String | 库表选择 |
regular | String | 正则匹配 |
synonym | String | 同义词 |
1.3.4 DatasourceMode
枚举 | 类型 | 描述 |
---|---|---|
logical | String | 逻辑数据源 |
physical | String | 物理数据源 |
1.4 参数实体
1.4.1 Reader
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
conf | Map<String, String> | 读取高级配置 | |
setting |
Map<String,Object> |
读取功能参数 | 由于数据源太多,且特性各异,见第三章功能参数说明 |
datasources | List<Datasource> |
数据源信息 | 不支持多库表的数据源,size=1 Mysql,Oracle,Sqlserver支持多库表 |
1.4.2 Writer
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
conf | List<Map<String, String>> | 写入高级配置 | |
setting | Map<string,Object> | 写入功能参数 | 由于数据源太多,且特性各异,见第三章读写功能参数说明 |
datasource | Datasource | 数据源信息 | 具体详见数据源库表参数说明 |
1.4.3 Datasource
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
datasourceId | Long | 数据源ID | 元数据中心登记参数 |
db | String | 数据库名称 | |
table |
List<String> |
表名称 | 不支持多库表的数据源,size=1,Mysql,Oracle,Sqlserver支持多库表 去向数据源时,size=1 |
tableNameType | TableNameType | 表名称规则类型 | 见TableNameType枚举 |
hiveType | HiveType | hive的类型 | 见HiveType枚举 只有hive数据源支持 |
datasourceMode | DatasourceMode | 枚举,逻辑数据源,物理数据源 | 无 |
1.4.4 Handler
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
map | List<ColumnMap> | 列映射列表 | 无 |
1.4.5 ColumnMap
这里包含了很多参数,有些参数是特定场景下使用,具体见[4.2 ColumnMap的构造]
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
columnSelectValue | String | 只用于Hbase中 | 见Hbase实践说明 |
oldName | String | 来源表字段名称 | |
oldSourceType | String | 来源表字段类型 | 值="$CUSTOM_EXPR$"时,oldName为自定义表示 非结构化数据源,Hbase,HDFS,FTP,使用此类型 |
oldType |
String |
来源表字段类型 | 用于实时任务 |
comment |
String | 来源字段注释 |
|
oldExpression | String | 来源表字段自定义表达式 | |
newColumnSelectValue | String | 只用于Hbase中 | 见Hbase实践说明 |
newName | String | 去向表字段名称 | |
newSourceType | String | 去向表字段类型 | 只用于web展示 |
newType | String | 去向表字段类型 | 实时任务 |
newComment | String | 去向字段注释 | 无 |
1.5 响应实体
二、OpenAPI列表
2.1 OpenAPI总览
模块 | 名称 | 支持版本 | 状态 | 请求方法 | 请求路径 |
---|---|---|---|---|---|
离线任务管理 | 任务创建 | v2.7.0 | 已发布 | POST | /task/v2/add |
实时任务管理 | 任务启动 | v3.10.0 | POST | /stream/v2/job/exec | |
实时任务管理 | 任务停止 | v3.10.0 | POST | /stream/v2/job/stop |
2.2 离线任务管理OpenAPI
2.2.1 任务创建
POST /api/openapi/task/v2/add
产品版本:v2.5.0
描述:创建任务
入参
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
product | String | 项目名称 | |
clusterId | String | 集群Id | |
user | String | 用户负责人邮箱 | |
name |
String | 任务名称 | 256字符长度 mysql可识别字符 |
taskId |
String | 任务Id |
非必须参数,不填由server生成 <64字符 整型字符串 |
description | String | 任务描述 | 1024字符 |
conf | Map<String,String> | 高级配置 | 任务级别的高级配置 |
readerType | DataSourceTypeEnum | 读取数据源 | |
reader | Reader | 读取实体 | |
writerType | DataSourceTypeEnum | 写入数据源 | |
writer | Writer | 写入实体 | |
paramSetIds | List<Long> |
参数组 | 离线开发平台登记参数组,传输使用参数组ID |
handlers | List<Handler> | 列处理器 | openAPI 只支持列映射处理器 size=1 见Handler说明 |
columns |
List<String> |
同名列映射处理 server端根据List<String> 和去向端的字段,自动同名映射生成handlers |
与handlers 只可存其一,以columns 优先 参见最佳实践4.1的说明 |
setting | Map | 任务功能参数 | 见3.1节功能参数说明 |
出参: String 任务ID
示例 Mysql2Mysql任务:
入参示例:
{
"email": "tangjiafu@corp.netease.com",
"product": "data\_transform",
"clusterId": "dev4",
"conf": {},
"name": "参数校验mysql2mysql\_1",
"description": "",
"readerType": "mysql",
"reader": {
"conf": {},
"datasources": \[
{
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
}
\],
"setting": {
"split": "id",
"ifCondition": \[\],
"forbiddenSplit": false,
"enableStrReplace": true,
"replacedStr": "ss",
"conditions": "1=1",
"transformType": "common",
"originalStrs": "\\\\n,\\\\r,\\\\01",
"partitionNum": 100
}
},
"writerType": "mysql",
"writer": {
"datasources": {
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
},
"conf": {},
"setting": {
"postSQL": \[\],
"datasourceId": 2727,
"insertType": "into",
"preSQL": \[
"TRUNCATE ndi.user;"
\]
}
},
"handlers": \[
{
"add": \[\],
"type": "columnHandler",
"map": \[
{
"newType": "bigint",
"newComment": "主键",
"oldType": "bigint",
"newSourceType": "BIGINT(19, 0)",
"newName": "id",
"oldSourceType": "BIGINT(19, 0)",
"oldName": "id",
"comment": "主键"
},
{
"newType": "string",
"newComment": "姓名",
"oldType": "string",
"newSourceType": "VARCHAR(255)",
"newName": "name",
"oldSourceType": "VARCHAR(255)",
"oldName": "name",
"comment": "姓名"
},
{
"newType": "string",
"newComment": "手机号码",
"oldType": "string",
"newSourceType": "VARCHAR(16)",
"newName": "phone",
"oldSourceType": "VARCHAR(16)",
"oldName": "phone",
"comment": "手机号码"
},
{
"newType": "timestamp",
"newComment": "时间1",
"oldType": "timestamp",
"newSourceType": "TIMESTAMP(0)",
"newName": "time\_1",
"oldSourceType": "TIMESTAMP(0)",
"oldName": "time\_1",
"comment": "时间1"
}
\]
}
\]
}
出参:
参数 | 参数类型 | 描述 |
---|---|---|
code | Number | 响应码 |
reqId | String | 请求ID |
cost | Number | 耗时,单位:ms |
msg | String | 响应消息 |
result | String | 任务Id |
1649397381105018为成功创建任务的ID
{
"code": 200,
"message": "success",
"result": "1649397381105018",
"reqId": "0773215919a348d48c435df103e606fe",
"cost": 550
}
2.3 实时任务管理
2.3.1 启动任务
POST /api/openapi/stream/v2/job/exec
产品版本:v3.10.0
描述:启动任务
入参
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
user | User | 用户信息 | |
request | JobRunConfigReqDTO | 请求信息 | 无 |
User
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
product | String | 项目名称 | |
clusterId | String | 集群id | |
user | String | 用户负责人邮箱 | 无 |
JobRunConfigReqDTO
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
taskId | String | 任务id | |
runConfig | RunConfig | 运行参数 | |
checkPointConfig | CheckPointConfig | cp参数 | |
restartStrategy | RestartStrategy | 重启策略 | |
advancedConfig | List<Pair> | 高级参数 | |
customConfig | List<Pair> | 自定义参数 | 无 |
RunConfig
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
yarnQueue | Integer | 实例id | |
slots | Integer | slot数量 | |
tmMemory | Integer | TM内存,到位MB | |
jmMemory | Integer | JM内存,单位MB | |
parallelism | Integer | 并行度 | 无 |
CheckPointConfig
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
cpInterval | Integer | cp间隔时间,单位:秒 | |
cpMode | String | 模式,可选值:EXACTLY_ONCE,AT_LEAST_ONCE | |
cpTimeout | Integer | 超时时间,单位:秒 | |
cpSwitch | Boolean | 是否开启checkpoint | |
checkPointPath | String | cp或sp启动时,状态文件的路径 | |
checkPointType | String | 启动位点类型,可选值:cp, sp, none, custom | cp: 从cp启动; sp:从sp启动 none:直接启动 custom:自定义位点启动 |
RestartStrategy
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
strategy | String | 重启策略,可选值:failure-rate, fixed-delay | failure-rate: 失败率重启, fixed-delay: 固定间隔重启 |
restartFailedCount | Integer | 失败重启次数 | |
restartTimeInterval | Integer | 重试时间间隔, 单位:秒 | 无 |
Pair
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
key | String | 属性的键 | |
value | String | 属性的值 | 无 |
出参:
参数 | 参数类型 | 描述 |
---|---|---|
code | Number | 响应码 |
reqId | String | 请求ID |
cost | Number | 耗时,单位:ms |
msg | String | 响应消息 |
result | RunJobRespDTO | 停止作业响应 |
JobRunRespDTO
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
execId | Integer | 实例id | 无 |
2.3.2 停止任务
POST /api/openapi/stream/v2/job/stop
产品版本:v3.10.0
描述:启动任务
入参
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
user | User | 用户信息 | |
request | StopJobReqDTO | 请求信息 | 无 |
User
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
product | String | 项目名称 | |
clusterId | String | 集群Id | |
user | String | 用户负责人邮箱 | 无 |
StopJobReqDTO
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
taskId | String | 任务Id | |
savepoint | Boolean | 是否保存savepoint | 无 |
出参:
参数 | 参数类型 | 描述 |
---|---|---|
code | Number | 响应码 |
reqId | String | 请求ID |
cost | Number | 耗时,单位:ms |
msg | String | 响应消息 |
result | StopJobRespDTO | 停止作业响应 |
StopJobRespDTO
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
stopSuccess | Boolean | 是否停止成功 | 无 |
三、数据源参数说明
当前的setting根据功能各有差异,有些功能需要多个参数组合使用,主要分为特殊字符替换替换,并发读取,流量控制等;
3.1节列举了一些任务级别的功能参数。
3.2节列举Reader和Writer setting里的一些常规参数字典,另外有一些专用于固定一个数据源的,具体查看数据源。
tips:
非必选参数应该为null或者不填,比如没有分区,比不是"partition":[]
读写支持
数据源 | 读 | 写 |
---|---|---|
Mysql | Y | Y |
SQLserver | Y | Y |
Oracle | Y | Y |
PostgreSQL | Y | Y |
TiDB | Y | Y |
Hive | Y | Y |
Greenplum | Y | Y |
Doris | Y | Y |
Hbase | Y | Y |
ElasticSearch | Y | Y |
API | Y | |
ClickHouse | Y | Y |
FTP | Y | Y |
HDFS | Y | Y |
Kafka | Y | |
Kudu | Y | Y |
Mongo | Y | Y |
StarRocks | Y | Y |
3.1 Task功能参数说明
task功能参数指针对该任务生效的参数组合,大多数任务全面支持。
3.1.1 流量控制
{
// 省略了其他任务参数
"setting": {
"flowControl": {
"enable": false,
"type": "",
"value": 0
}
}
}
上述参数,在task-json中的setting里面增加flowControl键表示流量控制,指为json,参数说明如下:
参数 | 是否必须 | 数据类型 | 默认值 | 描述 |
---|---|---|---|---|
enable | 否 | 布尔 | false | 是否开启流量控制,开启时,type,value必须 |
type | 否 | 字符串 | null | 流量控制类型枚举: size:流量控制按照字节数控制,单位MB count: 流量控制按照行数控制 |
value | 否 | 整数 | null | 流量控制限速值 |
3.1.2 脏数据管理参数
脏数据管理是为了解决数据一旦出现错误,任务即会中断的场景,从而使得容许一定数据量的脏数据,并把脏数据存储到另外一张表中
{
"setting": {
"saveMessyData": false,
"saveConfig": {
"datasource": {
"name": "",
"type": "mysql",
"datasourceId": 0,
"db": "",
"table": \[
\]
}
},
"tolerateMessyData": false,
"maxRejectedCount": 0,
"failedThreshold": 0.00
}
}
参数 | 是否必须 | 数据类型 | 默认值 | 描述 |
---|---|---|---|---|
saveMessyData | 是 | 布尔 | false | 脏数据保存开关 |
tolerateMessyData | 是 | 字符串 | true | 脏数据容忍开关 |
maxRejectedCount | 否 | 整数 | 0 | 表示子任务的脏数据最大容忍条数。如果填写0,则表示不允许脏数据存在。 |
failedThreshold | 否 | 浮点数 | 0.0 | 表示子任务的脏数据最大容忍条数。如果填写0,则表示不允许脏数据存在。 |
saveConfig | 否 |
Object | null | 当开启脏数据容忍是,必须参数 脏数据保存配置参数配置 |
datasource | 是 | Object | null | 脏数据数据源配置 saveMessyData !=null时为必须参数 |
name | 是 | 字符串 | null | 数据源名称 |
type | 是 | 字符串 | null | 数据源类型 |
datasourceId | 是 | 整型 | null | 数据源id |
db | 是 | 字符串 | null | 数据源数据库名称 |
table | 是 | 字符串列表 | null | 数据源表名, size=1,多个表只取第一个 |
3.1.3 来源表结构变化策略
{
"setting": {
"alterTableStrategy": {
"lastTable": \[
\],
"columnAlterStrategy": 1
}
}
}
alterTableStrategy参数如下
参数 | 是否必须 | 数据类型 | 默认值 | 描述 |
---|---|---|---|---|
lastTable | 否 | 字符串列表 | 空列表 | 上一次修改任务(创建任务)来源表的表结构列名列表 |
columnAlterStrategy | 否 | 整型枚举 | 1 | 表结构策略 1:忽略表结构变更策略 2:变更表结构并变更映射关系 3:抛异常 |
3.1.4 脱敏支持
{
"setting": {
"maskScanSetting":{
"scanResult":\[
{
"maskName":"邮编遮盖脱敏",
"column":"name"
},
{
"maskName":"银行卡遮盖脱敏",
"column":"age"
}
\]
}
}
}
maskScanSetting参数
参数 | 是否必须 | 数据类型 | 默认值 | 描述 |
---|---|---|---|---|
scanResult | 必须 | List<ScanResultDTO> | 无 | 手动指定脱敏列名及其脱敏规则 |
ScanResultDTO参数如下
参数 | 是否必须 | 数据类型 | 默认值 | 描述 |
---|---|---|---|---|
maskName | 必须 | String | 无 | 脱敏规则名称 |
column | 必须 | String | 无 | 脱敏列名 |
3.2 读写功能参数说明
读写功能参数指针对读取或者写入时的功能参数设置
3.1.1 一般参数字典
一般参数字典指大多数数据源包含这些参数,在此说明,供开发者查阅
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
customSql | String | SQL模式任务的sql | 仅mysql,sqlserver,oracle,hana支持 |
conditions | string |
数据过滤 where之后的sql |
和流水型任务参数互斥 |
transformType | String | 传输类型 | 见枚举 |
insertType | String | 写入规则类型 | 根据数据源而定有不同枚举 |
postSQL | List<String> | 写入开始前SQL | 至多5条sql |
preSQL | List<String> | 写入结束后SQL | 至多5条sql |
3.1.2 特殊字符替换
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
enableStrReplace | boolean | 开启字符串替换 | |
originalStrs | String | 原始字符串 | |
replacedStr | String | 替换字符串 | 无 |
3.1.3 并发读取
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
forbiddenSplit | boolean | 是否禁用并发读取 | 与是否开启含义相反 |
split | String | 切分键(字段) | 要求为主键 类型为 Integer 或TimeStamp |
partitionNum | String | 并发度 | 无 |
3.1.4 流水型任务
流水型任务与非组合参数中数据过滤参数(conditions)互斥
参数 | 参数类型 | 描述 | 约束与补充 |
---|---|---|---|
initialValue | String | 流水型起始值 | |
column | String | 流水型字段名称 | |
columnSourceType | String | 流水型字段类型 | 用于web页面展示 用户sqlserver的特殊处理,值为timestamp时,使用转换函数 |
columnDataType | Integer | 流水型字段类型,任务运行时使用 参考jdbc的枚举code值 java.sql.Types |
支持整型和timestamp spark类型的枚举值 |
3.3 具体数据源特性参数
3.2.1 Mysql
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"ifCondition": {
"relatedArray": {},
"componentType": {}
},
"conditions": "",
"customSql": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"split": "",
"partitionNum": "",
"forbiddenSplit": true,
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": "",
"transformType": "",
"ignoreConditions": true
}
}
参数说明
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
特殊字符替换(3个参数) enableStrReplace originalStrs replacedStr |
否 | 查看前文参数字典 | |
并发读取(3个参数) forbiddenSplit split partitionNum |
否 | 查看前文参数字典 | |
流水型任务(3个参数) initialValue column columnSourceType columnDataType |
否 | 查看前文参数字典 | |
customSql | 否 | String | sql模式的sql |
conditions | 否 | String | where 条件 |
transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[
""
\],
"preSQL": \[
""
\]
}
}
参数说明
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
insertType | 否 | 枚举 | 写入规则类型(对于mysql语法) into插入 ignore忽略主键冲突 overwrite覆写 insertonduplicatekeyupdate 插入时主键冲突更新 |
postSQL | 否 | List<String> | 写入后执行sql |
preSQL | 否 | List<String> | 写入结束后SQL |
任务Mysql2Mysql
{
"product": "data\_transform",
"clusterId": "dev4",
"user": "tangjiafu@corp.netease.com",
"conf": {
"1": "2"
},
"name": "参数校验mysql2mysql",
"description": "",
"readerType": "mysql",
"reader": {
"conf": {
"key1": "value1",
"key2": "value2"
},
"datasources": \[
{
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
}
\],
"setting": {
"initialValue": 0,
"split": "id",
"ifCondition": \[\],
"forbiddenSplit": false,
"column": "",
"enableStrReplace": true,
"replacedStr": "ss",
"conditions": "1=1",
"transformType": "common",
"originalStrs": "\\\\n,\\\\r,\\\\01",
"partitionNum": 100
}
},
"writerType": "mysql",
"writer": {
"conf": {},
"datasource": {
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
},
"setting": {
"insertType": "into",
"preSQL": \[
"TRUNCATE ndi.user;"
\],
"postSQL": \[\]
}
},
"handlers": \[
{
"add": \[\],
"type": "columnHandler",
"map": \[
{
"newComment": "主键",
"newSourceType": "BIGINT(19, 0)",
"newName": "id",
"oldSourceType": "BIGINT(19, 0)",
"oldName": "id",
"comment": "主键"
},
{
"newComment": "姓名",
"newSourceType": "VARCHAR(255)",
"newName": "name",
"oldSourceType": "VARCHAR(255)",
"oldName": "name",
"comment": "姓名"
}
\]
}
\]
}
3.2.2 SQLserver
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"ifCondition": {
"relatedArray": {},
"componentType": {}
},
"conditions": "",
"customSql": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"split": "",
"partitionNum": "",
"forbiddenSplit": true,
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": "",
"transformType": "",
"ignoreConditions": true
}
}
参数说明
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
并发读取(3个参数) | 否 | 查看前文参数字典 | |
流水型任务(3个参数) | 否 | 查看前文参数字典 | |
customSql | 否 | String | sql模式的sql |
conditions | 否 | String | where 条件 |
transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[""\],
"preSQL": \[""\]
}
}
参数说明
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
insertType | 否 | 枚举 | 写入规则类型 into插入 |
postSQL | 否 | List<String> | 写入后执行sql |
preSQL | 否 | List<String> | 写入结束后SQL |
3.2.3 Oracle
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"ifCondition": {
"relatedArray": {},
"componentType": {}
},
"conditions": "",
"customSql": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"split": "",
"partitionNum": "",
"forbiddenSplit": true,
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": "",
"transformType": "",
"ignoreConditions": true
}
}
参数说明
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
并发读取(3个参数) | 否 | 查看前文参数字典 | |
流水型任务(3个参数) | 否 | 查看前文参数字典 | |
customSql | 否 | String | sql模式的sql |
conditions | 否 | String | where 条件 |
transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[""\],
"preSQL": \[""\],
}
}
参数说明
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
insertType | 否 | 枚举 | 写入规则类型 into |
postSQL | 否 | List<String> | 写入后执行sql |
preSQL | 否 | List<String> | 写入结束后SQL |
3.2.4 PostgreSQL
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"ifCondition": {
"relatedArray": {},
"componentType": {}
},
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"split": "",
"partitionNum": "",
"forbiddenSplit": true,
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": "",
"transformType": "",
"ignoreConditions": true
}
}
参数说明
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
并发读取(3个参数) | 否 | 查看前文参数字典 | |
流水型任务(3个参数) | 否 | 查看前文参数字典 | |
customSql | 否 | String | sql模式的sql |
transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[""\],
"preSQL": \[""\]
}
}
参数说明
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
insertType | 否 | 枚举 | 写入规则类型 into插入 |
postSQL | 否 | List<String> | 写入后执行sql |
preSQL | 否 | List<String> | 写入结束后SQL |
3.2.5 TiDB
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[""\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"ifCondition": {
"relatedArray": {},
"componentType": {}
},
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"split": "",
"partitionNum": "",
"forbiddenSplit": true,
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": "",
"transformType": "",
"ignoreConditions": true
}
}
参数说明
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
并发读取(3个参数) | 否 | 查看前文参数字典 | |
流水型任务(3个参数) | 否 | 查看前文参数字典 | |
customSql | 否 | String | sql模式的sql |
transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[""\],
"preSQL": \[""\],
}
}
参数说明
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
insertType | 否 | 枚举 | 写入规则类型 into upsert主键冲突是覆写 ignore主键冲突时忽略 |
postSQL | 否 | List<String> | 写入后执行sql |
preSQL | 否 | List<String> | 写入结束后SQL |
3.2.6 Hive
Reader Template
{
"datasources": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
"hiveType": ""
},
"conf": {},
"setting": {
"conditions": "",
"readMode": "",
"partitions": \[
{
"value": ""
}
\],
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
readMode | 是 | 枚举 | hive 读取方式 copy:hadoop distcp文件拷贝 impala:impala读取 jdbc: hiveserver2读取 spark: spark读取 |
partitions | 否 | 对象 | 分区信息 readMode=copy时支持 |
conditions | 否 | String | 过滤条件 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
"hiveType": ""
},
"setting": {
"insertType": "",
"partitionList": \[
{
"key": "",
"partitionType": "",
"value": "",
"valueType": ""
}
\]
}
}
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
insertType | 是 | 枚举 | overwrite 先删除表中数据,再重新写入 into 在表中追加数据 |
partitions | 否 | Partition | 分区信息 readMode=copy时支持 |
conditions | 否 | String | 过滤条件 |
Partition
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
key | 是 | String | 分区key |
partitionType | 是 | 枚举 | static 静态分区 dynamic动态分区 |
value | 是 | 分区值表达式 | 动态分区时,为列值 静态分区为常量值,支持azkaban表达式 |
3.2.7 Greenplum
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[""\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"conditions": "",
"partitionColumn": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
partitionColumn | 否 | String | greenplum-spark-connector用于分区字段参数 表中整型字段 |
conditions | 否 | String | 过滤条件 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
"hiveType": ""
},
"setting": {
"insertType": "",
"postSQL": \[
""
\],
"preSQL": \[
""
\]
}
}
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
insertType | 是 | 枚举 | truncate清空表数据写入 append overwrite |
postSQL | 否 | 见前文参数字典 | |
preSQL | 否 | 见前文参数字典 |
3.2.8 Doris
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
参数名 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
特殊字符替换 (三个参数) |
否 | 见参数字典描述 enableStrReplace originalStrs replacedStr |
|
conditions | 否 | String | 过滤条件(where,分区参数) |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
},
"setting": {
"insertType": "",
"postSQL": \[""\],
"preSQL": \[""\],
"maxFilterRatio": "",
"writeMode": ""
}
}
参数描述
参数 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
postSQL | 否 | 见参数字典 | |
preSQL | 否 | 见参数字典 | |
insertType | 是 | 枚举 | |
writeMode | 是 | 枚举 | 写入方式 brokerload streamload |
loadInterval | 否 | 整型 | 导入间隔 writeMode=streamload时,为必填参数 |
maxFilterRatio | 否 | 浮点数字符串 | 最大容忍比例 writeMode=brokerload时,为必填参数 |
3.2.9 Hbase
Reader Template
{
"datasources": {
"datasourceId": 0,
"db": "",
"table": "",
"version": ""
},
"conf": {},
"setting": {
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"startRowKey": "",
"endRowKey": ""
}
}
参数名 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
特殊字符替换 (三个参数) |
否 | 见参数字典描述 enableStrReplace originalStrs replacedStr |
|
conditions | 否 | String | 过滤条件(where,分区参数) |
startRowKey | 否 | String | hbase 起始rowKey |
endRowKey | 否 | String | hbase 终止rowKey |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": ""
},
"setting": {
"writeType": "",
"dfsNameNodeRpcAddress": "",
"dfsNameServices": ""
}
}
参数描述
参数 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
writeType | 是 | 枚举 | 写入方式 bulkLoad put |
dfsNameNodeRpcAddress | 否 | String | HFile所在HDFS的schema writeType=bulkLoad启用 |
dfsNameServices | 否 | String | nameNode rpc地址,多个以逗号分隔 writeType=bulkLoad启用 |
3.2.10 ElasticSearch
Reader Template
{
"datasources": {
"datasourceId": 0,
"db": "",
"table": "",
"version": ""
},
"conf": {},
"setting": {
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
参数名 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
特殊字符替换(三个参数) | 否 | 见参数字典描述 |
|
conditions | 否 | String | 过滤条件(where,分区参数) |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": ""
},
"setting": {
"insertType": "",
"batchSize": 0,
"mappingId": "",
"indexType" :""
}
}
参数描述
参数 | 是否必须 | 参数类型 | 语义和约束 |
---|---|---|---|
batchSize | 是 | Long | batchSize es批量写入数据量 |
mappingId | 是 | String | es 主键字段 |
insertType | 是 | 枚举 | index (default) 根据ES主键字段新增数据,如果已经存在就替换并重建索引 create 根据ES主键字段添加新的数据,如果数据已经存在就报错 update 根据ES主键字段更新数据,如果数据不存在就报错 upsert 根据ES主键字段数据存在就更新,不存在就新增 |
indexType | 是 | 枚举 | 索引类型: dynamic静态索引 static动态索引 |
3.2.11 API
Reader Template
"reader": {
"conf": {},
"datasources": \[
{
"datasourceId": 3220,
"db": "default",
"table": \[
"/hello"
\]
}
\],
"setting": {
"headers": \[
{
"value": "header",
"key": "header"
}
\],
"method": "post",
"enableStrReplace": false,
"paging": {
"total": "id",
"type": "offset"
},
"body": "{\\n\\"body\\":\\"body\\"\\n}",
"transformType": "common",
"originalStrs": "",
"enablePaging": "true",
"replacedStr": ""
}
}
参数描述
特殊字符替换参数:enableStrReplace、originalStrs、replacedStr
参数名 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
body | N | String | post请求体 |
params | N | String | get请求体,拼到url中 |
headers | N | String | http header |
method | Y | 枚举 | HTTP方法,枚举 get、post |
enablePaging | Y | bool | 是否启用分页参数 |
paging | N | 对象 | 分页参数 |
total | N | String | 分页字段名称 具体使用jsonPath从返回Response中获取该值 |
type | N | 枚举 | 分页类型,枚举 page、offset page启用两个参数进行分页:pageNum、pageSize offset启用两个参数进行分页:offset、limit |
3.2.12 ClickHouse
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"flowControl": "",
"transformType": "common",
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"forbiddenSplit": true,
"split": "",
"partitionNum": "",
"initialValue": "",
"column": "",
"columnSourceType": "",
"columnDataType": ""
}
}
参数描述
参数类型 | 是否必须 | 参数类型 | 语义与约束 |
---|---|---|---|
特殊字符替换(3个参数) | 否 | 查看前文参数字典 | |
并发读取(3个参数) | 否 | 查看前文参数字典 | |
流水型任务(3个参数) | 否 | 查看前文参数字典 | |
customSql | 否 | String | sql模式的sql |
conditions | 否 | String | where 条件 |
transformType | 否 | String | 任务类型,枚举 common:一般任务 line:流水型任务 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": ""
},
"setting": {
"insertType": "",
"postSQL": \[
""
\],
"preSQL": \[
""
\]
}
}
参数描述
参数名 | 是否必须 | 语义与约束 |
---|---|---|
insertType | 是 | 写入模式,枚举 into |
postSQL | 否 | 写入前sql |
preSQL | 否 | 写入后sql |
3.2.13 FTP
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": ""
}
\],
"conf": {},
"setting": {
"fileFilter": true,
"filteredPath": {
"type": "",
"paths": \[
{
"fileName": "",
"fullPath": "",
"isFile": true,
"size": 0,
"timestamp": "",
"fileSize": 0,
"fileId": 0
}
\],
"conditions": \[
{
"type": "",
"value": "",
"operator": ""
}
\]
},
"docSplit": true,
"format": "",
"fieldSeparator": "",
"skipLines": "",
"sheetName": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": "",
"enableRowCheck": true,
"checkFilePath": "",
"checkContentSeparator": "",
"checkContentRowPosition": ""
}
}
参数描述
参数名 | 是否必须 | 语义与约束 |
---|---|---|
fileFilter | Y | 文件筛选,true:全部文件,false:部分文件 |
filteredPath | N | 文件过滤条件,fileFilter=false时必须 |
docSplit | 是 | true:结构化文件传输 false:非结构化文件传输 |
format | N | 文件格式 |
fieldSeparator | N | 列分隔符 |
skipLines | N | 跳过行数 |
sheetName | N | 传输Excel时sheet名称 |
字符串替换功能参数
enableStrReplace | N | 是否开启字符替换 |
---|---|---|
originalStrs | N | 原始str |
replacedStr | N | 替换str |
行数校验功能参数
enableRowCheck | N | 是否开启行数校验 |
---|---|---|
checkFilePath | N | 校验文件路径 |
checkContentSeparator | N | 校验文件分隔符 |
checkContentRowPosition | N | 校验数据的行数位置 |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
""
\]
},
"setting": {
"format": "",
"insertType": "",
"fileNameType": "",
"fileName": "",
"preAction": \[
{
"actionType": "",
"actionPath": "",
"actionNewPath": ""
}
\],
"postAction": \[
{
"actionType": "",
"actionPath": "",
"actionNewPath": ""
}
\],
"writeFlagFile": true
}
}
参数描述
参数名 | 是否必须 | 语义与约束 |
---|---|---|
format | 否 | 文件格式,枚举 |
docSplit | 是 | true:结构化文件传输 false:非结构化文件传输 |
insertType | 是 | 同名文件处理规则 into 覆盖 overwrite |
fileNameType | 是 | 目标文件名类型 自定义custom 系统生成system |
preAction | 否 | 前置处理,不处理actionType=none |
postAction | 否 | 后置处理,不处理actionType=none |
前置处理、后置处理
参数名 | 是否必须 | 语义与约束 |
---|---|---|
actionType | 是 | 枚举 不处理、删除、重命名 none、rm、rename |
actionPath | 否 | 操作文件路径 |
actionNewPath | 否 | 重名名后的文件全路径 |
3.2.14 HDFS
Reader Template
{
"datasources": \[
{
"table": \[
""
\],
}
\],
"conf": {},
"setting": {
"conditions": "",
"docSplit": "",
"fieldSeparator": "",
"format": "",
"skipLines": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
参数名 | 是否必须 | 语义与约束 |
---|---|---|
fileFilter | Y | 文件筛选,true:全部文件,false:部分文件 |
filteredPath | N | 文件过滤条件,fileFilter=false时必须 |
docSplit | 是 | true:结构化文件传输 false:非结构化文件传输 |
format | N | 文件格式 |
fieldSeparator | N | 列分隔符 |
skipLines | N | 跳过行数 |
sheetName | N | 传输Excel时sheet名称 |
Writer Template
{
"conf": {},
"datasource": {
"table": "",
},
"setting": {
"compressionCodec": "",
"fileName": "",
"fileNameType": "",
"format": "",
"insertType": "",
"writeFlagFile": true
}
}
参数描述
参数名 | 是否必须 | 语义与约束 |
---|---|---|
format | 否 | 文件格式,枚举 |
docSplit | 是 | true:结构化文件传输 false:非结构化文件传输 |
insertType | 是 | 同名文件处理规则 into 覆盖 overwrite |
fileNameType | 是 | 目标文件名类型 自定义custom 系统生成system |
preAction | 否 | 前置处理,不处理actionType=none |
postAction | 否 | 后置处理,不处理actionType=none |
前置处理、后置处理
参数名 | 是否必须 | 语义与约束 |
---|---|---|
actionType | 是 | 枚举 不处理、删除、重命名 none、rm、rename |
actionPath | 否 | 操作文件路径 |
actionNewPath | 否 | 重名名后的文件全路径 |
3.2.15 Kafka
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
},
"setting": {
"key": "",
"producerConf": \[
{
"key": "",
"value": ""
}
\]
}
}
参数描述
参数名 | 是否必须 | 语义与约束 |
---|---|---|
key | 是 | 写入kafka时,数据形式如下 {"key":key,"value":json} |
producerConf | 否 | kafka的连接配置项 |
3.2.16 Kudu
Reader Template
{
"datasources": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
},
"conf": {},
"setting": {
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
参数名 | 是否必须 | 语义与约束 |
---|---|---|
conditions | 否 | 过滤条件(where,分区参数) |
字符串替换功能参数
enableStrReplace | N | 是否开启字符替换 |
---|---|---|
originalStrs | N | 原始str |
replacedStr | N | 替换str |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": "default",
},
"setting": {
"insertType": ""
}
}
参数描述
参数名 | 是否必须 | 语义与约束 |
---|---|---|
insertType | 是 | 写入方式,枚举: into插入 overwrite覆写 |
3.2.17 Mongo
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": "",
"version": "",
}
\],
"conf": {},
"setting": {
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
参数描述
参数名 | 是否必须 | 语义与约束 |
---|---|---|
conditions | 否 | 过滤条件(where,分区参数) |
字符串替换功能参数
enableStrReplace | N | 是否开启字符替换 |
---|---|---|
originalStrs | N | 原始str |
replacedStr | N | 替换str |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": "",
"version": ""
},
"setting": {
"insertType": ""
}
}
参数描述
参数名 | 是否必须 | 语义与约束 |
---|---|---|
insertType | 是 | 写入方式,枚举: append追加 overwrite覆写 |
3.2.18 StartRocks
Reader Template
{
"datasources": \[
{
"datasourceId": 0,
"db": "",
"table": \[
""
\],
"version": "",
"tableNameType": "",
"datasourceMode": ""
}
\],
"conf": {},
"setting": {
"conditions": "",
"enableStrReplace": true,
"originalStrs": "",
"replacedStr": ""
}
}
参数描述
参数名 | 是否必须 | 语义与约束 |
---|---|---|
conditions | Y | 过滤条件(where,分区参数) |
字符串替换功能参数
enableStrReplace | Y | 是否开启字符替换 |
---|---|---|
originalStrs | N | 原始str |
replacedStr | N | 替换str |
Writer Template
{
"conf": {},
"datasource": {
"datasourceId": 0,
"db": "",
"table": \[
"demo"
\],
"version": "",
"tableNameType": "",
},
"setting": {
"loadInterval": 0,
"maxFilterRatio": "0.0",
"insertType": "into",
"brokerLoadBrokers":"",
"postSQL": \[\],
"preSQL": \[\]
"writeMode": "streamload"
}
}
参数描述
参数名 | 是否必须 | 类型 | 语义与约束 |
---|---|---|---|
postSQL | 否 | 见参数字典 | |
preSQL | 否 | 见参数字典 | |
writeMode | 是 | 枚举 | 写入方式 brokerload streamload |
loadInterval | 否 | 整型 | 导入间隔 writeMode=streamload时,为必填参数 |
maxFilterRatio | 否 | 浮点数字符串 | 最大容忍比例 writeMode=brokerload时,为必填参数 |
insertType | 否 | 写入类型,枚举: into |
|
brokerLoadBrokers | 否 | String | broker名称列表,以逗号分割 |
四、最佳实践
4.1 创建Mysql2Mysql任务
阅读此章节,请了解[OpenCreateTask]相关参数
4.1.1 构造Task层级参数
如下所示,一个不含有reader,writer以及 handlers/columns的json,但是readerType,writerType已经确定,表明该任务是Mysql2Mysql的任务;
{
"product": "data\_transform",
"clusterId": "dev4",
"user": "tangjiafu@corp.netease.com",
"name": "参数校验mysql2mysql\_2",
"description": "",
"readerType": "mysql",
"reader": {
"conf": {
// 读取高级配置项,键值对
},
"datasources": \[
// 读取数据源列表,只有部分数据源支持多个来源,参考数据源参数说明
\],
"setting": {
// 根据数据源而定,查阅Reader Template
}
},
"writerType": "mysql",
"writer": {
"conf": {
// 写入高级配置项
},
"datasource": {
// 写入数据源参数
},
"setting": {
// 写入配置项,根据数据源而定,查阅Writer Template
}
},
"handlers": \[
{
}
\],
"setting": {
"messyDataConfig": {
"saveMessyData": false,
// saveMessyData= false 表示,不需要
"saveConfig": {
"datasource": {
"name": "",
"type": "",
"datasourceId": 0,
"db": "",
"table": \[
""
\]
}
},
"tolerateMessyData": false,
"maxRejectedCount": 0,
"failedThreshold": 0.00
},
"alterTableStrategy": {
"lastTable": \[\],
"columnAlterStrategy": 1
},
// 流量控制,查阅流量控制参数
"flowControl": {
"enable": false,
"type": "",
"value": 0
}
}
}
4.1.2 构造reader层级
4.1.3 构造reader的datasources
查询1.4.3DataSource,根据元数据中心所登记的数据源信息,构造如下JSON参数
根据1.4.3DataSource参数描述,Mysql是支持多库表的,如果有多个来源库表,可以填写多个table和多个DataSources
{
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
}
4.1.4 构造reader的setting
查询3.2.1 知道Mysql支持哪些特性参数,非常容易可以构造如下参数,
值得注意的是,替换字符串的参数,需要注意是否转义,数据传输代码由java进行编码,不进行转义和反转义操作,这里的json中的"\\n",接收后是\n符号;
{
// 数据过滤
"conditions": "1=1",
// 传输类型为普通类型
"transformType": "common",
// 并发读取
"split": "id",
"forbiddenSplit": false,
"partitionNum": 100,
// 特殊字符替换
"enableStrReplace": true,
"replacedStr": "ss",
"originalStrs": "\\\\n,\\\\r,\\\\01"
}
4.1.5 构造writer
同理,可以构造writer,到目前为止构造出来的Task如下
{
"writer": {
"datasources": {
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
},
"conf": {},
"setting": {
"postSQL": \[\],
"datasourceId": 2727,
"insertType": "into",
"preSQL": \[
"TRUNCATE ndi.user;"
\]
}
}
}
4.1.6 构造handlers
由 1.4.1 可以知道 handers和columns为互斥参数,以columns为优先,[columns参数见4.3节]
handlers的构造,参照1.4.5和1.4.6去构造 columnHandler,columnHandler中的map表示列映射详情,具体参考1.4.6 ColumnMap进行映射:
Hbase由于其schema复杂性,进行了特殊处理,且暂时不支持columns进行列名映射,参见实践[ColumnMap],实践ColumnMap还提供了简化参数的方式。
{
"map": \[
{
"newComment": "主键",
"newSourceType": "BIGINT(19, 0)",
"newName": "id",
"oldSourceType": "BIGINT(19, 0)",
"oldName": "id",
"comment": "主键"
},
{
"newComment": "姓名",
"newSourceType": "VARCHAR(255)",
"newName": "name",
"oldSourceType": "VARCHAR(255)",
"oldName": "name",
"comment": "姓名"
}
\]
}
4.2 ColumnMap的构造
ColumnHandler包含列映射构造,其核心为ColumnMap的构造,对于结构化数据源
只需要两个字段:newName和oldName,即reader的列名和writer的列名
比如一个Mysql2Mysql的任务 columnMap最终构造如下
"map": \[
{
"newName": "id",
"oldName": "id"
},
{
"newName": "name",
"oldName": "name"
},
{
"newName": "age",
"oldName": "age"
},
{
"newName": "remark",
"oldName": "remark"
},
{
"newName": "createAt",
"oldName": "createAt"
}
\]
但对于非结构化数据源,则需要定义reader的类型,而去向则不需要,因为去向如果是非结构化,写入到文件中,都会转为String类型;
Hbase因为其特殊性,包含了另外两个参数
在 columnSelectiveValue不为空时,有如下逻辑
if (StringUtils.equalsIgnoreCase(columnSelectiveValue, "rowkey")) {
oldName = "rowkey"
} else {
if (StringUtils.isNotEmpty(oldName)) {
oldName = oldName + ":" + mapColumnData.getOrElse("oldExpression", "")
}
}
上述逻辑,文字概括:对于rowkey列,columnSelectiveValue=rowkey
对于非rowkey列,oldName为列簇,oldExpression为列名
mysql2Hbase的ColumnMap
"map": [
{
"oldType": "INT",
"columnSelectValue": "rowKey",
"newName": "id",
"oldName": "0",
},
{
"oldType": "STRING",
"columnSelectValue": "0",
"newName": "name",
"oldName": "0",
"oldExpression": "name",
}
]
4.3 字段同名映射 columns参数
handers和columns为互斥参数,以columns为优先,columns只能进行同名映射。
columns表示需要导入的列,具体业务:
数据传输获取Reader和Writer的schema,进行同名映射,如果写入数据源是一个nonSchema的数据源,则只通过Reader的schema进行映射,为目标端构造schema
nonSchema数据源:EleasticSearch,MongoDB,Kafka,RocketMQ,Hbase,FTP,HDFS
{
"id",
"name"
}
上述columns,如果来源和去向都含有id,name字段,则构建两个字段的映射。
4.4 关于高级配置
高级配置分为三类,任务级别配置,来源配置(读取),去向配置(写入)。
ndi.spark,
ndi. 则表示为任务级别参数
spark常用的几个配置项
// executor内存
ndi.spark.spark-argument.executor-memory
// dirver内存
ndi.spark.spark-argument.driver-memory
// hive动态分区最大值
ndi.spark.spark-conf.spark.hadoop.hive.exec.max.dynamic.partitions
// 是否计算hive的统计信息写入到metastore
ndi.analyzeTableStatistics
spark-argument表示为spark-submit命令启动的参数,spark-conf则会添加上
--conf
4.5 任务创建实践
4.5.1 mysql2mysql
{
"product": "data\_transform",
"clusterId": "dev4",
"user": "tangjiafu@corp.netease.com",
"name": "参数校验mysql2mysql\_2",
"description": "",
"readerType": "mysql",
"reader": {
"conf": {
"key1": "value1",
"key2": "value2"
},
"datasources": \[
{
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
}
\],
"setting": {
"initialValue": 0,
"split": "id",
"ifCondition": \[\],
"forbiddenSplit": false,
"column": "",
"enableStrReplace": true,
"replacedStr": "ss",
"conditions": "1=1",
"transformType": "common",
"originalStrs": "\\\\n,\\\\r,\\\\01",
"partitionNum": 100
}
},
"writerType": "mysql",
"writer": {
"conf": {},
"datasource": {
"datasourceId": 2727,
"db": "ndi",
"tableNameType": "normal",
"table": \[
"user"
\]
},
"setting": {
"insertType": "into",
"preSQL": \[
"TRUNCATE ndi.user;"
\]
}
},
"handlers": \[
{
"add": \[\],
"type": "columnHandler",
"map": \[
{
"newComment": "主键",
"newSourceType": "BIGINT(19, 0)",
"newName": "id",
"oldSourceType": "BIGINT(19, 0)",
"oldName": "id",
"comment": "主键"
},
{
"newComment": "姓名",
"newSourceType": "VARCHAR(255)",
"newName": "name",
"oldSourceType": "VARCHAR(255)",
"oldName": "name",
"comment": "姓名"
}
\]
}
\],
"setting": {
"messyDataConfig": {
"saveMessyData": false,
"saveConfig": {
"datasource": {
"name": "",
"type": "mysql",
"datasourceId": 0,
"db": "",
"table": \[
"table"
\]
}
},
"tolerateMessyData": false,
"maxRejectedCount": 0,
"failedThreshold": 0.00
},
"alterTableStrategy": {
"lastTable": \[\],
"columnAlterStrategy": 1
},
"flowControl": {
"enable": false,
"type": "",
"value": 0
}
}
}
以上内容对您是否有帮助?