实时开发OpenAPI

此手册用于记录有数大数据开发与管理平台中,实时开发产品所有对外开放的OpenAPI,阅读此手册,你将了解实时开发产品开放的OpenAPI能力与调用方法。


文档编辑历史

版本 日期 编辑人 变更内容
V1.0.0 2022-01-13 创建文档

一 公共参数

1.1 响应格式

名称 类型 描述
code Number 响应码:非0表示失败。
reqId String 请求ID
msg String 响应消息
result Object 响应结果
cost Number 耗时

1.2 响应码(1000~9999)

响应码 描述
1000 openapi请求参数为空!
1001 请求参数必须extends OpenapiBaseRequest

1002

参数[{}]不能为空

1003

获取用户[{}]信息失败

1004

用户[{}]不存在

1005

获取产品信息失败,产品[{}]!

1006

用户[{}]产品列表为空

1007

用户[{}]不在产品里, 产品[{}]!

1008

产品[{}]不存在

1009

任务名重复

1010

目录[{}],不存在

1011

该引擎类型不存在,或者没有入库[{}]

1012

目录[{}]格式有问题

1013

目录[{}]创建失败

1014

目录深度超过两层

1015

JAR包不存在

1016

任务不存在

1017

发布失败,错误原因:[{}]

1018

文件已存在

1019

文件格式只能是*.jar, *.xml, *.proto, *.keytab, *.conf

1.3 公共枚举定义

1.3.1 EngineTypeEnum

枚举值 类型 名称
FLINK1_10 String flink引擎版本1.10
FLINK1_12_0 String flink引擎版本1.12
FLINK1_13 String flink引擎版本1.13
FLINK1_14 String flink引擎版本1.14

1.3.2 startMode

枚举值 类型 名称
FROM_LAST_CHECKPOINT String 从最近的checkpoint启动
FROM_LAST_SAVEPOINT String 从最近的savepoint启动
FROM_PATH String 从指定的路径启动
NONE String 直接启动

1.4 查询实体列表

1.4.1 JobConfig

运行参数配置

字段说明:

名称 类型 描述 是否必传 默认值
clusterName String 集群名称
queueName String 队列名称
slots int slot 1
tmMemory int tm内存:单位M 2048
jmMemory int jm内存:单位M 2048
paralleism int 并发数 1
checkpointSwitch boolean cp开关 False 关闭
checkpointInterval int cp间隔:单位s 30
checkpointMode CheckpointMode cp模式 EXACTLY_ONCE
checkpointTimeout int cp超时时间 30
stateBackend StateBackend 存储介质 hdfs
restartStrategy RestartStrategy 重启策略 fixed-delay
restartAttempts int 重启失败次数 10
restartDelay int 重试时间间隔 10
restartInterval int 重启时间范围 300
extendConfig Map flink高级配置

1.4.2 CheckpointMode

名称 类型 描述
EXACTLY_ONCE String 至多一次
AT_LEAST_ONCE String 至少一次

1.4.3 StateBackend 存储介质

名称 类型 描述
hdfs String hdfs
rocksdb String rocksdb

1.4.4 RestartStrategy 重启策略

名称 类型 描述

fixed-delay
String 固定间隔,如果超过最大尝试次数,作业会最终失败,在连续两次重启尝试之间等待固定的时间

failure-rate
String
在失败之后重新启动作业,但是当超过故障率时,作业最终会失败。在连续两次重启尝试之间等待固定时间。
如果没有启用 checkpointing,则使用无重启 (no restart) 策略。
如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略
fixed-delay 需要传入重启失败次数和重试时间间隔
failure-rate 需要传入 重启失败次数、重启时间范围、重试时间间隔

1.4.5 StartJobInfo

任务启动参数配置

字段说明:

名称 类型 描述 是否必传 默认值
jobId int 任务id
startMode startMode 启动任务的方式,枚举类型 NONE
statePath String 状态路径

1.4.6 StopJobInfo

任务停止参数配置

名称 类型 描述 是否必传 默认值
jobId int 任务id
triggerSavepoint boolean 是否触发savepoint保存

1.4.7 RestartJobInfo

任务重启参数配置

名称 类型 描述 是否必传 默认值
jobId int 任务id
startMode startMode 启动任务的方式,枚举类型 NONE
statePath String 状态路径
triggerSavepoint boolean 是否触发savepoint保存

1.4.8 AlarmInfo

告警信息配置

名称 类型 描述 是否必传 默认值
emails Array 告警用户信息 null
receiveTypes Array 告警方式的接收类型:
1:邮箱;4:短信
5:电话;6:企业微信
null
alarmRules Array 告警规则详细描述 null
alarmGroupId int 告警接收组id,如sloth开发组 null
pausingInterval int 全局告警抑制时间 null
pausingEnable boolean 是否开启告警抑制 false
alarmEnable boolean 是否开启告警 true

1.4.9 AlarmRule

告警规则

名称 类型 描述 是否必传 默认值
metricType String 告警指标类型:
failover
data_retention_latency:数据滞留延迟
job_failed:任务失败
input_qps:输入QPS
output_qps:输出QPS
failed_checkpoint:checkpoint失败次数
user_defined_latency:用户自定义延迟
record_lag:滞留数据量
backpressure:反压
metricName String 告警指标名称 -1
cycleTime int 单个统计周期的时长:(任务失败不支持配置)
1:1分钟周期
5:5分钟周期
10:10分钟周期
15:15分钟周期
20:20分钟周期
25:25分钟周期
30:30分钟周期
alarmInterval int 报警间隔时长,单位:分钟
suppressOperator String 单个统计周期内的运算符:(仅支持用户自定义延迟)
>=:大于等于
-1
cycleNum String 连续多少个统计周期:
1:连续1个周期
3:连续3个周期
5:连续5个周期
-1
threshold String 定义的多个统计周期内告警阈值:(任务失败不支持配置) -1
suppressThreshold String 单个统计周期内告警阈值:(仅支持用户自定义延迟)
statisticsType String 聚合类型:
max:最大值
avg:平均值
sum:总和
-1
operator String 定义的多个统计周期内运算符:(任务失败不支持配置)
>=:大于等于
<:小于
-1

1.4.10 JobStatusEnum 任务运行状态

名称 枚举值 任务状态
JobStatus
STARTING 启动中
START_FAILED 启动失败
RUNNING 运行中
FINISHED 任务执行结束,并正确的退出
FAILED 任务因为异常运行失败
STOPPING 停止中
STOPPED 已停止
UNKNOWN 未知(无法访问集群)

1.5 响应实体列表

1.5.1 JobBasicInfo

名称 类型 描述
jobId int 任务id
jobName String 任务名称
jarInfo JarInfo 对Jar任务的描述
jobType String boolean 任务类型
rawSql String SQL文本信息
udfList ResourceFileSimpleInfo 依赖的udf信息
fileList ResourceFileSimpleInfo 依赖的文件信息
creator String 任务的创建者
createTime String 任务的创建时间
publisher String 任务的发布人
env int 环境id
category int 任务所属目录id
tags Array 任务的标签信息
description String 任务的描述信息
lastStartJobExecHistoryId int 最后一次启动任务的执行id
startExecId int 任务的启动id
lastClusterId int 任务所属的集群id
jobStatus String 任务的运行状态
applicationId String 对应flink任务id
flinkUrl String Flink web URL
monitorUrl String 任务的Grafana监控地址

1.5.2 ResourceFileSimpleInfo

名称 类型 描述
id int 依赖资源id
name String 依赖资源名称
path String 依赖资源的绝对路径
udfName String udf函数名称(udf独有)

二、OpenAPI列表

2.1 OpenAPI总览

模块 名称 支持版本 状态 请求方法 请求路径
任务管理 新建SQL任务 v3.9.1 已上线 POST /job/v1/sql/create
修改SQL任务 v3.9.1 已上线 POST /job/v1/sql/modify
删除任务 v3.9.1 已上线 POST /job/v1/delete
获取任务信息 v3.9.1 已上线 POST /job/v1/detail
启动任务 v3.9.1 已上线 POST /job/v1/start
停止任务 v3.9.1 已上线 POST /job/v1/stop
重启任务 v3.9.1 已上线 POST /job/v1/restart
获取任务当前状态 v3.9.1 已上线 POST /job/v1/status

2.1 新建SQL 任务

POST /job/v1/sql/create

版本:3.9.1

描述:创建SQL类型的任务

URL参数/请求体

名称 类型 描述 是否必传 默认值
user String 用户邮箱
product String 产品名称
jobName String 任务名称
engineType EngineTypeEnum 引擎类型,枚举值
folderPath
String
保存的文件夹路径,为空:位于根目录,格式:aa/bb,最多支持两级目录
rawSql String sql语句,sql最好传入\n用于分隔
description String 任务描述
dependencyIds Array 依赖文件id列表
udfIds Array 依赖的Udf id列表
jobConfig JobConfig 任务配置参数
alarmInfo AlarmInfo 告警配置 默认配置

请求示例: | Prolog
{
“jobName”: “jarName”,
“engineType”: “FLINK1_12_0”,
“folderPath”: “/aaa/bbb”,
“rawSql”: “this is sql”,
“description”: “jar job description”,
“dependencyIds”: [
1,
2,
4
],
“udfIds”: [
1,
3,
4
],
“jobConfig”: {},
“alarmInfo”: {},
“user”: “yangxiaohang@corp.netease.com”,
“product”: “sloth”
} | | —- |

返回示例:

名称 类型 描述
jobId Integer 新创建任务的id

2.2 修改SQL任务

POST /job/v1/sql/modify

版本:3.9.1

描述:修改SQL类型的任务

URL参数/请求体

名称 类型 描述 是否必传 默认值
user String 用户邮箱
product String 产品名称
jobId String 任务id
engineType EngineTypeEnum 引擎类型,枚举值
folderPath
String
保存的文件夹路径,为空:位于根目录,格式:aa/bb,最多支持两级目录
rawSql String sql语句,需要传入分割符
description String 任务描述
dependencyIds Array 依赖文件id列表
udfIds Array 依赖的Udf id列表
jobConfig JobConfig 任务配置参数
alarmInfo AlarmInfo 告警配置 不传不会修改

请求示例:

JSON
{
“jobId”: “104722”,
“jobName”: “sqlOpenApiTest3”,
“engineType”: “FLINK1_10”,
“folderPath”: “”,
“rawSql”: “— nabab\n set ‘user_action.connections.group.id’ = ‘test_group_id’;\nset ‘user_action.connector.startup-mode’ = ‘latest-offset’;\nset ‘tempTable.connector.topic’=’kafka_sink’;\nset ‘tempTable.format.type’ = ‘json’;\ninsert into sloth_autotest_kafka_eleven.mem.tempTable\nselect * from sloth_default_db.user_action;”,
“description”: “Test2”,
“dependencyIds”: [10],
“udfIds”: [26],
“jobConfig”: {
“clusterName”: “sloth-internal-test”,
“queueName”: “root.sloth”
},
“user”: “yangxiaohang@corp.netease.com”,
“product”: “sloth”,
“authType”: “TEST”
}

返回示例:

JSON
{
“code”: 0,
“msg”: “success”,
“result”:null,
“requestId”:null,
“reqId”:null,
“cost”: 2072,
“success”: true
}

2.3 删除任务

POST /job/v1/delete

版本:3.9.1

描述:删除调用方传入的任务列表

URL参数/请求体:

名称 类型 描述 是否必传 默认值
user String 用户邮箱
product String 产品名称
jobIds String 待删除的任务id集合,支持传入多个任务,任务id间用逗号分割

请求示例:

JSON
{
“user”: “wanglei20@corp.netease.com”,
“product”: “sloth”,
“jobIds”: “1000074”
}

响应体:

JSON
{
“code”: 0,
“msg”: “success”,
“result”: 1354,
“requestId”: dcaf583d,
“success”:true
}

2.4 获取任务信息

POST /job/v1/detail

版本:3.9.1

描述:获取最新版本任务信息

URL参数/请求体:

名称 类型 描述 是否必传 默认值
user String 用户邮箱
product String 产品名称
jobId int 任务id

请求示例:

JSON
{
“user”: “wanglei20@corp.netease.com”,
“product”: “sloth”,
“jobId”: 1000056
}

响应体:

JSON
{
“code”: 0,
“msg”: “success”,
“result”: {
“jobBasicInfo”: {
“jobId”: 1000056,
“jobName”: “测试任务2-rename”,
“jarInfo”: {
“jarName”: “”,
“jarPath”: “”,
“mainClass”: “”,
“mainArgs”: “”,
“jarVersion”: “”
},
“jobType”: “SQL”,
“rawSql”: “[{\“cellType\“:\“flink.sql\“,\“enable\“:true,\“key\“:\“743a695f-9099-11ec-ad7c-fa163ea49987\“,\“source\“:[\“—SQL\“,\“—********************************************************************—\“,\“—Author: 杨晓航\“,\“—CreateTime: 2021-12-09 15:57:41\“,\“—Comment: 请输入业务注释信息\“,\“—********************************************************************—\“,\“CREATE TABLE user_log (\“,\“user_id VARCHAR,\“,\“item_id VARCHAR,\“,\“category_id VARCHAR,\“,\“behavior VARCHAR,\“,\“ts VARCHAR\“,\“) WITH (\“,\“‘connector’ = ‘datagen’\“,\“— , ‘number-of-rows’ = ‘10’\“,\“);\“,\“\“,\“create table print_table WITH (‘connector’ = ‘print’)\“,\“LIKE user_log (EXCLUDING ALL);\“,\“\“,\“insert into print_table\“,\“select * from user_log;\“],\“title\“:\“SQL\“}]“,
“udfList”: [
{
“id”: 11378,
“name”: “changeId”,
“path”: “/sloth/sloth/RESOURCE/dirid_200458/original-sloth-udf-1.0.1_1596533529506_1609405318875_1623849633571/1623849633599/original-sloth-udf-1.0.1_1596533529506_1609405318875_1623849633571.jar”,
“udfName”: “changeId”
}
],
“fileList”: [],
“creator”: “yangxiaohang@corp.netease.com”,
“createTime”: “2021-12-09T07:57:41.000+0000”,
“publisher”: “yangxiaohang@corp.netease.com”,
“env”: 381,
“category”: 351,
“tags”: [],
“description”: “hffg”,
“lastStartJobExecHistoryId”: 2745,
“startExecId”: 2036,
“lastClusterId”: 1,
“jobStatus”: “FAILED”,
“applicationId”: “”,
“flinkUrl”: “http://sloth-test2.dg.163.org:8088/proxy/application\_1645065290019\_0491/#/overview“,
“monitorUrl”: “http://sloth.bdms.netease.com/grafana/d/rZuGINgWk/sloth?orgId=1&var-jobAppKey=1000056&var-jobSource=sloth&from=1648028948000&to=1648696837704
}
},
“requestId”: dcaf583d,
“success”:true
}

响应体描述:

名称 类型 描述
jobBasicInfo JobBasicInfo 任务的基本信息

2.5 启动任务

POST /job/v1/start

版本:3.9.1

描述:启动调用方传入的任务

URL参数/请求体:

名称 类型 描述 是否必传 默认值
user String 用户邮箱
product String 产品名称
jobInfos Array[StartJobInfo](https://as9e2kjei0.feishu.cn/docs/doccnmlWpU6CwjNUSVmMs5IN4uh#RdHi0o)\ 任务的启动信息

请求示例:

JSON
{
“user”: “wanglei20@corp.netease.com”,
“product”: “sloth”,
“jobInfos”: [
{
“jobId”: 1000074,
“startMode”: “NONE”,
“statePath”: “”
}
]
}

响应体:

JSON
{
“code”: 0,
“msg”: “success”,
“result”: {
“results”: [
{
“jobId”: 1000074,
“jobName”: “testOpenApi”,
“isSuccess”: true,
“errorMessage”: “”
}
]
},
“requestId”: “914abf5efc64444d92f92418e3c9840c”,
“success”:true
}

2.6 停止任务

POST /job/v1/stop

版本:3.9.1

描述:停止调用方传入的任务

URL参数/请求体:

名称 类型 描述 是否必传 默认值
user String 用户邮箱
product String 产品名称
jobInfos Array[StopJobInfo](https://as9e2kjei0.feishu.cn/docs/doccnmlWpU6CwjNUSVmMs5IN4uh#fVRUrb)\ 任务的停止信息

请求示例:

JSON
{
“user”: “wanglei20@corp.netease.com”,
“product”: “sloth”,
“jobInfos”: [
{
“jobId”: 1000096,
“triggerSavepoint”:false
}
]
}

响应体:

JSON
{
“code”: 0,
“msg”: “success”,
“result”: {
“results”: [
{
“jobId”: 1000096,
“jobName”: “sqlOpenApiTest3”,
“isSuccess”:true,
“errorMessage”: “”
}
]
},
“requestId”: “9483bdc3f6d84be7a74f26cb78bd4b6a”,
“success”:true
}

2.7 重启任务

POST /job/v1/restart

版本:3.9.1

描述:重新启动调用方传入的任务

URL参数/请求体:

名称 类型 描述 是否必传 默认值
user String 用户邮箱
product String 产品名称
jobInfos Array[RestartJobInfo](https://as9e2kjei0.feishu.cn/docs/doccnmlWpU6CwjNUSVmMs5IN4uh#c5HM5Q)\ 任务的重启信息

请求示例:

JSON
{
“user”: “wanglei20@corp.netease.com”,
“product”: “sloth”,
“jobInfos”: [
{
“jobId”: 1000096,
“startMode”: “NONE”,
“statePath”: “”,
“triggerSavepoint”:false
}
]
}

响应体:

JSON
{
“code”: 0,
“msg”: “success”,
“result”: {
“results”: [
{
“jobId”: 1000096,
“isSuccess”:true,
“errorMessage”: “”
}
]
},
“requestId”: “19cd594800684549aa420e100f241fc5”,
“success”:true
}

2.7 获取任务当前状态

POST /job/v1/status

版本:3.9.1

描述:批量获取调用方传入任务的当前状态信息

URL参数/请求体:

名称 类型 描述 是否必传 默认值
user String 用户邮箱
product String 产品名称
jobIds String 待查询的任务id集合,支持传入多个任务,任务id间用逗号分割

请求示例:

JSON
{
“user”: “wanglei20@corp.netease.com”,
“product”: “sloth”,
“jobIds”: “100056”
}

响应体:

| JSON
{
“code”: 0,
“msg”: “success”,
“result”: {
“jobStatusList”: [
{
“jobId”: “1000056”,
“jobStatus”: “FAILED”
}
]
},
“requestId”: dcaf583d,
“success”:true
} | | —- |