INFO-数据传输Local模式
更新时间: 2024-03-11 02:52:35
阅读 2898
INFO-数据传输Local模式
适用模块
数据传输
具体说明
通过spark local模式支持用户紧急传输需求
使用示例
1.安装配置
1. 通过技术支持获取支持特定数据源easy_transfer_client压缩包以及任务配置模板文件(json格式);
2. 在azkaban exec节点(可通过ambari或easyops查看所在节点)进行解压;
3. 配置调整:从当前easy_transfer_client或ndi_client中拷贝conf/ndi.properties至解压文件夹的conf目录中,并修改一下参数:
ndi.worker-home=压缩包解压后绝对路径
# 例如:/home/bdms/spark2/default_spark2/client/20210301180406515fd159aac/current
ndi.spark.spark-home=当前节点spark客户端绝对路径
# 例如:/home/bdms/spark2/default_spark2/client/20210301180406515fd159aac/config
ndi.spark.spark-conf-dir=当前节点spark客户段对应配置文件目录绝对路径
# 例如:/home/bdms/hdfs/default_hdfs/client/202103022009056373bbb2922/config
ndi.hadoop-conf-dir=当前节点hdfs客户端对应配置文件目录绝对路径
2.调度脚本
# taskExec.sh 任务执行脚本名称,可自定义
prefixPath=`pwd`
cd 解压包路径
./bin/ndi-submit
--file $prefixPath'/ck2hive.json' #ck2hive.json 为任务配置文件名,需与任务执行脚本位于同级目录,否则需自行调整任务配置文件绝对路径拼接
--spark-conf spark.transmit.metrics.enable=false
[--keytab xxx]
[--principal xxx]
[--spark-argument queue=${mapreduce.job.queuename}]
-- master local[*]
3.Azkaban参数支持
通过离线传输-script节点进行任务调度时,可支持azkaban内置时间参数,${azkaban.flow.1.days.ago}等,使用方式为在节点参数中传入:
env.ndi.summary_date ${azkaban.flow.1.days.ago}
此时可在任务配置文件中通过${ndi.summary_date}进行引用
4.指定任务执行节点
作为临时方案,可仅将客户端部署至某一个az执行节点,在“开发模式--设置并运行”或“线上模式--设置并提交调度”弹出窗口中,添加自定义参数:useExecutor 1或2指定任务指定节点;
主机与编号对应关系可通过以下sql查询:
select * from azkaban.executors;
也可通过开发模式从编号1开始测试执行支持成功;
5.任务配置文件模板
执行脚本之前需要进行
export HADOOP_CONF_DIR=/usr/ndp/current/hdfs_client/conf
{
"readerType": "clickhouse", //来源类型
"reader": {
"dataSources": [
{
"connectionInformation": {
"password": "youdata", // 密码
"user": "youdata", // 用户名
"url": "jdbc:clickhouse://10.200.129.52:8123/default" //连接信息
},
"database": "data_transfrom", // 库名
"table": [
"ndi_test1_copy" // 表名
]
}
],
"setting": {
"conditions": "create_time<=unix_timestamp(DATE_ADD(str_to_date(${ndi.summary_date}, '%Y%m%d'), interval 1 day))" // 过滤条件
},
"conf": [
{
"value": "",
"key": ""
}
]
},
"writerType": "hive",
"writer": {
"dataSource": {
"database": "data_transform", // 库名
"table": "ndi_test1_partition" // 表名
},
"insertType": "overwrite",
"conf": [],
"partitionList": [
{
"valueType": " ",
"value": "${ndi.summary_date}", // 分区值
"partitionType": "static", // 分区类型:静态分区
"key": "type" //分区列名
}
],
"flowControl": ""
},
"handlers": [
{
"add": [],
"type": "columnHandler",
"map": [
{
"newName": "id", // 去向列名
"oldName": "id" // 来源列名
},
{
"newName": "name",
"oldName": "name"
},
{
"newName": "age",
"oldName": "age"
}
]
}
],
"paramSetKeyAndValues": []
}
作者:wangsong
文档反馈
以上内容对您是否有帮助?