HCatalog is Apache's open-source unified service platform for managing tables and their underlying data. The latest release is 0.5, which requires Hive 0.10; since our Hive cluster runs 0.9.0, we had to fall back to HCatalog 0.4. Because all of HCatalog's underlying metadata lives in the Hive metastore, schema or API changes from a Hive upgrade ripple into HCatalog. For that reason HCatalog has been merged into Hive as of 0.11, and going forward it will ship as part of Hive rather than as a standalone project.
HCatalog is built on top of the Hive metastore. At runtime it creates a HiveMetaStoreClient and uses that instance's API to fetch table metadata. In local metastore mode it directly returns a HiveMetaStore.HMSHandler; in remote mode (hive.metastore.local set to false) it walks the list of URIs configured in hive.metastore.uris (e.g. thrift://10.1.8.42:9083, thrift://10.1.8.51:9083) and tries to open a connection to each in turn, stopping at the first one that succeeds. To keep every client from piling onto the first URI and overloading it, I added a small trick: randomly shuffling the URI list for load balancing.
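The trick itself is tiny; here is a minimal sketch of the idea (the class and method names are illustrative, not the actual HiveMetaStoreClient code):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MetastoreUriShuffle {
    // Shuffle the configured metastore URIs so that clients spread their
    // first connection attempts instead of all hitting the first URI.
    public static List<String> shuffledUris(String uriConf) {
        // uriConf is the raw hive.metastore.uris value, e.g.
        // "thrift://10.1.8.42:9083, thrift://10.1.8.51:9083"
        List<String> uris = Arrays.asList(uriConf.split("\\s*,\\s*"));
        Collections.shuffle(uris); // randomize the order for load balancing
        return uris;               // the caller connects to each until one succeeds
    }
}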
Since our cluster has Kerberos security enabled, we need to obtain a DelegationToken; local mode does not support this, so only remote mode can be used.
HiveMetaStoreClient.java
public String getDelegationToken(String owner, String renewerKerberosPrincipalName) throws
    MetaException, TException {
  if (localMetaStore) {
    throw new UnsupportedOperationException("getDelegationToken() can be " +
        "called only in thrift (non local) mode");
  }
  return client.get_delegation_token(owner, renewerKerberosPrincipalName);
}
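For completeness, here is a hedged sketch of how a client could fetch the token and hand it to a MapReduce job. It assumes hiveConf and job already exist, and both the renewer principal and the token alias are placeholders, not values mandated by HCatalog:

// Hypothetical sketch: fetch a metastore delegation token in remote mode
// and attach it to the job's credentials.
HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf);
String tokenStr = msc.getDelegationToken(
        UserGroupInformation.getCurrentUser().getShortUserName(),
        "hcat/_HOST@EXAMPLE.COM"); // placeholder renewer principal
Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(tokenStr); // deserialize the string form of the token
job.getCredentials().addToken(new Text("metastore.token.alias"), token); // placeholder alias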
HCatInputFormat and HCatOutputFormat provide MapReduce APIs for reading from and writing to tables.
HCatInputFormat API:
public static void setInput(Job job, InputJobInfo inputJobInfo) throws IOException;
First instantiate an InputJobInfo object with three parameters (dbname, tablename, and filter), then pass it to setInput to read the corresponding data.
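For example, a minimal sketch (the database, table, and filter values are illustrative):

// Read the dt='2013-06-13' partition of default.pv_log (illustrative names)
InputJobInfo inputInfo = InputJobInfo.create("default", "pv_log", "dt=\"2013-06-13\"");
HCatInputFormat.setInput(job, inputInfo);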
public static HCatSchema getTableSchema(JobContext context) throws IOException;
At runtime (for example in the mapper's setup method), you can pass in the JobContext and call the static getTableSchema to obtain the table schema that was set earlier by setInput.
HCatOutputFormat API:
public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;
OutputJobInfo takes three parameters: databaseName, tableName, and partitionValues. The third parameter is a Map<String, String> whose keys are the partition keys and whose values are the corresponding partition values; it may be null or an empty map. If the specified partition already exists, an org.apache.hcatalog.common.HCatException : 2002 : Partition already present with given partition key values is thrown.
For example, to write to the partition (dt='2013-06-13', country='china'), you can write:
Map<String, String> partitionValues = new HashMap<String, String>();
partitionValues.put("dt", "2013-06-13");
partitionValues.put("country", "china");
OutputJobInfo info = OutputJobInfo.create(dbName, tblName, partitionValues);
HCatOutputFormat.setOutput(job, info);
public static HCatSchema getTableSchema(JobContext context) throws IOException;
Returns the table schema specified earlier by HCatOutputFormat.setOutput.
public static void setSchema(final Job job, final HCatSchema schema) throws IOException;
Sets the schema of the data that will finally be written; if this method is not called, the table schema is used by default.
Below is a complete MapReduce example that counts how many pages each guid visited in a day: the map phase reads the guid field from the input table, the reduce phase sums the page views for each guid, and the result is written back to another table with guid and count columns.
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class GroupByGuid extends Configured implements Tool {

    @SuppressWarnings("rawtypes")
    public static class Map extends
            Mapper<WritableComparable, HCatRecord, Text, IntWritable> {
        HCatSchema schema;
        Text guid;
        IntWritable one;

        @Override
        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
                throws IOException, InterruptedException {
            guid = new Text();
            one = new IntWritable(1);
            // retrieve the input table schema set earlier by HCatInputFormat.setInput
            schema = HCatInputFormat.getTableSchema(context);
        }

        @Override
        protected void map(WritableComparable key, HCatRecord value, Context context)
                throws IOException, InterruptedException {
            // read the "guid" field from the input record and emit (guid, 1)
            guid.set((String) value.get("guid", schema));
            context.write(guid, one);
        }
    }

    @SuppressWarnings("rawtypes")
    public static class Reduce extends
            Reducer<Text, IntWritable, WritableComparable, HCatRecord> {
        HCatSchema schema;

        @Override
        protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
                throws IOException, InterruptedException {
            // retrieve the output table schema set earlier by HCatOutputFormat.setOutput
            schema = HCatOutputFormat.getTableSchema(context);
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            Iterator<IntWritable> iter = values.iterator();
            while (iter.hasNext()) {
                sum++;
                iter.next();
            }
            // build the output record: (guid, count)
            HCatRecord record = new DefaultHCatRecord(2);
            record.set("guid", schema, key.toString());
            record.set("count", schema, sum);
            context.write(null, record);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        String dbname = args[0];
        String inputTable = args[1];
        String filter = args[2];
        String outputTable = args[3];
        int reduceNum = Integer.parseInt(args[4]);

        Job job = new Job(conf,
                "GroupByGuid, Calculating every guid's pageview");
        HCatInputFormat.setInput(job,
                InputJobInfo.create(dbname, inputTable, filter));

        job.setInputFormatClass(HCatInputFormat.class);
        job.setJarByClass(GroupByGuid.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        job.setNumReduceTasks(reduceNum);

        HCatOutputFormat.setOutput(job,
                OutputJobInfo.create(dbname, outputTable, null));
        HCatSchema s = HCatOutputFormat.getTableSchema(job);
        HCatOutputFormat.setSchema(job, s);

        job.setOutputFormatClass(HCatOutputFormat.class);

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new GroupByGuid(), args);
        System.exit(exitCode);
    }
}
In fact, HCatalog also supports dynamic partitioning: you can specify only some of the partition key/value pairs in OutputJobInfo and set the remaining ones on each HCatRecord at runtime, which lets a single job write to multiple partitions at once; see the sketch below.
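A minimal sketch of the idea, assuming an output table partitioned by (dt, country) where country is only known per record at runtime (all names are illustrative):

// Job setup: pin dt statically; omitting country from the map leaves it dynamic.
Map<String, String> partial = new HashMap<String, String>();
partial.put("dt", "2013-06-13");
HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, tblName, partial));

// Reducer: carry the dynamic partition key as a regular record field; HCatalog
// routes each record to the partition matching its country value.
HCatRecord record = new DefaultHCatRecord(3);
record.set("guid", schema, guid);
record.set("count", schema, sum);
record.set("country", schema, country);
context.write(null, record);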