INFO-数据传输Local模式

适用模块

数据传输

具体说明

通过spark local模式支持用户紧急传输需求

使用示例

1.安装配置

1. 通过技术支持获取支持特定数据源easy_transfer_client压缩包以及任务配置模板文件(json格式);
2. azkaban exec节点(可通过ambarieasyops查看所在节点)进行解压;
3. 配置调整:从当前easy_transfer_clientndi_client中拷贝conf/ndi.properties至解压文件夹的conf目录中,并修改一下参数:
    ndi.worker-home=压缩包解压后绝对路径
    # 例如:/home/bdms/spark2/default_spark2/client/20210301180406515fd159aac/current
    ndi.spark.spark-home=当前节点spark客户端绝对路径
    # 例如:/home/bdms/spark2/default_spark2/client/20210301180406515fd159aac/config
    ndi.spark.spark-conf-dir=当前节点spark客户段对应配置文件目录绝对路径
    # 例如:/home/bdms/hdfs/default_hdfs/client/202103022009056373bbb2922/config
    ndi.hadoop-conf-dir=当前节点hdfs客户端对应配置文件目录绝对路径

2.调度脚本

# taskExec.sh 任务执行脚本名称,可自定义
prefixPath=`pwd`
cd  解压包路径
./bin/ndi-submit
--file $prefixPath'/ck2hive.json'  #ck2hive.json 为任务配置文件名,需与任务执行脚本位于同级目录,否则需自行调整任务配置文件绝对路径拼接
--spark-conf spark.transmit.metrics.enable=false
[--keytab xxx]
[--principal xxx] 
[--spark-argument queue=${mapreduce.job.queuename}]
-- master local[*]

3.Azkaban参数支持

通过离线传输-script节点进行任务调度时,可支持azkaban内置时间参数,${azkaban.flow.1.days.ago}等,使用方式为在节点参数中传入:
env.ndi.summary_date ${azkaban.flow.1.days.ago}
此时可在任务配置文件中通过${ndi.summary_date}进行引用

HZ94r8

4.指定任务执行节点

作为临时方案,可仅将客户端部署至某一个az执行节点,在“开发模式--设置并运行”或“线上模式--设置并提交调度”弹出窗口中,添加自定义参数:useExecutor 12指定任务指定节点;
主机与编号对应关系可通过以下sql查询:
select * from azkaban.executors;
也可通过开发模式从编号1开始测试执行支持成功;

SKypou

5.任务配置文件模板

执行脚本之前需要进行

export HADOOP_CONF_DIR=/usr/ndp/current/hdfs_client/conf

{
    "readerType": "clickhouse", //来源类型
    "reader": {
        "dataSources": [
            {
                "connectionInformation": {
                    "password": "youdata", // 密码
                    "user": "youdata",  // 用户名
                    "url": "jdbc:clickhouse://10.200.129.52:8123/default" //连接信息
                },
                "database": "data_transfrom", // 库名
                "table": [
                    "ndi_test1_copy" // 表名
                ]
            }
        ],

        "setting": {
         "conditions": "create_time<=unix_timestamp(DATE_ADD(str_to_date(${ndi.summary_date}, '%Y%m%d'), interval 1 day))" // 过滤条件
        },
        "conf": [
            {
                "value": "",
                "key": ""
            }
        ]
    },
    "writerType": "hive",
    "writer": {
        "dataSource": {
            "database": "data_transform",  // 库名
            "table": "ndi_test1_partition" // 表名
        },
        "insertType": "overwrite",
        "conf": [],
        "partitionList": [
            {
                "valueType": " ",
                "value": "${ndi.summary_date}", // 分区值
                "partitionType": "static", // 分区类型:静态分区
                "key": "type" //分区列名
            }
        ],
        "flowControl": ""
    },
    "handlers": [
        {
            "add": [],
            "type": "columnHandler",
            "map": [
                {
                    "newName": "id", // 去向列名
                    "oldName": "id" // 来源列名
                },
                {
                    "newName": "name",
                    "oldName": "name"
                },
                {
                    "newName": "age",
                    "oldName": "age"
                }
            ]
        }
    ],
    "paramSetKeyAndValues": []
}

作者:wangsong