首页 > 其他分享 >HBase 写优化之 BulkLoad 实现数据快速入库

HBase 写优化之 BulkLoad 实现数据快速入库

时间:2023-01-01 21:02:44浏览次数:65  
标签:hadoop apache job org BulkLoad new HBase class 入库


目录[-]



  • ​​1、为何要 BulkLoad 导入?传统的 HTableOutputFormat 写 HBase 有什么问题?​​
  • ​​2、bulkload 流程与实践​​
  • ​​3、说明与注意事项:​​
  • ​​4、Refer:​​



1、为何要 BulkLoad 导入?传统的 HTableOutputFormat 写 HBase 有什么问题?

我们先看下 HBase 的写流程:

HBase 写优化之 BulkLoad 实现数据快速入库_java

通常 MapReduce 在写HBase时使用的是 TableOutputFormat 方式,在reduce中直接生成put对象写入HBase,该方式在大数据量写入时效率低下(HBase会block写入,频繁进行flush,split,compact等大量IO操作),并对HBase节点的稳定性造成一定的影响(GC时间过长,响应变慢,导致节点超时退出,并引起一系列连锁反应),而HBase支持 bulk load 的入库方式,它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接在HDFS中生成持久化的HFile数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载,在大数据量写入时能极大的提高写入效率,并降低对HBase节点的写入压力。
通过使用先生成HFile,然后再BulkLoad到Hbase的方式来替代之前直接调用HTableOutputFormat的方法有如下的好处:
(1)消除了对HBase集群的插入压力
(2)提高了Job的运行速度,降低了Job的执行时间
目前此种方式仅仅适用于只有一个列族的情况,在新版 HBase 中,单列族的限制会消除。



2、bulkload 流程与实践


bulkload 方式需要两个Job配合完成: 

(1)第一个Job还是运行原来业务处理逻辑,处理的结果不直接调用HTableOutputFormat写入到HBase,而是先写入到HDFS上的一个中间目录下(如 middata) 


(2)第二个Job以第一个Job的输出(middata)做为输入,然后将其格式化HBase的底层存储文件HFile 


(3)调用BulkLoad将第二个Job生成的HFile导入到对应的HBase表中


下面给出相应的范例代码:


import​​           ​​java.io.IOException;​​          





​​import​​ ​​org.apache.hadoop.conf.Configuration;​​


​​import​​ ​​org.apache.hadoop.fs.Path;​​


​​import​​ ​​org.apache.hadoop.hbase.HBaseConfiguration;​​


​​import​​ ​​org.apache.hadoop.hbase.KeyValue;​​


​​import​​ ​​org.apache.hadoop.hbase.client.HTable;​​


​​import​​ ​​org.apache.hadoop.hbase.client.Put;​​


​​import​​ ​​org.apache.hadoop.hbase.io.ImmutableBytesWritable;​​


​​import​​ ​​org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;​​


​​import​​ ​​org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;​​


​​import​​ ​​org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;​​


​​import​​ ​​org.apache.hadoop.hbase.util.Bytes;​​


​​import​​ ​​org.apache.hadoop.io.IntWritable;​​


​​import​​ ​​org.apache.hadoop.io.LongWritable;​​


​​import​​ ​​org.apache.hadoop.io.Text;​​


​​import​​ ​​org.apache.hadoop.mapreduce.Job;​​


​​import​​ ​​org.apache.hadoop.mapreduce.Mapper;​​


​​import​​ ​​org.apache.hadoop.mapreduce.Reducer;​​


​​import​​ ​​org.apache.hadoop.mapreduce.lib.input.FileInputFormat;​​


​​import​​ ​​org.apache.hadoop.mapreduce.lib.input.TextInputFormat;​​


​​import​​ ​​org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;​​


​​import​​ ​​org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;​​


​​import​​ ​​org.apache.hadoop.util.GenericOptionsParser;​​





​​public​​ ​​class​​ ​​GeneratePutHFileAndBulkLoadToHBase {​​





​​public​​ ​​static​​ ​​class​​ ​​WordCountMapper ​​ ​​extends​​ ​​Mapper<LongWritable, Text, Text, IntWritable>​​


​​{​​





​​private​​ ​​Text wordText=​​ ​​new​​ ​​Text();​​


​​private​​ ​​IntWritable one=​​ ​​new​​ ​​IntWritable(​​ ​​1​​ ​​);​​


​​@Override​​


​​protected​​ ​​void​​ ​​map(LongWritable key, Text value, Context context)​​


​​throws​​ ​​IOException, InterruptedException {​​


​​// TODO Auto-generated method stub​​


​​String line=value.toString();​​


​​String[] wordArray=line.split(​​ ​​" "​​ ​​);​​


​​for​​ ​​(String word:wordArray)​​


​​{​​


​​wordText.set(word);​​


​​context.write(wordText, one);​​


​​}​​





​​}​​


​​}​​





​​public​​ ​​static​​ ​​class​​ ​​WordCountReducer ​​ ​​extends​​ ​​Reducer<Text, IntWritable, Text, IntWritable>​​


​​{​​





​​private​​ ​​IntWritable result=​​ ​​new​​ ​​IntWritable();​​


​​protected​​ ​​void​​ ​​reduce(Text key, Iterable<IntWritable> valueList,​​


​​Context context)​​


​​throws​​ ​​IOException, InterruptedException {​​


​​// TODO Auto-generated method stub​​


​​int​​ ​​sum=​​ ​​0​​ ​​;​​


​​for​​ ​​(IntWritable value:valueList)​​


​​{​​


​​sum+=value.get();​​


​​}​​


​​result.set(sum);​​


​​context.write(key, result);​​


​​}​​





​​}​​





​​public​​ ​​static​​ ​​class​​ ​​ConvertWordCountOutToHFileMapper ​​ ​​extends​​ ​​Mapper<LongWritable, Text, ImmutableBytesWritable, Put>​​


​​{​​





​​@Override​​


​​protected​​ ​​void​​ ​​map(LongWritable key, Text value, Context context)​​


​​throws​​ ​​IOException, InterruptedException {​​


​​// TODO Auto-generated method stub​​


​​String wordCountStr=value.toString();​​


​​String[] wordCountArray=wordCountStr.split(​​ ​​"\t"​​ ​​);​​


​​String word=wordCountArray[​​ ​​0​​ ​​];​​


​​int​​ ​​count=Integer.valueOf(wordCountArray[​​ ​​1​​ ​​]);​​





​​//创建HBase中的RowKey​​


​​byte​​ ​​[] rowKey=Bytes.toBytes(word);​​


​​ImmutableBytesWritable rowKeyWritable=​​ ​​new​​ ​​ImmutableBytesWritable(rowKey);​​


​​byte​​ ​​[] family=Bytes.toBytes(​​ ​​"cf"​​ ​​);​​


​​byte​​ ​​[] qualifier=Bytes.toBytes(​​ ​​"count"​​ ​​);​​


​​byte​​ ​​[] hbaseValue=Bytes.toBytes(count);​​


​​// Put 用于列簇下的多列提交,若只有一个列,则可以使用 KeyValue 格式​​


​​// KeyValue keyValue = new KeyValue(rowKey, family, qualifier, hbaseValue);​​


​​Put put=​​ ​​new​​ ​​Put(rowKey);​​


​​put.add(family, qualifier, hbaseValue);​​


​​context.write(rowKeyWritable, put);​​





​​}​​





​​}​​





​​public​​ ​​static​​ ​​void​​ ​​main(String[] args) ​​ ​​throws​​ ​​Exception {​​


​​// TODO Auto-generated method stub​​


​​Configuration hadoopConfiguration=​​ ​​new​​ ​​Configuration();​​


​​String[] dfsArgs = ​​ ​​new​​ ​​GenericOptionsParser(hadoopConfiguration, args).getRemainingArgs();​​





​​//第一个Job就是普通MR,输出到指定的目录​​


​​Job job=​​ ​​new​​ ​​Job(hadoopConfiguration, ​​ ​​"wordCountJob"​​ ​​);​​


​​job.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.​​ ​​class​​ ​​);​​


​​job.setMapperClass(WordCountMapper.​​ ​​class​​ ​​);​​


​​job.setReducerClass(WordCountReducer.​​ ​​class​​ ​​);​​


​​job.setOutputKeyClass(Text.​​ ​​class​​ ​​);​​


​​job.setOutputValueClass(IntWritable.​​ ​​class​​ ​​);​​


​​FileInputFormat.setInputPaths(job, ​​ ​​new​​ ​​Path(dfsArgs[​​ ​​0​​ ​​]));​​


​​FileOutputFormat.setOutputPath(job, ​​ ​​new​​ ​​Path(dfsArgs[​​ ​​1​​ ​​]));​​


​​//提交第一个Job​​


​​int​​ ​​wordCountJobResult=job.waitForCompletion(​​ ​​true​​ ​​)?​​ ​​0​​ ​​:​​ ​​1​​ ​​;​​





​​//第二个Job以第一个Job的输出做为输入,只需要编写Mapper类,在Mapper类中对一个job的输出进行分析,并转换为HBase需要的KeyValue的方式。​​


​​Job convertWordCountJobOutputToHFileJob=​​ ​​new​​ ​​Job(hadoopConfiguration, ​​ ​​"wordCount_bulkload"​​ ​​);​​





​​convertWordCountJobOutputToHFileJob.setJarByClass(GeneratePutHFileAndBulkLoadToHBase.​​ ​​class​​ ​​);​​


​​convertWordCountJobOutputToHFileJob.setMapperClass(ConvertWordCountOutToHFileMapper.​​ ​​class​​ ​​);​​


​​//ReducerClass 无需指定,框架会自行根据 MapOutputValueClass 来决定是使用 KeyValueSortReducer 还是 PutSortReducer​​


​​//convertWordCountJobOutputToHFileJob.setReducerClass(KeyValueSortReducer.class);​​


​​convertWordCountJobOutputToHFileJob.setMapOutputKeyClass(ImmutableBytesWritable.​​ ​​class​​ ​​);​​


​​convertWordCountJobOutputToHFileJob.setMapOutputValueClass(Put.​​ ​​class​​ ​​);​​





​​//以第一个Job的输出做为第二个Job的输入​​


​​FileInputFormat.addInputPath(convertWordCountJobOutputToHFileJob, ​​ ​​new​​ ​​Path(dfsArgs[​​ ​​1​​ ​​]));​​


​​FileOutputFormat.setOutputPath(convertWordCountJobOutputToHFileJob, ​​ ​​new​​ ​​Path(dfsArgs[​​ ​​2​​ ​​]));​​


​​//创建HBase的配置对象​​


​​Configuration hbaseConfiguration=HBaseConfiguration.create();​​


​​//创建目标表对象​​


​​HTable wordCountTable =​​ ​​new​​ ​​HTable(hbaseConfiguration, ​​ ​​"word_count"​​ ​​);​​


​​HFileOutputFormat.configureIncrementalLoad(convertWordCountJobOutputToHFileJob,wordCountTable);​​





​​//提交第二个job​​


​​int​​ ​​convertWordCountJobOutputToHFileJobResult=convertWordCountJobOutputToHFileJob.waitForCompletion(​​ ​​true​​ ​​)?​​ ​​0​​ ​​:​​ ​​1​​ ​​;​​





​​//当第二个job结束之后,调用BulkLoad方式来将MR结果批量入库​​


​​LoadIncrementalHFiles loader = ​​ ​​new​​ ​​LoadIncrementalHFiles(hbaseConfiguration);​​


​​//第一个参数为第二个Job的输出目录即保存HFile的目录,第二个参数为目标表​​


​​loader.doBulkLoad(​​ ​​new​​ ​​Path(dfsArgs[​​ ​​2​​ ​​]), wordCountTable);​​





​​//最后调用System.exit进行退出​​


​​System.exit(convertWordCountJobOutputToHFileJobResult);​​





​​}​​





​​}​​



比如原始的输入数据的目录为:/rawdata/test/wordcount/20131212 


中间结果数据保存的目录为:/middata/test/wordcount/20131212 

最终生成的HFile保存的目录为:/resultdata/test/wordcount/20131212 


运行上面的Job的方式如下: 


hadoop jar test.jar /rawdata/test/wordcount/20131212 /middata/test/wordcount/20131212 /resultdata/test/wordcount/20131212 




3、说明与注意事项:

(1)HFile方式在所有的加载方案里面是最快的,不过有个前提——数据是第一次导入,表是空的。如果表中已经有了数据。HFile再导入到hbase的表中会触发split操作。

(2)最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
否则报这样的错误:


​​java.lang.IllegalArgumentException: Can't read partitions file​​         


​​...​​


​​Caused by: java.io.IOException: wrong key ​​ ​​class​​ ​​: org.apache.hadoop.io.*** is not ​​ ​​class​​ ​​org.apache.hadoop.hbase.io.ImmutableBytesWritable​​

(3)最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer,这个 SorterReducer 可以不指定,因为源码中已经做了判断:

​​

​​if​​           ​​(KeyValue.​​          ​​class​​          ​​.equals(job.getMapOutputValueClass())) {​​          


​​job.setReducerClass(KeyValueSortReducer.​​ ​​class​​ ​​);​​


​​} ​​ ​​else​​ ​​if​​ ​​(Put.​​ ​​class​​ ​​.equals(job.getMapOutputValueClass())) {​​


​​job.setReducerClass(PutSortReducer.​​ ​​class​​ ​​);​​


​​} ​​ ​​else​​ ​​{​​


​​LOG.warn(​​ ​​"Unknown map output value type:"​​ ​​+ job.getMapOutputValueClass());​​


​​}​​


(4) MR例子中job.setOutputFormatClass(HFileOutputFormat.class); HFileOutputFormat只适合一次对单列族组织成HFile文件,多列簇需要起多个 job,不过新版本的 Hbase 已经解决了这个限制。 


(5) MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库HBase,相当于move HFile到HBase的Region中,HFile子目录的列族内容没有了。


(6)最后一个 Reduce 没有 setNumReduceTasks 是因为,该设置由框架根据region个数自动配置的。

(7)下边配置部分,注释掉的其实写不写都无所谓,因为看源码就知道configureIncrementalLoad方法已经把固定的配置全配置完了,不固定的部分才需要手动配置。

​​public​​           ​​class​​           ​​HFileOutput {​​          


​​//job 配置​​


​​public​​ ​​static​​ ​​Job configureJob(Configuration conf) ​​ ​​throws​​ ​​IOException {​​


​​Job job = ​​ ​​new​​ ​​Job(configuration, ​​ ​​"countUnite1"​​ ​​);​​


​​job.setJarByClass(HFileOutput.​​ ​​class​​ ​​);​​


​​//job.setNumReduceTasks(2); ​​


​​//job.setOutputKeyClass(ImmutableBytesWritable.class);​​


​​//job.setOutputValueClass(KeyValue.class);​​


​​//job.setOutputFormatClass(HFileOutputFormat.class);​​





​​Scan scan = ​​ ​​new​​ ​​Scan();​​


​​scan.setCaching(​​ ​​10​​ ​​);​​


​​scan.addFamily(INPUT_FAMILY);​​


​​TableMapReduceUtil.initTableMapperJob(inputTable, scan,​​


​​HFileOutputMapper.​​ ​​class​​ ​​, ImmutableBytesWritable.​​ ​​class​​ ​​, LongWritable.​​ ​​class​​ ​​, job);​​


​​//这里如果不定义reducer部分,会自动识别定义成KeyValueSortReducer.class 和PutSortReducer.class​​


​​job.setReducerClass(HFileOutputRedcuer.​​ ​​class​​ ​​);​​


​​//job.setOutputFormatClass(HFileOutputFormat.class);​​


​​HFileOutputFormat.configureIncrementalLoad(job, ​​ ​​new​​ ​​HTable(​​


​​configuration, outputTable));​​


​​HFileOutputFormat.setOutputPath(job, ​​ ​​new​​ ​​Path());​​


​​//FileOutputFormat.setOutputPath(job, new Path()); //等同上句​​


​​return​​ ​​job;​​


​​}​​





​​public​​ ​​static​​ ​​class​​ ​​HFileOutputMapper ​​ ​​extends​​


​​TableMapper<ImmutableBytesWritable, LongWritable> {​​


​​public​​ ​​void​​ ​​map(ImmutableBytesWritable key, Result values,​​


​​Context context) ​​ ​​throws​​ ​​IOException, InterruptedException {​​


​​//mapper逻辑部分​​


​​context.write(​​ ​​new​​ ​​ImmutableBytesWritable(Bytes()), LongWritable());​​


​​}​​


​​}​​





​​public​​ ​​static​​ ​​class​​ ​​HFileOutputRedcuer ​​ ​​extends​​


​​Reducer<ImmutableBytesWritable, LongWritable, ImmutableBytesWritable, KeyValue> {​​


​​public​​ ​​void​​ ​​reduce(ImmutableBytesWritable key, Iterable<LongWritable> values,​​


​​Context context) ​​ ​​throws​​ ​​IOException, InterruptedException {​​


​​//reducer逻辑部分​​


​​KeyValue kv = ​​ ​​new​​ ​​KeyValue(row, OUTPUT_FAMILY, tmp[​​ ​​1​​ ​​].getBytes(),​​


​​Bytes.toBytes(count));​​


​​context.write(key, kv);​​


​​}​​


​​}​​


​​}​​


4、Refer:

1、Hbase几种数据入库(load)方式比较

2、MapReduce生成HFile入库到HBase及源码分析

​http://blog.pureisle.net/archives/1950.html​

3、MapReduce生成HFile入库到HBase

​http://shitouer.cn/2013/02/hbase-hfile-bulk-load/​

标签:hadoop,apache,job,org,BulkLoad,new,HBase,class,入库
From: https://blog.51cto.com/u_15785444/5983343

相关文章

  • Hive篇---Hive与Hbase整合
    =========================================================声明:由于不同平台阅读格式不一致(尤其源码部分),所以获取更多阅读体验!!个人网站地址:​​http://www.lhworldblog.......
  • 几个函数的使用例子:更新VBRK-XBLNR,IB01设备BOM创建,LI11N输入库存盘点
    最近用到一些函数,网上的相关资料不多,这里记录一下。本文链接:https://www.cnblogs.com/hhelibeb/p/17012303.html 1,使用RV_INVOICE_HEAD_MAINTAIN更新VBRK-ZUNOR和VBR......
  • MybatisPlus 中文入库变成问号
    环境依赖pom <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.5.1</version> </dependency......
  • 部署hadoop-hbase
     将hbase-env.sh的最后一行注释去掉:exportHBASE_DISABLE_HADOOP_CLASSPATH_LOOKUP="true"修改主机名的位置/hbase/conf/regionservers/hbase/conf/backup......
  • 网格数据入库程序
    一、文件流(读取文件)使用BufferedReader解析文件,可以指定解析格式使用BufferedInputStream解析文件中的中文会乱码二、程序打包jar所有的路径只能用 '/',不能使用......
  • 【分布式存储数据恢复】hbase和hive数据库数据恢复案例
    分布式存储数据恢复环境:16台物理服务器,每台物理服务器上有数台虚拟机;虚拟机上配置分布式,上层部署hbase数据库和hive数据库。分布式存储故障&分析:误删除数据库底层文件,数......
  • Hive与HBase的整合
    开场白:Hive与HBase的整合功能的实现是利用两者本身对外的API接口互相进行通信,相互通信主要是依靠hive_hbase-handler.jar工具类(​​HiveStorageHandlers​​),大致意思......
  • HBase RowKey设计
    1HBase表热点1.1什么是热点检索habse的记录首先要通过rowkey来定位数据行。当大量的client访问hbase集群的一个或少数几个节点,造成少数regionserver的读/写请求过多、负......
  • 【大数据入门核心技术-HBase】(七)HBase Python API 操作
    OverridetheentrypointofanimageIntroducedinGitLabandGitLabRunner9.4.Readmoreaboutthe extendedconfigurationoptions.Beforeexplainingtheav......
  • 【Python】数据入库出库处理/list列表/数组/转字符串
     #!/usr/bin/envpython#-*-coding:utf-8-*-"""@Time:@Author:@File:dbDataTool.py@Version:1.0数据入库出库处理相关工具@Function:"""importha......