首页 > 其他分享 >hcatalog简介和使用

hcatalog简介和使用

时间:2023-04-25 11:34:53浏览次数:32  
标签:hcatalog 简介 使用 import apache org throws schema


Hcatalog是apache开源的对于表和底层数据管理统一服务平台,目前最新release版本是0.5,不过需要hive 0.10支持,由于我们hive集群版本是0.9.0,所以只能降级使用hcatalog 0.4,由于hcatalog中所有的底层数据信息都是保存在hive metastore里,所以hive版本升级后schema变动或者api变动会对hacatalog产生影响,因此在hive 0.11中已经集成了了hcatalog,以后也会成为hive的一部分,而不是独立的项目。

 

HCatalog底层依赖于Hive Metastore,执行过程中会创建一个HiveMetaStoreClient,通过这个instance提供的api来获取表结构数据,如果是local metastore mode的话,会直接返回一个HiveMetaStore.HMSHandler,如果是remote mode的话(hive.metastore.local设置为false),会依据hive.metastore.uris(比如thrift://10.1.8.42:9083, thrift://10.1.8.51:9083)中设定的一串uri逐一顺序建立连接。只要有一个链接建立就可以了,同时为了避免所有client都和第一个uri建立连接,导致负载过大,我加了点小trick,对这串uris随机shuffle来做load balance

 

由于我们的集群开启了kerberos security,需要获取DelegationToken,但是local mode是不支持的,所以只用能remote mode

HiveMetaStoreClient.Java

1. public String getDelegationToken(String owner, String renewerKerberosPrincipalName) throws  
2.     MetaException, TException {  
3. if (localMetaStore) {  
4. throw new UnsupportedOperationException("getDelegationToken() can be " +  
5. "called only in thrift (non local) mode");  
6.   }  
7. return client.get_delegation_token(owner, renewerKerberosPrincipalName);  
8. }



 

HCatInputFormat和HCatOutputFormat提供一些mapreduce api来读取表和写入表

HCatInputFormat API:

 

1. public static void setInput(Job job,  
2. throws IOException;


 

先实例化一个InputJobInfo对象,该对象包含三个参数dbname,tablename,filter,然后传给setInput函数,来读取相应的数据

 

1. public static HCatSchema getTableSchema(JobContext context)   
2. throws IOException;



在运行时(比如mapper阶段的setup函数中),可以传进去JobContext,调用静态getTableSchema来获取先前setInput时设置的table schema信息

 

 

HCatOutputFormat API:

1. public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;



OutPutJobInfo接受三个参数databaseName, tableName, partitionValues,其中第三个参数类型是Map<String, String>,partition key放在map key里,partition value放在对应map key的value中,该参数可传入null或空map,如果指定的partition存在的话,会抛org.apache.hcatalog.common.HCatException : 2002 : Partition already present with given partition key values

比如要要写入指定的partition(dt='2013-06-13',country='china' ),可以这样写

 


1. Map<String, String> partitionValues = new HashMap<String, String>();  
2. partitionValues.put("dt", "2013-06-13");  
3. partitionValues.put("country", "china");  
4. HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);  
5. HCatOutputFormat.setOutput(job, info);


 



1. public static HCatSchema getTableSchema(JobContext context) throws IOException;



获取之前HCatOutputFormat.setOutput指定的table schema信息

 

 

1. public static void setSchema(final Job job, final HCatSchema schema) throws IOException;



设置最终写入数据的schema信息,若不调用这个方法,则默认会使用table schema信息

 

 

下面提供一个完整mapreduce例子计算一天每个guid访问页面次数,map阶段从表中读取guid字段,reduce阶段统计该guid对应pageview的总数,然后写回另外一张带有guid和count字段的表中

 

1. import java.io.IOException;  
2. import java.util.Iterator;  
3.   
4. import org.apache.hadoop.conf.Configuration;  
5. import org.apache.hadoop.conf.Configured;  
6. import org.apache.hadoop.io.IntWritable;  
7. import org.apache.hadoop.io.Text;  
8. import org.apache.hadoop.io.WritableComparable;  
9. import org.apache.hadoop.mapreduce.Job;  
10. import org.apache.hadoop.mapreduce.Mapper;  
11. import org.apache.hadoop.mapreduce.Reducer;  
12. import org.apache.hadoop.util.Tool;  
13. import org.apache.hadoop.util.ToolRunner;  
14. import org.apache.hcatalog.data.DefaultHCatRecord;  
15. import org.apache.hcatalog.data.HCatRecord;  
16. import org.apache.hcatalog.data.schema.HCatSchema;  
17. import org.apache.hcatalog.mapreduce.HCatInputFormat;  
18. import org.apache.hcatalog.mapreduce.HCatOutputFormat;  
19. import org.apache.hcatalog.mapreduce.InputJobInfo;  
20. import org.apache.hcatalog.mapreduce.OutputJobInfo;  
21.   
22. public class GroupByGuid extends Configured implements Tool {  
23.   
24. @SuppressWarnings("rawtypes")  
25. public static class Map extends  
26.             Mapper<WritableComparable, HCatRecord, Text, IntWritable> {  
27.         HCatSchema schema;  
28.         Text guid;  
29.         IntWritable one;  
30.   
31. @Override  
32. protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)  
33. throws IOException, InterruptedException {  
34. new Text();  
35. new IntWritable(1);  
36.             schema = HCatInputFormat.getTableSchema(context);  
37.         }  
38.   
39. @Override  
40. protected void map(WritableComparable key, HCatRecord value,  
41. throws IOException, InterruptedException {  
42. "guid", schema));  
43.             context.write(guid, one);  
44.         }  
45.     }  
46.   
47. @SuppressWarnings("rawtypes")  
48. public static class Reduce extends  
49.             Reducer<Text, IntWritable, WritableComparable, HCatRecord> {  
50.         HCatSchema schema;  
51.   
52. @Override  
53. protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)  
54. throws IOException, InterruptedException {  
55.             schema = HCatOutputFormat.getTableSchema(context);  
56.         }  
57.   
58. @Override  
59. protected void reduce(Text key, Iterable<IntWritable> values,  
60. throws IOException, InterruptedException {  
61. int sum = 0;  
62.             Iterator<IntWritable> iter = values.iterator();  
63. while (iter.hasNext()) {  
64.                 sum++;  
65.                 iter.next();  
66.             }  
67. new DefaultHCatRecord(2);  
68. "guid", schema, key.toString());  
69. "count", schema, sum);  
70. null, record);  
71.         }  
72.     }  
73.   
74. @Override  
75. public int run(String[] args) throws Exception {  
76.         Configuration conf = getConf();  
77.   
78. 0];  
79. 1];  
80. 2];  
81. 3];  
82. int reduceNum = Integer.parseInt(args[4]);  
83.   
84. new Job(conf,  
85. "GroupByGuid, Calculating every guid's pageview");  
86.         HCatInputFormat.setInput(job,  
87.                 InputJobInfo.create(dbname, inputTable, filter));  
88.   
89. class);  
90. class);  
91. class);  
92. class);  
93. class);  
94. class);  
95. class);  
96. class);  
97.         job.setNumReduceTasks(reduceNum);  
98.   
99.         HCatOutputFormat.setOutput(job,  
100. null));  
101.         HCatSchema s = HCatOutputFormat.getTableSchema(job);  
102.         HCatOutputFormat.setSchema(job, s);  
103.   
104. class);  
105.   
106. return (job.waitForCompletion(true) ? 0 : 1);  
107.     }  
108.   
109. public static void main(String[] args) throws Exception {  
110. int exitCode = ToolRunner.run(new GroupByGuid(), args);  
111.         System.exit(exitCode);  
112.     }  
113. }


 

其实hcatalog还支持动态分区dynamic partition,我们可以在OutJobInfo中指定部分partition keyvalue pair,在运行时候根据传进来的值设置HCatRecord对应的其他partition keyvalue pair,这样就能在一个job中同时写多个partition了


 

标签:hcatalog,简介,使用,import,apache,org,throws,schema
From: https://blog.51cto.com/u_16087105/6223669

相关文章

  • 前端使用CryptoJS加密解密
    1、安装crypto-js;npminstallcrypto-js--save-devyarnaddcrypto-js--dev2、新建unit.js写成公共方法;constCryptoJS=require('crypto-js');//16位十六进制数作为密钥(秘钥为随机生成,必须与后端保持一致!)constkey=CryptoJS.enc.Utf8.parse("xxxxxxxxxxxxxx");//......
  • Xxl-job安装部署以及SpringBoot集成Xxl-job使用
    1、安装Xxl-job:可以使用docker拉取镜像部署和源码编译两种方式,这里选择源码编译安装。代码拉取地址:https://github.com/xuxueli/xxl-job/tree/2.1.2官方开发文档:https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA......
  • kubernetes 使用 1
    安装1.用以下命令下载最新发行版:curl-LO"https://dl.k8s.io/release/$(curl-L-shttps://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"可以用以下方式指定版本curl-LOhttps://dl.k8s.io/release/v1.27.0/bin/linux/amd64/kubectl2.验证该可执行文件(可选步......
  • QueryWrapper中or的使用
    queryWrapper.and(wrapper->{//拼接sqlwrapper.like("user_name",name).or().like("user_admin_name",name);});where(user_id='1'and(user_namelike'%111%'oruser_admin_namelike'%222%......
  • 接口测试工具-Postman使用详解
    ✅作者简介:热爱科研的算法开发者,Python、Matlab项目可交流、沟通、学习。......
  • 微信公众号使用隐藏页判断登录
    <scripttype="text/javascript">   $(document).ready(function(){      document.getElementById("over").style.display="block";      document.getElementById("layout").style.display="block";      //判断......
  • Go 使用 MongoDB 实现分页查询
    解决过程CSDN中搜到一个有Bug的代码import( "context" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options")funcFind(database*mongo.Databas......
  • 使用PyTorch和Flower 进行联邦学习
    本文将介绍如何使用Flower构建现有机器学习工作的联邦学习版本。我们将使用PyTorch在CIFAR-10数据集上训练卷积神经网络,然后将展示如何修改训练代码以联邦的方式运行训练。完整文章:https://avoid.overfit.cn/post/8d05a12c208c4f499573c9966d0fe415......
  • java中使用RedisTemplate读取数据异常 Missing type id when trying to resolve subt
    java中使用RedisTemplate读取数据异常Missingtypeidwhentryingtoresolvesubtypeof[simpletype,classjava.lang.Object]:missingtypeidproperty'@class'at[Source:(byte[])"报错:Causedby:com.fasterxml.jackson.databind.exc.InvalidTypeIdExcep......
  • 拒绝“千人一面”!使用CDP实现市场细分助力GMV增长
    哈佛商学院在美国进行的一项调查发现,95%的新品牌失败是由于无效的营销细分,其失败的原因在于每个消费者都是独立的个体,他们也有着独特的要求。因此,千人一面的营销策略无法吸引所有用户。而成功品牌总是会向正确的消费者提供正确的产品,为此,营销人员需要彻底了解他们的客户,市场细分可......