首页 > 其他分享 >使用HBase EndPoint(coprocessor)进行计算

使用HBase EndPoint(coprocessor)进行计算

时间:2023-03-28 10:09:29浏览次数:52  
标签:EndPoint byte region public RPC coprocessor HBase hbase


如果要统对hbase中的数据,进行某种统计,比如统计某个字段最大值,统计满足某种条件的记录数,统计各种记录特点,并按照记录特点分类(类似于sql的group by)~

常规的做法就是把hbase中整个表的数据scan出来,或者稍微环保一点,加一个filter,进行一些初步的过滤(对于rowcounter来说,就加了FirstKeyOnlyFilter),但是这么做来说还是会有很大的副作用,比如占用大量的网络带宽(当标级别到达千万级别,亿级别之后)尤为明显,RPC的量也是不容小觑的。

理想的方式应该是怎样?

拿row counter这个简单例子来说,我要统计总行数,如果每个region 告诉我他又多少行,然后把结果告诉我,我再将他们的结果汇总一下,不就行了么?
现在的问题是hbase没有提供这种接口,来统计每个region的行数,那是否我们可以自己来实现一个呢?
没错,正如本文标题所说,我们可以自己来实现一个Endpoint,然后让hbase加载起来,然后我们远程调用即可。

什么是Endpoint?

先弄清楚什么是hbase coprocessor

hbase有两种coprocessor,一种是Observer(观察者),类似于关系数据库的trigger(触发器),另外一种就是EndPoint,类似于关系数据库的存储过程。

观察者这里就多做介绍了,这里介绍Endpoint。

EndPoint是动态RPC插件的接口,它的实现代码被部署在服务器端(regionServer),从而能够通过HBase RPC调用。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个EndPoint,它们的实现代码会被目标region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。

怎么实现一个EndPoint

1. 定义一个新的protocol接口,必须继承CoprocessorProtocol.
2. 实现终端接口,继承抽象类BaseEndpointCoprocessor,改实现代码需要部署到
3. 在客户端,终端可以被两个新的HBase Client API调用 。单个region:HTableInterface.coprocessorProxy(Class<T> protocol, byte[] row) 。rigons区域:HTableInterface.coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable),这里的region是通过一个row来标示的,就是说,改row落到那个region,RPC就发给哪个region,对于start-end的,[start,end)范围内的region都会受到RPC调用。

如图

使用HBase EndPoint(coprocessor)进行计算_RPC


 

view source print ?


1.    public interface CounterProtocol extends CoprocessorProtocol {
    
     2.    public long count(byte[] start, byte[] end) throws IOException;
    
     3.    }

 


view source print ?


01.    public class CounterEndPoint extends BaseEndpointCoprocessor implements CounterProtocol {
    
     02.     
    
     03.    @Override
    
     04.    public long count(byte[] start, byte []end) throws IOException {
    
     05.    // aggregate at each region
    
     06.    Scan scan = new Scan();
    
     07.    long numRow = 0;
    
     08.     
    
     09.    InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()
    
     10.    .getScanner(scan);
    
     11.    try {
    
     12.    List<KeyValue> curVals = new ArrayList<KeyValue>();
    
     13.    boolean hasMore = false;
    
     14.    do {
    
     15.    curVals.clear();
    
     16.    hasMore = scanner.next(curVals);
    
     17.    if (Bytes.compareTo(curVals.get(0).getRow(), start)<0) {
    
     18.    continue;
    
     19.    }
    
     20.    if (Bytes.compareTo(curVals.get(0).getRow(), end)>= 0) {
    
     21.    break;
    
     22.    }
    
     23.    numRow++;
    
     24.    } while (hasMore);
    
     25.    } finally {
    
     26.    scanner.close();
    
     27.    }
    
     28.    return numRow;
    
     29.    }
    
     30.     
    
     31.    }

view source print ?


01.    public class CounterEndPointDemo {
    
     02.    public static void main(String[] args) throws IOException, Throwable {
    
     03.    final String startRow = args[0];
    
     04.    final String endRow = args[1];
    
     05.     
    
     06.    @SuppressWarnings("resource")
    
     07.    HTableInterface table = new HTable(HBaseConfiguration.create(), "tc");
    
     08.    Map<byte[], Long> results;
    
     09.     
    
     10.    // scan: for all regions
    
     11.    results = table.coprocessorExec(CounterProtocol.class, startRow.getBytes(),
    
     12.    endRow.getBytes(), new Batch.Call<CounterProtocol, Long>() {
    
     13.    public Long call(CounterProtocol instance) throws IOException {
    
     14.    return instance.count(startRow.getBytes(), endRow.getBytes());
    
     15.    }
    
     16.    });
    
     17.     
    
     18.    long total = 0;
    
     19.    for (Map.Entry<byte[], Long> e : results.entrySet()) {
    
     20.    System.out.println(e.getValue());
    
     21.    total += e.getValue();
    
     22.    }
    
     23.     
    
     24.    System.out.println("total:" + total);
    
     25.    }
    
     26.    }

整个程序的框架其实又是另外一个mapreduce,只是运行在region server上面,reduce运行在客户端,其中map计算量较大,reduce计算量很小!

另外需要提醒的是:
protocol的返回类型,可以是基本类型。
如果是一个自定义的类型需要实现org.apache.hadoop.io.Writable接口。
关于详细的支持类型,请参考代码hbase源码:org.apache.hadoop.hbase.io.HbaseObjectWritable

怎么部署?

1. 通过hbase-site.xml增加

 


view source print ?


1.    <property>
    
     2.    <name>hbase.coprocessor.region.classes</name>
    
     3.    <value>xxxx.CounterEndPoint </value>
    
     4.    </property>

  1. 如果要配置多个,就用逗号(,)分割。
  2. 包含此类的jar必须位于hbase的classpath
  3. 这种coprocessor是作用于所有的表,如果你只想作用于部分表,请使用下面一种方式。

2. 通过shell方式
增加:

 


view source print ?


1.    hbase(main):005:0&gt; alter 't1', METHOD =&gt; 'table_att',
    
     2.    'coprocessor'=&gt;'hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2'
    
     3.    Updating all regions with the new schema...
    
     4.    1/1 regions updated.
    
     5.    Done.
    
     6.    0 row(s) in 1.0730 seconds

coprocessor格式为:
[FilePath]|ClassName|Priority|arguments
arguments: k=v[,k=v]+

  1. 其中FilePath是hdfs路径,例如/tmp/zhenhe/cp/zhenhe-1.0.jar
  2. ClassNameEndPoint实现类的全名
  3. Priority为,整数,框架会根据这个数据决定多个cp的执行顺序
  4. Arguments,传给cp的参数
  5. 如果hbase的classpath包含改类,FilePath可以留空

卸载:

  1. 先describe “tableName‘,查看你要卸载的cp的编号
  2. 然后alter 't1', METHOD => 'table_att_unset', NAME=> 'coprocessor$3',coprocessor$3可变。

应用场景

这是一个最简单的例子,另外还有很多统计场景,可以用在这种方式实现,有如下好处:

  1. 节省网络带宽
  2. 减少RPC调用(scan的调用随着CacheSzie的变小而线性增加),减轻hbase压力
  3. 可以提高统计效率,那我之前写过的一个groupby类型的例子来说,大约可以提高50%以上的统计速度。

其他应用场景?

  1. 一个保存着用户信息的表,可以统计每个用户信息(counter job)
  2. 统计最大值,最小值,平均值,参考:https://issues.apache.org/jira/browse/HBASE-1512
  3. 批量删除记录,批量删除某个时间戳的记录 
  4.  

标签:EndPoint,byte,region,public,RPC,coprocessor,HBase,hbase
From: https://blog.51cto.com/u_2650279/6153804

相关文章

  • 减少Symantec Endpoint Protection 12误报的方法
     最近安装了SymantecEndpointProtection12。结果很多文件都遭了殃。。 VB写的报毒。C#写的也不放过。。。Symantec向卡巴小红伞360学习精神可嘉啊。。。 解决......
  • HBase Java API操作数据库
    场景在上面将开发环境搭建起来,要想操作操作数据库除了使用HBaseShell还可以使用JAVAAPI对HBase进行操作。注:关注公众号霸道的程序猿获取编程相关电子书、教程推送与免费......
  • Springboot 系列 (24) - Springboot+HBase 大数据存储(二)| 安装配置 Apache HBase 和 A
    ApacheHBase是Java语言编写的一款Apache开源的NoSQL型数据库,不支持SQL,不支持事务,不支持Join操作,没有表关系。ApacheHBase构建在ApacheHadoop和ApacheZoo......
  • GitHub项目Storm-HBase介绍
    ​​Storm-HBase​​​,该项目是​​TwitterStorm​​​和​​ApacheHBase​​​的结合,它使用HBasecluster作为Storm的Spout数据源,目前只是初步实现,后续会进一步完善。​......
  • 简单介绍一下HBase、Cassandra、Voldemort、Redis、VoltDB、MySQL(转)
    hbase1.简介:HBase–HadoopDatabase,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PCServer上搭建起大规模结构化存储集群2.HBase和R......
  • Hbase报错ERROR: KeeperErrorCode = NoNode for /hbase/master
    场景在上面搭建Hadoop和Hbase的基础上,三台服务器全部重启了。再执行hbase的命令时提示:ERROR:KeeperErrorCode=NoNodefor/hbase/master 注:关注公众号霸道的程序猿获......
  • 4.docker错误Error response from daemon: driver failed programming external conne
    1.docker端口映射或启动容器时报错Errorresponsefromdaemon:driverfailedprogrammingexternalconnectivityonendpointquirky_allenErrorresponsefromdaemon......
  • HBase
    HBase初识HBase为什么列式存储广泛应用于OLAP(列式存储)中传统的事务型数据库OLTP如Orcle、Mysql等都是以行的方式进行存储的分析型数据库OLAP使用的是列式存储,如HBase......
  • ES008-Elasticsearch+hbase整合
    1:设计索引库的settings信息的mappings信息,并把这些配置信息保存到一个配置文件中。1.1viarticles.json{"settings":{"number_of_shards":3,"nu......
  • 什么是HBase
    HbaseHBase即HadoopDatabase,是高可靠、高性能,面向列、可伸缩的分布式存储系统,利用HBase可以对大表数据的读、写达到实时级别。面向列是指的是面向列检索,表的结构与mysql......