If Hadoop MapReduce output needs to be loaded into HBase, it is best to first write the output as HFiles and then import them into HBase. Because HFile is HBase's internal storage format, this import path is very efficient. Below is an example.
1. Create the HBase table t1
hbase(main):157:0> create 't1','f1'
0 row(s) in 1.3280 seconds

hbase(main):158:0> scan 't1'
ROW                          COLUMN+CELL
0 row(s) in 1.2770 seconds
2. Write the MapReduce job
HBaseHFileMapper.java
package com.test.hfile;

import java.io.IOException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseHFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {
    private final ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Use the byte offset of the input line as the intermediate key and pass the line through unchanged
        immutableBytesWritable.set(Bytes.toBytes(key.get()));
        context.write(immutableBytesWritable, value);
    }
}
HBaseHFileReducer.java
package com.test.hfile;

import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class HBaseHFileReducer extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable, KeyValue> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text text : values) {
            String value = text.toString();
            if (value != null && !"".equals(value)) {
                KeyValue kv = createKeyValue(value);
                if (kv != null)
                    context.write(key, kv);
            }
        }
    }

    // Each input line is expected in the form row:family:qualifier:value (a simple simulation)
    private KeyValue createKeyValue(String str) {
        String[] strs = str.split(":");
        if (strs.length < 4)
            return null;
        String row = strs[0];
        String family = strs[1];
        String qualifier = strs[2];
        String value = strs[3];
        return new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(qualifier),
                System.currentTimeMillis(), Bytes.toBytes(value));
    }
}
HbaseHFileDriver.java
package com.test.hfile;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class HbaseHFileDriver {
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        Job job = new Job(conf);
        job.setJarByClass(HbaseHFileDriver.class);

        job.setMapperClass(com.test.hfile.HBaseHFileMapper.class);
        job.setReducerClass(com.test.hfile.HBaseHFileReducer.class);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Text.class);

        // Lazy shortcut: the paths are hard-coded here; a real job should take them from the command line
        FileInputFormat.addInputPath(job, new Path("/home/yinjie/input"));
        FileOutputFormat.setOutputPath(job, new Path("/home/yinjie/output"));

        Configuration HBASE_CONFIG = new Configuration();
        HBASE_CONFIG.set("hbase.zookeeper.quorum", "localhost");
        HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
        HBaseConfiguration cfg = new HBaseConfiguration(HBASE_CONFIG);
        String tableName = "t1";
        HTable htable = new HTable(cfg, tableName);

        // Let HFileOutputFormat configure the job output (partitioning, sorting) for an incremental load into t1
        HFileOutputFormat.configureIncrementalLoad(job, htable);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
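As the comment in the driver notes, hard-coding the paths is only a shortcut. A minimal sketch of my own (not from the original post) of reading them from the otherArgs already produced by GenericOptionsParser, dropped into main() in place of the two hard-coded addInputPath/setOutputPath lines:

// Hypothetical replacement for the hard-coded paths: take input and output from the command line
if (otherArgs.length < 2) {
    System.err.println("Usage: HbaseHFileDriver <input> <output>");
    System.exit(2);
}
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));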
The /home/yinjie/input directory contains a file hbasedata.txt with the following content:

[root@localhost input]# cat hbasedata.txt
r1:f1:c1:value1
r2:f1:c2:value2
r3:f1:c3:value3
Package the job into a jar; my exported jar is at /home/yinjie/job/hbasetest.jar.
Submit the job to Hadoop:

[root@localhost job]# hadoop jar /home/yinjie/job/hbasetest.jar com.test.hfile.HbaseHFileDriver -libjars /home/yinjie/hbase-0.90.3/hbase-0.90.3.jar
After the job finishes, check the output directory:

[root@localhost input]# hadoop fs -ls /home/yinjie/output
Found 2 items
drwxr-xr-x   - root supergroup          0 2011-08-28 21:02 /home/yinjie/output/_logs
drwxr-xr-x   - root supergroup          0 2011-08-28 21:03 /home/yinjie/output/f1
OK, a directory named after the column family f1 has been generated.
Next, use bulk load to import the data into HBase:

[root@localhost job]# hadoop jar /home/yinjie/hbase-0.90.3/hbase-0.90.3.jar completebulkload /home/yinjie/output t1
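The completebulkload command is backed by the LoadIncrementalHFiles tool, so the same step can also be done from Java. A rough sketch, assuming the HBase 0.90.x client API and reusing the output path and table name from this example (the class name BulkLoadRunner is made up):

package com.test.hfile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// Hypothetical helper (not in the original post): bulk-loads the HFiles produced by
// the MR job into table t1, equivalent to the completebulkload command above.
public class BulkLoadRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        HTable htable = new HTable(conf, "t1");
        new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/home/yinjie/output"), htable);
        htable.close();
    }
}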
After the import completes, scan the HBase table t1 to verify:

hbase(main):166:0> scan 't1'
ROW                      COLUMN+CELL
 r1                      column=f1:c1, timestamp=1314591150788, value=value1
 r2                      column=f1:c2, timestamp=1314591150814, value=value2
 r3                      column=f1:c3, timestamp=1314591150815, value=value3
3 row(s) in 0.0210 seconds
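The same verification can be done with the HBase Java client instead of the shell. A small sketch, again assuming the 0.90.x client API (the class name ScanT1 is made up):

package com.test.hfile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical verification class (not in the original post): scans t1 and prints each row.
public class ScanT1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        HTable htable = new HTable(conf, "t1");
        ResultScanner scanner = htable.getScanner(new Scan());
        for (Result r : scanner) {
            // Expect rows r1, r2, r3 under column family f1 after the bulk load
            System.out.println(Bytes.toString(r.getRow()) + " -> " + r);
        }
        scanner.close();
        htable.close();
    }
}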
The data has been imported!
This article originally appeared on the "炽天使" blog; please keep the source link: http://3199782.blog.51cto.com/3189782/652244