工作流基本规则


  1. 开始节点可有多条输出;
  2. API上游不可再为API节点;
  3. API上游若为条件节点,则条件节点上游不可再为API节点;
  4. 条件节点上游不可再为条件节点,且上游仅能有一个节点;
  5. 结束节点上游不可为开始和条件节点;
  6. 不允许发生自环;
  7. 画布中不允许存在游离节点。

节点说明


开始节点

开始节点为编排工作流的唯一输入节点,在该节点可配置整个工作流的入参定义。

结束节点

结束节点为编排工作流的唯一输出节点,在该节点可配置整个工作流的返回参数定义。

需要注意的是,服务编排的输出结构体中,必须有查询结果集列表,统一放在名为data的字段下。而结束节点配置的返回参数定义,实际上是定义了返回结果集列表中每个元素的字段,如下:

服务编排功能 - 图1

结束节点定义了day和userid两个返回字段,结果集列表输出为:

"data":[{"day":"2022-01-14 00:00:00","userid":"25"},{"day":"2022-01-14 00:00:00","userid":"26"}]

那么在服务编排返回时,会输出如下返回结构:

{
   "reqId":"09d81294cdfb44f6996e4583916d162b"      //请求id,返回结构默认字段
    "code":0     //返回状态码,正常为0,返回结构默认字段
    "cost":57    //接口响应时间,返回结构默认字段
    "message":"success"  //接口返回描述信息,返回结构默认字段
    //外层这个data是接口返回结构默认字段,包含了查询结果的所有信息
    "data":{
       "result_num":2,    //查询结果数量,非必须返回字段
       //内层这个data代表结果集列表字段,该字段对应是查询结果集返回列表,为必须返回的字段
       "data":[
          {"day":"2022-01-14 00:00:00","userid":"25"},
          {"day":"2022-01-14 00:00:00","userid":"26"}
      ]
     },
}

API节点

api节点提供对数据服务原子api的调用,每个api节点关联一个已发布的数据服务原子api。

脚本节点

脚本节点提供python脚本的执行,提供用户完成一些定制化的取数逻辑,需在配置时自行编写python脚本。

脚本节点提供python编辑区域以及初始化的代码块,用户需在自定义代码区实现自定义的逻辑。

服务编排功能 - 图2

注意事项:脚本节点的脚本输入为json,若脚本节点上游只有一个输入节点,则该json即为上游节点的输出;若脚本节点上游有多个输入节点,则json会按照上游节点的id、对应输出结果按key-value形式组成,类似于:{“node1”:{…}, “node2”:{…},”node3”:{…}}。

条件节点

条件节点的功能是对上游的输出结果进行条件表达式匹配,根据匹配结果决定走下游具体某些分支。

使用方式:可编写python脚本对上游输入进行解析;对于下游每一个相邻节点,都需要配置条件表达式;python脚本的输出与每个相邻节点的条件表达式进行匹配,根据匹配结果决定是否走下游某些分支。

UDF节点

对于不熟悉python的用户,udf节点提供了java语法的支持,用户需按照udf模板规范,继承相关interface,自行编写取数逻辑并打包上传至udf节点即可执行。

demo样例


如下是获取用户id信息场景(非活跃用户存储在HBase数据源,活跃用户存储在Redis数据源),根据传入用户的类型,去不同原子api获取用户信息,并得到用户id列表,从上到下的节点类型为:开始节点 -> 用户是否活跃(条件节点)-> 非活跃用户信息api、活跃用户信息api (api节点)-> 获得用户id列表(python脚本节点)。

服务编排功能 - 图3

开始节点配置

服务编排功能 - 图4

条件节点配置

服务编排功能 - 图5

脚本节点python代码

import json,time,random,pickle,re,math

def handle(inputJson):
    # 入参字典
    input = json.loads(inputJson)
    # 统一用output字典封装返回结果
    output = {}
    #自定义代码区
    #获取上游节点1(id为'50840054640528')的返回参数dict
    dict1 = input['50840054640528']
    #获取上游节点2(id为'8895801755048528')的返回参数dict
    dict2 = input['8895801755048528']
    dataList = []
    dataListOfDict1 = []
    dataListOfDict2 = []
    #获得每个上游节点的结果集列表,key为'data'
    if 'data' in dict1:
      dataListOfDict1 = dict1['data']
    if 'data' in dict2:
      dataListOfDict2 = dict2['data']
    #获得每个用户的id
    for d in dataListOfDict1:
      data = {}
      data['userid'] = d['userid']
      dataList.append(data)
    for d in dataListOfDict2:
      data = {}
      data['userid'] = d['userid']
      dataList.append(data)
    #返回结构必须要有'data'字段,作为结果集列表放入输出的dict
    output['data'] = dataList
    return json.dumps(output)

工作流测试结果

{
"code":0,
"cost":149,
"data":{
    "data":[
         {"userid":"25"},
         {"userid":"26"}
        ]
    },
"message":"success",
"reqId":"f5339134df104afaac665ba8564d498a"
}