DEMO-HiveOverHbase

Applicable module

hive

Description

Approach 1: Write via a Hive external table + the HBase API
Approach 2: Write via a Hive external table + bulkload
Approach 3: Write via a custom JAR

Usage examples

## Approach 1: Write via Hive external table + HBase API

Writing data to the HBase cluster directly through a Hive external table and the HBase API will, when the data volume is too large, trigger instantaneous-high-QPS alarms on the HBase cluster and affect its performance.
# Map an HBase table with a Hive external table

```SQL
-- Replace zk_hostname with the ZooKeeper host names of the target HBase cluster
set hbase.zookeeper.quorum=zk_hostname1:port,zk_hostname2:port,zk_hostname3:port;
-- Replace /hbase_namespace with the znode under which the HBase cluster is registered in ZooKeeper
set zookeeper.znode.parent=/hbase_namespace;

-- Create a Hive external table mapped to the HBase table
use bdservice;
create external table hive_hbase_api_simple (
  key int,
  issue string,
  summary string,
  created string,
  creator string
) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES (
  "hbase.columns.mapping" = ":key,cf1:issue,cf1:summary,cf1:created,cf1:creator"
)
TBLPROPERTIES
  ("hbase.table.name" = "hbase_api_simple");

-- Write data to HBase through the Hive external table
-- (hive_hbase_api_source is a placeholder for the Hive source table that holds the data to load)
insert into table hive_hbase_api_simple
select * from hive_hbase_api_source;
```

1. The HBase table must already exist before the Hive external table is created.
2. The HBase JARs must be available to Hive: copy the hbase* JARs from $HBASE_HOME/lib to $HIVE_HOME/lib (see the sketch below).
3. Hive needs hbase-site.xml; alternatively, set the hbase.zookeeper.quorum and zookeeper.znode.parent parameters in the Hive CLI, and if ZooKeeper listens on a non-default port, also set the client port (e.g. hbase.zookeeper.property.clientPort).
4. If the HBase cluster has not enabled Kerberos authentication, replace the native JAR with the hbase-client-1.2.6.jar provided by Mammut (otherwise the `zookeeper.sasl.client` parameter is not supported), and configure the following parameters:
        set zookeeper.sasl.client=false; 
        set ipc.client.fallback-to-simple-auth-allowed=true;
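A minimal sketch of points 2 and 3 above; the host names, port, and /hbase_namespace znode are the same placeholders used in the SQL, and the paths assume standard $HBASE_HOME/$HIVE_HOME installations:

```bash
# Make the HBaseStorageHandler and its dependencies visible to Hive
cp $HBASE_HOME/lib/hbase*.jar $HIVE_HOME/lib/

# Instead of editing hbase-site.xml, the ZooKeeper settings can be passed when starting the Hive CLI
hive --hiveconf hbase.zookeeper.quorum=zk_hostname1:port,zk_hostname2:port,zk_hostname3:port \
     --hiveconf zookeeper.znode.parent=/hbase_namespace
```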
## Approach 2: Write data via Hive external table + bulkload
The Hive external table + bulkload approach can load large batches of data into HBase without affecting HBase cluster performance, but it does not support HBase tables with multiple regions.
# Map an HBase table with a Hive external table
```SQL
-- Replace zk_hostname with the ZooKeeper host names of the target HBase cluster
set hbase.zookeeper.quorum=zk_hostname1:port,zk_hostname2:port,zk_hostname3:port;

-- Replace /hbase_namespace with the znode under which the HBase cluster is registered in ZooKeeper
set zookeeper.znode.parent=/hbase_namespace;

-- Create a Hive external table mapped to the HBase table
use bdservice;
create external table hive_bulkload_hbase_simple (key int, issue string, summary string, created string, creator string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:issue,cf1:summary,cf1:created,cf1:creator")
TBLPROPERTIES ("hbase.table.name" = "hbase_api_simple");
```

# Generate HFiles through the Hive external table

```SQL
set hive.hbase.generatehfiles=true;
set hfile.family.path=/hbase/hfiles/cf1;
set mapreduce.job.reduces=100;
dfs -rm -r -f /hbase/hfiles/cf1;

use bdservice;
insert overwrite table hive_bulkload_hbase_simple
select * from hive_bulkload_hbase_source
sort by rowkey;
```

>The last directory name of the specified HDFS path must match the HBase column family name.  
>The specified HDFS directory must not exist; if it already exists, delete it first.  
>The inserted rows must be sorted by the rowkey field in ascending order; if the rowkey is not a string type, explicitly cast it to string before sorting (see the sketch below).  
>When the data volume is large, set the number of reducers manually; each reducer produces one HFile.
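For the cast requirement above, a minimal sketch assuming the source table's key column is numeric (the table and column names follow the example above):

```SQL
-- Sketch only: cast the numeric key explicitly so that sort by orders rows the way HBase orders rowkeys
insert overwrite table hive_bulkload_hbase_simple
select cast(key as string) as rowkey, issue, summary, created, creator
from hive_bulkload_hbase_source
sort by rowkey;
```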

# Cross-cluster copy

```bash
# ipc.client.fallback-to-simple-auth-allowed lets the client fall back to simple authentication
# --config points at the configuration directory of the current cluster
hadoop --config ${2}/conf distcp -D ipc.client.fallback-to-simple-auth-allowed=true hdfs://bdms/user/poc/ws/hfiles/ hdfs://hdfscluster-sc/tmp/poc/hfiles
```

# Load HFiles into HBase

```bash
# --config points at the HDFS configuration directory of the cluster that hosts HBase
hadoop --config ./conf jar ./lib/hbase-server-1.2.6.jar completebulkload -Dhbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily=120 hdfs://hdfscluster-sc/tmp/poc/hfiles/ hive_bulkload_hbase_simple
```

>When specifying the path of the HFiles to load, point to the directory one level above the folder named after the column family.  
>When a single load involves many HFiles, specify the parameter -Dhbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily=120.  
>If java.lang.ClassNotFoundException occurs during loading, explicitly export the HBASE_HOME and HADOOP_CLASSPATH variables (see the sketch after the XML snippet below).  
>If the HBase cluster has not enabled security authentication, add the following parameters to the HDFS client configuration file hdfs-site.xml to fall back from the client cluster's security settings:


```xml
<property>
  <name>ipc.client.fallback-to-simple-auth-allowed</name>
  <value>true</value>
</property>
<property>
  <name>hbase.ipc.client.fallback-to-simple-auth-allowed</name>
  <value>true</value>
</property>
```
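For the ClassNotFoundException note above, a minimal sketch of the environment setup before running completebulkload; /opt/hbase is a placeholder for the actual HBase installation directory:

```bash
# Put the HBase classes on the classpath used by the hadoop jar command
export HBASE_HOME=/opt/hbase
export HADOOP_CLASSPATH=$($HBASE_HOME/bin/hbase classpath)

hadoop --config ./conf jar ./lib/hbase-server-1.2.6.jar completebulkload \
  -Dhbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily=120 \
  hdfs://hdfscluster-sc/tmp/poc/hfiles/ hive_bulkload_hbase_simple
```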
## Approach 3: Write data via a custom JAR
A custom JAR can generate HFiles from files on HDFS and, unlike approach 2, supports HBase tables with multiple regions. The overall flow is sketched below; the configuration files, dependencies, and code follow.
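A rough sketch of the end-to-end invocation, assuming the JAR built from the code later in this section is named RunBulkload-v1.jar (the name that appears in the ResourceLoader code) and is run from a directory that contains the 00-conf configuration directory:

```bash
# Make the HBase client/server classes visible to the hadoop jar command
export HADOOP_CLASSPATH=$($HBASE_HOME/bin/hbase classpath)

# Step 1: generate HFiles from the HDFS source files described in 00-conf/job.conf
hadoop jar RunBulkload-v1.jar pers.semon.bulkload.HfileGenerator

# Step 2: load the generated HFiles into the target HBase table
hadoop jar RunBulkload-v1.jar pers.semon.bulkload.BulkloadHfiles
```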
# Configuration file preparation
The following configuration files must be prepared in advance: `hdfs-site.xml`, `hbase-site.xml`, and `job.conf`.
+ hdfs-site.xml: must include the configuration of both the cluster holding the HDFS files and the cluster running HBase;
+ hbase-site.xml: mainly contains the ZooKeeper-related configuration that HBase depends on;
+ job.conf: mainly contains the MR job settings;
    ```bash
    # Data path
    inPath=/user/poc/hive_db/poc.db/hbase_userinfo
    # Field delimiter of the data files
    delimiter=,
    # Fields contained in the data files (in order)
    colArr=id,name,sex
    # HFile output path
    outPath=/user/poc/semon/hfile
    # Name of the submitted job
    jobName=idea-debug
    # Relative path (relative to the JAR) of the Hadoop configuration files
    resPath=/00-conf/hadoop
    # HBase table name
    tableName=demo:userinfo
    # Column used as the HBase rowkey
    rowKey=id
    # Column families and columns of the HBase table
    tableMeta=[{"cfName":"info","colName":["name","sex"]}]

    # Whether Kerberos is enabled
    auth=kerberos
    # Relative path of the Kerberos configuration file
    krbFile=/00-conf/krb5.conf
    # Relative path of the keytab file
    keyFile=/00-conf/demo-wangsong03.keytab
    ```


# JAR dependency information

```groovy
provided group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.9.2'
// https://mvnrepository.com/artifact/org.apache.hbase/hbase-client
provided group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.6'
// https://mvnrepository.com/artifact/org.apache.hbase/hbase-server
provided group: 'org.apache.hbase', name: 'hbase-server', version: '1.2.6'
// https://mvnrepository.com/artifact/org.apache.hbase/hbase-common
provided group: 'org.apache.hbase', name: 'hbase-common', version: '1.2.6'
```

# JAR code

ResourceLoader class: reads and parses the local configuration file

```java
package pers.semon.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.security.UserGroupInformation;
import sun.security.krb5.internal.ktab.KeyTab;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Properties;

/**
 * @Description: Load the configuration file information
 * @Author: Semon
 * @Version: v1.0
 * @Date: 2021/9/28 22:05
 */
public  class ResourceLoader {

    public static String confFile;
    public static Properties prop;
    public static String resPath;
    public static String auth;
    public static String krbFile;
    public static String keyFile;
    public static String principal;
    public static UserGroupInformation ugi;
    public static Path inPath;
    public static Path outPath;
    public static TableName tableName;
    public static int taskNum;
    public static String jobName;
    public static Configuration  conf;


    static {
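        // Resolve all job settings once when the class is loaded; file paths are relative to the process working directory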
        prop = new Properties();
        String basePath = System.getProperty("user.dir");
        confFile= basePath+ "/00-conf/job.conf";
        try {
            prop.load(new FileInputStream(confFile));
        } catch (IOException e) {
            e.printStackTrace();
        }


        inPath = new Path(prop.getProperty("inPath"));
        outPath = new Path(prop.getProperty("outPath"));

        tableName = TableName.valueOf(prop.getProperty("tableName"));
        if (prop.containsKey("taskNum")) {
            taskNum = Integer.valueOf(prop.getProperty("taskNum"));
        }
        else {
            taskNum=0;
        }

        jobName = prop.getProperty("jobName");


        resPath = basePath + prop.getProperty("resPath");
        conf = HBaseConfiguration.create();
        for (File f : new File(resPath).listFiles()) {
            if (f.getAbsolutePath().endsWith("xml")) {
                try {
                    conf.addResource(f.toURI().toURL());
                    //For local debugging: submit the job to the cluster instead of running it locally
                    conf.set("mapreduce.framework.name","yarn");
                    //For local runs, point mapreduce.job.jar at the built JAR, otherwise the mapper class is not found
                    if (prop.containsKey("mapreduce.job.jar")) {
                        conf.set("mapreduce.job.jar", prop.getProperty("mapreduce.job.jar"));
                    }

                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
            }
        }
        auth = prop.getProperty("auth");
        if (auth.equals("kerberos")) {
            krbFile = basePath + prop.getProperty("krbFile");
            keyFile = basePath+ prop.getProperty("keyFile");
            principal = KeyTab.getInstance(new File(keyFile)).getOneName().getName();
            System.setProperty("java.security.krb5.conf", krbFile);
            conf.set("hadoop.security.authentication","kerberos");
            UserGroupInformation.setConfiguration(conf);
            try {
                ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal,keyFile);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }


}
```

HfileGenerator class: generates HFiles into the specified HDFS directory

```java
package pers.semon.bulkload;

import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * @Description: Generate HFiles with the MapReduce engine
 * @Author: Semon
 * @Version: v1.0
 * @Date: 2021/9/28 22:43
 */
public class HfileGenerator {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {


        //Run the job under the Kerberos UGI loaded by ResourceLoader
        ResourceLoader.ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                generateJob(args);
                return null;
            }
        });

    }

    public static void generateJob(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        //Parse the arguments: GenericOptionsParser adds generic MR options (queue, container sizes, etc.) to the Configuration
        Configuration conf = ResourceLoader.conf;
        String[] remainingArgs = new GenericOptionsParser(conf,args).getRemainingArgs();

        //Create the Job instance with the configuration and job name
        Job job = Job.getInstance(conf, ResourceLoader.jobName);

        //Set the JAR for the job by class
        job.setJarByClass(HfileGenerator.class);

        //Set the mapper class for the job
        job.setMapperClass(TextMapper.class);

        //Set the mapper output key/value types
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        //Set the HDFS input/output paths; delete the output path if it already exists
        FileSystem fs = FileSystem.get(conf);
        if ( fs.exists(ResourceLoader.outPath) || fs.isDirectory(ResourceLoader.outPath)) {
            fs.delete(ResourceLoader.outPath,true);
            System.out.println(ResourceLoader.outPath.toString() + " already existed and was deleted before running the job");
        }

        FileInputFormat.addInputPath(job,ResourceLoader.inPath);
        FileOutputFormat.setOutputPath(job,ResourceLoader.outPath);

        //HDFS directory to which the configuration file is uploaded; create it if it does not exist
        Path tmpPath = new Path("/tmp/" + ResourceLoader.ugi.getShortUserName() + "/" + ResourceLoader.jobName);
        if (!fs.exists(tmpPath) && !fs.isDirectory(tmpPath)) {
            fs.mkdirs(tmpPath);
        }

        //Upload the configuration file to HDFS and add it to the distributed cache
        fs.copyFromLocalFile(new Path(ResourceLoader.confFile),tmpPath);
        job.addCacheFile(new URI(tmpPath+ "/" + new Path(ResourceLoader.confFile).getName()));

        //Set the number of reduce tasks; if unset, it follows the number of HBase regions
        if (ResourceLoader.taskNum != 0) {
            job.setNumReduceTasks(ResourceLoader.taskNum);
        }

        //Fetch the HBase table metadata (table name, column families) from the cluster and configure the incremental load
        Connection conn = ConnectionFactory.createConnection(conf);
        HTableDescriptor hTableDescriptor = conn.getTable(ResourceLoader.tableName).getTableDescriptor();
        HFileOutputFormat2.configureIncrementalLoad(job,hTableDescriptor,conn.getRegionLocator(ResourceLoader.tableName));

        //Launch the job and wait for completion
        System.exit(job.waitForCompletion(true)? 0:1);
    }

    public static void cancel() {}

    public static class TextMapper extends Mapper<LongWritable, Text , ImmutableBytesWritable, Put> {

        public static Properties prop;
        public static String rowKey;
        public static String delimiter;
        public static String tableMeta;
        public static String[] colArr;
        public static Map<String,Integer> srcMap = new HashMap<>();
        public static Map<String,String> metaMap = new HashMap<>();


        @Override
        protected void setup(Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException {
            URI[] cacheFiles = context.getCacheFiles();
            FileSystem fs = FileSystem.get(context.getConfiguration());
            prop = new Properties();
            for (URI uri : cacheFiles) {
                if (uri.toString().endsWith("conf")) {
                    prop.load(new BufferedInputStream(fs.open(new Path(uri.getPath()))));
                }
            }

            rowKey = prop.getProperty("rowKey");
            delimiter = prop.getProperty("delimiter");
            colArr = prop.getProperty("colArr").split(",");
            tableMeta = prop.getProperty("tableMeta");


            //colArr gives the field order in the file; map each field name to its index
            for (int i=0;i<colArr.length;i++) {
                srcMap.put(colArr[i],i );
            }

            //Build the column -> column family mapping from tableMeta
            JSONArray jsonArray = new JSONArray(tableMeta);
            for (int i=0;i<jsonArray.length();i++) {
                JSONObject jsonObject = jsonArray.getJSONObject(i);
                JSONArray colArr = jsonObject.getJSONArray("colName");
                for (int j=0;j<colArr.length();j++) {
                    metaMap.put(colArr.get(j).toString(),jsonObject.get("cfName").toString());
                }
            }

        }

        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException {

            //Each map call reads one line from HDFS and splits it by the delimiter
            String[] data = value.toString().split(delimiter);

            byte[] rowKeyByte = Bytes.toBytes(data[srcMap.get(rowKey)]);

            Put put = new Put(rowKeyByte);
            // put.addColumn(column family, column qualifier, value)
            for(String k : metaMap.keySet()) {
                put.addColumn(Bytes.toBytes(metaMap.get(k)), Bytes.toBytes(k),Bytes.toBytes(data[srcMap.get(k)]));
            }

            context.write(new ImmutableBytesWritable(rowKeyByte),put);

        }

        @Data
        static class TableMeta {
            String cfName;
            String[] colName;
        }

    }


}

```

BulkloadHfiles class: loads HFiles into the HBase table

```java
package pers.semon.bulkload;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

/**
 * @Description: Load the generated HFiles into the HBase table
 * @Author: Semon
 * @Version: v1.0
 * @Date: 2021/9/29 21:31
 */
public class BulkloadHfiles {
    public static void main(String[] args) throws Exception {
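        // LoadIncrementalHFiles assigns each HFile under outPath to the region that owns its key range and moves it into the table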
        LoadIncrementalHFiles loadIncrementalHFiles =  new LoadIncrementalHFiles(ResourceLoader.conf);
        Connection conn = ConnectionFactory.createConnection(ResourceLoader.conf);
        loadIncrementalHFiles.doBulkLoad(ResourceLoader.outPath,conn.getAdmin(),conn.getTable(ResourceLoader.tableName),conn.getRegionLocator(ResourceLoader.tableName));
    }
}

```

Author: wangsong