Demo - HiveOverHbase
Updated: 2024-03-11 02:50:46
Applicable module
hive
Description
Option 1: write via a Hive external table + the HBase API
Option 2: write via a Hive external table + bulkload
Option 3: write data via a custom jar
Usage examples
## Option 1: write via a Hive external table + the HBase API
Writing data to the HBase cluster directly through a Hive external table and the HBase API will, when the data volume is too large, trigger instantaneous-QPS alarms on the HBase cluster and affect HBase cluster performance.
# Map the HBase table with a Hive external table
```SQL
-- replace zk_hostname with the ZooKeeper hostnames of the user's HBase cluster
set hbase.zookeeper.quorum=zk_hostname1:port,zk_hostname2:port,zk_hostname3:port;
-- replace /hbase_namespace with the namespace under which the HBase cluster is registered in ZooKeeper
set zookeeper.znode.parent=/hbase_namespace;
-- create a Hive external table mapped to the HBase table
use bdservice;
create external table hive_hbase_api_simple (
key int,
issue string,
summary string,
created string,
creator string
) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES (
"hbase.columns.mapping" = ":key,cf1:issue,cf1:summary,cf1:created,cf1:creator"
)
TBLPROPERTIES
("hbase.table.name" = "hbase_api_simple");
-- write data to HBase through the Hive external table
insert into table hive_hbase_api_simple
select * from hive_bulkload_hbase_source;
```
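To confirm that the rows reached HBase, a quick scan of the mapped table from the HBase shell is enough. A minimal check, assuming an `hbase` client configured against the same cluster is available:

```bash
# illustrative verification only; requires the hbase shell to point at the same cluster
echo "scan 'hbase_api_simple', {LIMIT => 5}" | hbase shell
```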
1. The HBase table must already exist before the Hive external table is created.
2. The HBase jars are required: copy the hbase* jars from $HBASE_HOME/lib of the HBase installation into $HIVE_HOME/lib.
3. Hive needs hbase-site.xml; alternatively, set the hbase.zookeeper.quorum and zookeeper.znode.parent parameters in the Hive CLI, and add a set clientPort = xxx parameter if ZooKeeper does not listen on the default port (see the sketch below).
4. If the HBase cluster does not have Kerberos authentication enabled, replace the native jar with the hbase-client-1.2.6.jar provided by Mammut, otherwise the `zookeeper.sasl.client` parameter is not supported (hbase-client-1.2.6.jar), and configure the following parameters:
```SQL
set zookeeper.sasl.client=false;
set ipc.client.fallback-to-simple-auth-allowed=true;
```
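As referenced in note 3, the sketch below shows one way to satisfy notes 2 and 3. The copy target and the session-level settings are illustrative and depend on the local installation:

```bash
# note 2: make the HBase jars visible to Hive (paths depend on the installation)
cp $HBASE_HOME/lib/hbase*.jar $HIVE_HOME/lib/
# note 3: set the connection parameters per session instead of relying on hbase-site.xml
hive -e "set hbase.zookeeper.quorum=zk_hostname1:port,zk_hostname2:port,zk_hostname3:port;
set zookeeper.znode.parent=/hbase_namespace;
select count(*) from bdservice.hive_hbase_api_simple;"
```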
## Option 2: write data via a Hive external table + bulkload
The Hive external table + bulkload approach supports loading large batches of data into HBase without affecting HBase cluster performance, but it does not support HBase tables with multiple regions.
# Map the HBase table with a Hive external table
```SQL
-- replace zk_hostname with the ZooKeeper hostnames of the user's HBase cluster
set hbase.zookeeper.quorum=zk_hostname1:port,zk_hostname2:port,zk_hostname3:port;
-- replace /hbase_namespace with the namespace under which the HBase cluster is registered in ZooKeeper
set zookeeper.znode.parent=/hbase_namespace;
-- create a Hive external table mapped to the HBase table
use bdservice;
create external table hive_bulkload_hbase_simple (key int, issue string, summary string, created string, creator string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:issue,cf1:summary,cf1:created,cf1:creator")
TBLPROPERTIES ("hbase.table.name" = "hive_bulkload_hbase_simple");
```
# Generate HFiles through the Hive external table
```SQL
set hive.hbase.generatehfiles=true;
set hfile.family.path=/hbase/hfiles/cf1;
set mapreduce.job.reduces=100;
dfs -rm -r -f /hbase/hfiles/cf1;
use bdservice;
insert overwrite table hive_bulkload_hbase_simple
select * from hive_bulkload_hbase_source sort by rowkey;
```
>The name of the last directory in the specified HDFS path must match the column family name of the HBase table.
>The specified HDFS directory must not exist; if it already exists, delete it first.
>The inserted data must be sorted by the rowkey field in ascending order; if the rowkey is not a string type, cast it to a string explicitly before sorting.
>When the data volume is large, configure the number of reducers manually; each reducer produces one HFile.
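Before copying the HFiles to the target cluster, it can help to confirm that they were actually produced and that the directory layout matches the notes above. An illustrative check using the same example paths:

```bash
# the last path component (cf1) must equal the HBase column family name,
# and the directory must not exist before the INSERT (the Hive script removes it with "dfs -rm -r -f")
hadoop fs -ls /hbase/hfiles/cf1
```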
# Cross-cluster copy
```bash
# ipc.client.fallback-to-simple-auth-allowed lets authentication fall back to simple auth
# --config points at the configuration directory of the current cluster
hadoop --config ${2}/conf distcp -D ipc.client.fallback-to-simple-auth-allowed=true hdfs://bdms/user/poc/ws/hfiles/ hdfs://hdfscluster-sc/tmp/poc/hfiles
```
# Load the HFiles into HBase
```bash
# --config points at the HDFS configuration files of the cluster where HBase runs
hadoop --config ./conf jar ./lib/hbase-server-1.2.6.jar completebulkload -Dhbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily=120 hdfs://hdfscluster-sc/tmp/poc/hfiles/ hive_bulkload_hbase_simple
```
>When specifying the path of the HFiles to load, point at the directory one level above the folder named after the column family.
>When a single load involves too many HFiles, specify the parameter -Dhbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily=120.
>If java.lang.ClassNotFoundException occurs during loading, explicitly export the HBASE_HOME and HADOOP_CLASSPATH variables (see the sketch after the XML snippet below).
>When loading HFiles, if the HBase cluster has not enabled security authentication, add the following parameters to the HDFS client configuration file hdfs-site.xml to fall back the cluster security settings:
```xml
<property>
  <name>ipc.client.fallback-to-simple-auth-allowed</name>
  <value>true</value>
</property>
<property>
  <name>hbase.ipc.client.fallback-to-simple-auth-allowed</name>
  <value>true</value>
</property>
```
## Option 3: write data via a custom jar
A custom jar can generate HFiles from files on HDFS and supports HBase tables with multiple regions.
# Prepare the configuration files
The following configuration files must be prepared in advance: `hdfs-site.xml`, `hbase-site.xml`, and `job.conf`.
+ hdfs-site.xml: must contain the configuration of the cluster holding the HDFS files as well as of the cluster where HBase runs;
+ hbase-site.xml: mainly the configuration of the ZooKeeper service that HBase depends on;
+ job.conf: mainly the MR-job-related settings;
```bash
# data file path
inPath=/user/poc/hive_db/poc.db/hbase_userinfo
# field delimiter of the data files
delimiter=,
# fields contained in the data files (in order)
colArr=id,name,sex
# output path for the generated HFiles
outPath=/user/poc/semon/hfile
# job name
jobName=idea-debug
# relative path (relative to the jar) of the Hadoop configuration files
resPath=/00-conf/hadoop
# HBase table name
tableName=demo:userinfo
# column used as the HBase rowkey
rowKey=id
# column-family and column definition of the HBase table
tableMeta=[{"cfName":"info","colName":["name","sex"]}]
# whether Kerberos is enabled
auth=kerberos
# relative path of the Kerberos configuration file
krbFile=/00-conf/krb5.conf
# relative path of the keytab file
keyFile=/00-conf/demo-wangsong03.keytab
```
# Jar dependencies
```groovy
provided group: 'org.apache.hadoop', name: 'hadoop-client', version: '2.9.2'
// https://mvnrepository.com/artifact/org.apache.hbase/hbase-client
provided group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.6'
// https://mvnrepository.com/artifact/org.apache.hbase/hbase-server
provided group: 'org.apache.hbase', name: 'hbase-server', version: '1.2.6'
// https://mvnrepository.com/artifact/org.apache.hbase/hbase-common
provided group: 'org.apache.hbase', name: 'hbase-common', version: '1.2.6'
```
# Jar code
ResourceLoader class: reads and parses the local configuration file
```java
package pers.semon.bulkload;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.security.UserGroupInformation;
import sun.security.krb5.internal.ktab.KeyTab;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Properties;
/**
* @Description: loads and parses the local job configuration
* @Author: Semon
* @Version: v1.0
* @Date: 2021/9/28 22:05
*/
public class ResourceLoader {
public static String confFile;
public static Properties prop;
public static String resPath;
public static String auth;
public static String krbFile;
public static String keyFile;
public static String principal;
public static UserGroupInformation ugi;
public static Path inPath;
public static Path outPath;
public static TableName tableName;
public static int taskNum;
public static String jobName;
public static Configuration conf;
static {
prop = new Properties();
String basePath = System.getProperty("user.dir");
confFile= basePath+ "/00-conf/job.conf";
try {
prop.load(new FileInputStream(confFile));
} catch (IOException e) {
e.printStackTrace();
}
inPath = new Path(prop.getProperty("inPath"));
outPath = new Path(prop.getProperty("outPath"));
tableName = TableName.valueOf(prop.getProperty("tableName"));
if (prop.containsKey("taskNum")) {
taskNum = Integer.valueOf(prop.getProperty("taskNum"));
}
else {
taskNum=0;
}
jobName = prop.getProperty("jobName");
resPath = basePath + prop.getProperty("resPath");
conf = HBaseConfiguration.create();
for (File f : new File(resPath).listFiles()) {
if (f.getAbsolutePath().endsWith("xml")) {
try {
conf.addResource(f.toURI().toURL());
//added for local debugging so the job is submitted to the YARN cluster
conf.set("mapreduce.framework.name","yarn");
//when running locally the job jar must be specified, otherwise a "mapper class not found" error may occur
if (prop.containsKey("mapreduce.job.jar")) {
conf.set("mapreduce.job.jar","/Users/semon/wks/CodeSediment/01-utils/build/libs/RunBulkload-v1.jar");
}
} catch (MalformedURLException e) {
e.printStackTrace();
}
}
}
auth = prop.getProperty("auth");
if (auth.equals("kerberos")) {
krbFile = basePath + prop.getProperty("krbFile");
keyFile = basePath+ prop.getProperty("keyFile");
principal = KeyTab.getInstance(new File(keyFile)).getOneName().getName();
System.setProperty("java.security.krb.conf" ,krbFile);
conf.set("hadoop.security.authentication","kerberos");
UserGroupInformation.setConfiguration(conf);
try {
ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal,keyFile);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
```
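ResourceLoader resolves every path relative to the working directory (System.getProperty("user.dir")), so the deployment directory is expected to look roughly as follows. The layout is inferred from the job.conf example above and is illustrative only:

```bash
# ./00-conf/job.conf                 <- read at startup (basePath + "/00-conf/job.conf")
# ./00-conf/hadoop/*.xml             <- resPath: hdfs-site.xml, hbase-site.xml, ...
# ./00-conf/krb5.conf                <- krbFile, only when auth=kerberos
# ./00-conf/demo-wangsong03.keytab   <- keyFile, only when auth=kerberos
ls -R ./00-conf
```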
HfileGenerator class: generates HFiles into the specified HDFS directory
```java
package pers.semon.bulkload;
import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.json.JSONArray;
import org.json.JSONObject;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* @Description: generates HFiles from files on HDFS via a MapReduce job
* @Author: Semon
* @Version: v1.0
* @Date: 2021/9/28 22:43
*/
public class HfileGenerator {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//run the job under the Kerberos-authenticated UGI (assumes auth=kerberos in job.conf)
ResourceLoader.ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
generateJob(args);
return null;
}
});
}
public static void generateJob(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
//parse generic options: GenericOptionsParser adds MR-related parameters (queue, container sizes, etc.) to the configuration
Configuration conf = ResourceLoader.conf;
String[] remainingArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
//create the Job instance with the configuration and job name
Job job = Job.getInstance(conf, ResourceLoader.jobName);
//set the jar that contains the job classes
job.setJarByClass(HfileGenerator.class);
//set the mapper class for the job
job.setMapperClass(TextMapper.class);
//set the mapper output key/value types
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
//set the HDFS input/output paths; delete the output path if it already exists
FileSystem fs = FileSystem.get(conf);
if ( fs.exists(ResourceLoader.outPath) || fs.isDirectory(ResourceLoader.outPath)) {
fs.delete(ResourceLoader.outPath,true);
System.out.println(ResourceLoader.outPath.toString() + " already exists, deleted it before submitting the job");
}
FileInputFormat.addInputPath(job,ResourceLoader.inPath);
FileOutputFormat.setOutputPath(job,ResourceLoader.outPath);
//create the HDFS directory the configuration file is uploaded to, if it does not exist
Path tmpPath = new Path("/tmp/" + ResourceLoader.ugi.getShortUserName() + "/" + ResourceLoader.jobName);
if (!fs.exists(tmpPath) && !fs.isDirectory(tmpPath)) {
fs.mkdirs(tmpPath);
}
//upload the configuration file to HDFS and add it to the distributed cache
fs.copyFromLocalFile(new Path(ResourceLoader.confFile),tmpPath);
job.addCacheFile(new URI(tmpPath+ "/" + new Path(ResourceLoader.confFile).getName()));
//set the number of reduce tasks; if unset, it matches the number of HBase regions
if (ResourceLoader.taskNum != 0) {
job.setNumReduceTasks(ResourceLoader.taskNum);
}
//fetch the HBase table metadata (table name, column families) and region layout used to configure the bulk load
Connection conn = ConnectionFactory.createConnection(conf);
HTableDescriptor hTableDescriptor = conn.getTable(ResourceLoader.tableName).getTableDescriptor();
HFileOutputFormat2.configureIncrementalLoad(job,hTableDescriptor,conn.getRegionLocator(ResourceLoader.tableName));
//run the job and exit with its status
System.exit(job.waitForCompletion(true)? 0:1);
}
public static void cancel() {}
public static class TextMapper extends Mapper<LongWritable, Text , ImmutableBytesWritable, Put> {
public static Properties prop;
public static byte[] rowKey;
public static String delimiter;
public static String tableMeta;
public static String[] colArr;
public static Map<String,Integer> srcMap = new HashMap<>();
public static Map<String,String> metaMap = new HashMap<>();
@Override
protected void setup(Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException {
URI[] cacheFiles = context.getCacheFiles();
FileSystem fs = FileSystem.get(context.getConfiguration());
prop = new Properties();
for (URI uri : cacheFiles) {
if (uri.toString().endsWith("conf")) {
prop.load(new BufferedInputStream(fs.open(new Path(uri.getPath()))));
}
}
rowKey = Bytes.toBytes(prop.getProperty("rowKey"));
delimiter = prop.getProperty("delimiter");
colArr = prop.getProperty("colArr").split(",");
tableMeta = prop.getProperty("tableMeta");
//colArr gives the field order in the file; store each field name with its index
for (int i=0;i<colArr.length;i++) {
srcMap.put(colArr[i],i );
}
//build the mapping from column name to column family
JSONArray jsonArray = new JSONArray(tableMeta);
for (int i=0;i<jsonArray.length();i++) {
JSONObject jsonObject = jsonArray.getJSONObject(i);
JSONArray colArr = jsonObject.getJSONArray("colName");
for (int j=0;j<colArr.length();j++) {
metaMap.put(colArr.get(j).toString(),jsonObject.get("cfName").toString());
}
}
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context) throws IOException, InterruptedException {
//the map task reads the HDFS file and splits each line by the delimiter
String[] data = value.toString().split(delimiter);
byte[] rowKeyByte = Bytes.toBytes(data[srcMap.get(Bytes.toString(rowKey))]); //use the column configured as rowKey in job.conf
Put put = new Put(rowKeyByte);
// put.addColumn(column family, column qualifier, value)
for(String k : metaMap.keySet()) {
put.addColumn(Bytes.toBytes(metaMap.get(k)), Bytes.toBytes(k),Bytes.toBytes(data[srcMap.get(k)]));
}
context.write(new ImmutableBytesWritable(rowKeyByte),put);
}
@Data
static class TableMeta {
String cfName;
String[] colName;
}
}
}
```
BulkloadHfiles class: loads the HFiles into the HBase table
```java
package pers.semon.bulkload;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
/**
* @Description: loads the generated HFiles into the HBase table
* @Author: Semon
* @Version: v1.0
* @Date: 2021/9/29 21:31
*/
public class BulkloadHfiles {
public static void main(String[] args) throws Exception {
LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(ResourceLoader.conf);
Connection conn = ConnectionFactory.createConnection(ResourceLoader.conf);
loadIncrementalHFiles.doBulkLoad(ResourceLoader.outPath,conn.getAdmin(),conn.getTable(ResourceLoader.tableName),conn.getRegionLocator(ResourceLoader.tableName));
}
}
```
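A hypothetical way to run the two entry points above. The jar name, the deployment directory, and the use of `yarn jar` are assumptions, and the HBase dependencies marked `provided` in the build file must be supplied on the classpath:

```bash
cd /path/to/deploy                                               # must contain the jar and the 00-conf/ directory
export HADOOP_CLASSPATH=$($HBASE_HOME/bin/hbase classpath)       # supply the "provided" HBase dependencies
yarn jar RunBulkload-v1.jar pers.semon.bulkload.HfileGenerator   # step 1: generate the HFiles
yarn jar RunBulkload-v1.jar pers.semon.bulkload.BulkloadHfiles   # step 2: load them into HBase
```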
Author: wangsong