INFO - Merging Small Files

Applicable Modules

Offline Development - SQL node - Spark engine

Details

Small files are merged by controlling the number of shuffle partitions with conf.spark.sql.shuffle.partitions=xxx and adding distribute by rand() to the SQL.
The Hive JDBC connection string used in the examples below can be obtained with:
select hiveserverurl from mammut.pf_hive_cluster \G;
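
For illustration, a minimal Spark SQL sketch (the database, table, and partition names below are hypothetical): limiting spark.sql.shuffle.partitions caps the number of shuffle tasks, and distribute by rand() spreads rows evenly across them, so each target partition ends up with at most that many files. The same value can also be set through the node's conf.spark.sql.shuffle.partitions parameter instead of an inline set.

-- Hypothetical example: the final insert runs with 10 shuffle tasks,
-- so each target partition receives at most 10 output files
set spark.sql.shuffle.partitions=10;
insert overwrite table db.target_table partition (dt)
select * from db.source_table
distribute by rand();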

Usage Examples

1. Merge small files automatically with a shell script
#!/bin/bash
# Usage: <script> <database> <table> <partition_spec>
# Rewrites the table through a dated temporary table with "distribute by rand()"
# so the data is packed into fewer, larger files.
databasename=$1
tablename=$2
partitionname=$3

# HiveServer2 JDBC connection string (taken from mammut.pf_hive_cluster, see above)
jdbc_url="jdbc:hive2://demo4.jdlt.163.org:2182,demo5.jdlt.163.org:2182,demo6.jdlt.163.org:2182/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;principal=hive/_HOST@BDMS.163.COM"
temptable="$databasename.temp$(date "+%Y%m%d")"

# Inspect the table definition to decide whether it is partitioned
str1=$($HIVE_HOME/bin/beeline -u "$jdbc_url" -e "show create table $databasename.$tablename")
echo "$str1"
str2="PARTITIONED BY"
if [[ $str1 =~ $str2 ]]
then
    # Partitioned table: copy it aside, then overwrite it with dynamic partitions
    $HIVE_HOME/bin/beeline -u "$jdbc_url" -e "
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
create table $temptable as select * from $databasename.$tablename;
insert overwrite table $databasename.$tablename partition ($partitionname)
select * from $temptable
distribute by rand();
drop table $temptable;"
else
    # Non-partitioned table: same flow without the partition clause
    $HIVE_HOME/bin/beeline -u "$jdbc_url" -e "
create table $temptable as select * from $databasename.$tablename;
insert overwrite table $databasename.$tablename
select * from $temptable
distribute by rand();
drop table $temptable;"
fi


2. Merge small files in the original table via a temporary table
-- Step 1: snapshot the original table into a dated temporary table
create table ${databasename}.temp${azkaban.flow.current.date.simple} as
select * from ${databasename}.${tablename};

-- Step 2: overwrite the original table, redistributing rows with distribute by rand()
-- so the data lands in fewer, larger files
insert overwrite table ${databasename}.${tablename} partition (${partitionname})
select * from ${databasename}.temp${azkaban.flow.current.date.simple}
distribute by rand();

-- Step 3: drop the temporary table
drop table ${databasename}.temp${azkaban.flow.current.date.simple};
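
For reference, assuming the parameters resolve to databasename=ods, tablename=user_log, partitionname=dt, and azkaban.flow.current.date.simple renders as a compact date such as 20240101 (all of these values are hypothetical), the statements above expand to:

create table ods.temp20240101 as
select * from ods.user_log;

insert overwrite table ods.user_log partition (dt)
select * from ods.temp20240101
distribute by rand();

drop table ods.temp20240101;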



Author: 林帅