首页 > 其他分享 >Hbase 协处理器 RegionObserver

Hbase 协处理器 RegionObserver

时间:2023-05-06 15:04:34浏览次数:40  
标签:RegionObserver NAME Bytes toBytes DOWNLOAD Hbase 协处理器 TYPE final

 

RegionObserver

注:每次更新协处理器方法,最好加上版本更新,否则可能会出现更新失败

  • 协处理器安装-表级别安装
disable 'wechat_article'
alter 'wechat_article' , METHOD =>'table_att','coprocessor'=>'hdfs://test111:8020/coprocessor/hbase-coprocessor-0.0.6-SNAPSHOT.jar|com.izhonghong.hbase.coprocessor.WechatUserObserver|1001'
enable 'wechat_article'
  • 卸载协处理器:
disable 'wechat_article'
alter 'wechat_article', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable
  • 查看是否成功
hbase(main):002:0> desc 'wechat_article'
Table wechat_article is ENABLED                                                                                                                            
wechat_article, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://test111:8020/coprocessor/hbase-coprocessor-0.0.14-SNAPSHOT.jar|com.izhonghong.hbase.coproces
sor.WechatUserObserver|1001'}                                                                                                                              
COLUMN FAMILIES DESCRIPTION                                                                                                                                
{NAME => 'fn', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER',
 COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}                                         
1 row(s) in 0.3030 seconds

代码开发

代码查看(RegionObserverDemo.java):http://note.youdao.com/noteshare?id=9e8b53139c00840d00308356dda0f203&sub=B241C0BD8E46423EBB48DDC285EC5BC2

  • prePut-插入前处理数据
插入数据前判断download_type类型是否在200-300范围中,如果在直接将用户id插入到另外一个表中,并做关联
@Override
	public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
			final Put put, final WALEdit edit, final Durability durability)
					throws IOException {
		LOG.warn("###########################################");
		// 获取列簇为FAMAILLY_NAME,列名为DOWNLOAD_TYPE的数据

		List<Cell> cells = put.get(Bytes.toBytes(FAMAILLY_NAME),
				Bytes.toBytes(DOWNLOAD_TYPE));
		//判断列名为DOWNLOAD_TYPE是否包含数据,不包含直接return,退出处理
		if (cells == null || cells.size() == 0) {
			LOG.warn("download_type 字段不存在退出过滤");
			return;
		}
		// 列名为DOWNLOAD_TYPE已包含数据,进行处理
		byte[] aValue = null;
		for (Cell cell : cells) {
			try {
				//DOWNLOAD_TYPE转换为数字
				aValue = CellUtil.cloneValue(cell);
				Integer valueOf = Integer.valueOf(Bytes.toString(aValue));
				if(valueOf>=200 &&valueOf<=300) {
					//如果DOWNLOAD_TYPE范围在200-300之间,获取用户UID信息
					List<Cell> list = put.get(Bytes.toBytes(FAMAILLY_NAME),
							Bytes.toBytes(UID));
					//判断用户是否包含UID
					if (list == null || list.size() == 0) {
						LOG.warn("用户BIZ不存在,或者为空");
						return;
					}
					//获取用户UID
					Cell cell2 = list.get(0);
					LOG.warn("UID--->"+Bytes.toString(CellUtil.cloneValue(cell2)));
					//将用户UID设置为rowkey,原数据的rowkey当作列名,download_type当作列值
					Put put2 = new Put(CellUtil.cloneValue(cell2));
					put2.addColumn(Bytes.toBytes(FAMAILLY_NAME), put.getRow(), aValue);
					//插入数据到table中
					table.put(put2);
					table.close();
				}else {
					LOG.warn("Download type is not in range.");
				}
			} catch (Exception e1) {
				LOG.error("异常------->>>>>> "+e1.getMessage());
				return ;
			}
		}
	}
  • preGetOp-处理返回结果只对get有效
@Override
	public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
			final Get get, final List<Cell> results) throws IOException { 
		//判断查询的rowkey是否等于test
		if(Bytes.equals(get.getRow(),Bytes.toBytes("test"))) { 
			//新增返回数据fn:time,值为当前时间戳给客户端
			KeyValue kv = new KeyValue(get.getRow(),Bytes.toBytes("fn"),Bytes.toBytes("time"),1535555555555l ,Bytes.toBytes(String.valueOf(System.currentTimeMillis())));
			results.add(kv);
		}
	}
  • preScannerOpen-在客户端打开新扫描仪之前过滤,此方法会覆盖原有filter
@Override
	public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
			final RegionScanner s) throws IOException {

		//查询rowkey等于test的行进行过滤(显示不等test的数据),会覆盖原有filter
		Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes("test")));
		scan.setFilter(filter);
		return s;
	}
  • postScannerNext,对返回结果进行过滤
@Override
	public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,
			final List<Result> results, final int limit, final boolean hasMore) throws IOException {
		Result result = null;
		//获取返回结果,如果rowkey等于test则过滤掉
		Iterator<Result> iterator = results.iterator();
		while (iterator.hasNext()) {
			result = iterator.next();
			if (Bytes.equals(result.getRow(), Bytes.toBytes("test2"))) {
				iterator.remove();
				break;
			}
		}
		return hasMore;
	}
  • preDelete,,删除列前进行判断该列是否可以删除
@Override
    public void preDelete(
            final ObserverContext<RegionCoprocessorEnvironment> e,
            final Delete delete, final WALEdit edit, final Durability durability)
            throws IOException {

        // 判断是否对列簇FAMAILLY_NAME操作
        List<Cell> cells = delete.getFamilyCellMap().get(
                Bytes.toBytes(FAMAILLY_NAME));
        if (cells == null || cells.size() == 0) {
       //如果没有对指定列簇操作则跳过判断,直接执行语句
            return;
        }

        byte[] qualifierName = null;
        boolean aDeleteFlg = false;
        for (Cell cell : cells) {
        	//获取带操作的列名
            qualifierName = CellUtil.cloneQualifier(cell);

            // 如果列名等于DOWNLOAD_TYPE ,则抛出异常。注:
            //需查看该值在集群中配置多少,否则重试好几百次性能会很差。
            //conf.set("hbase.client.retries.number", "1");//client失败重试次数
            if (Arrays.equals(qualifierName, Bytes.toBytes(DOWNLOAD_TYPE))) {
                throw new IOException("You cannot delete read-only columns.");
            }

            // 检查是否存在对UID列进行删除
            if (Arrays.equals(qualifierName, Bytes.toBytes(UID))) {
                aDeleteFlg = true;
            }
        }

        // 如果对于UID列有删除,则需要对DOWNLOAD_TYPE列也要删除
        if (aDeleteFlg)
        {
            delete.addColumn(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(DOWNLOAD_TYPE));
        }
    }

标签:RegionObserver,NAME,Bytes,toBytes,DOWNLOAD,Hbase,协处理器,TYPE,final
From: https://blog.51cto.com/u_13721902/6249921

相关文章

  • Hbase 协处理器之将数据保存到es (二级索引)
    利用HbaseCoprocessor实现将插入hbase中的数据保存至ElasticSearch中,实现二级索引目的版本:Hbase:2.1ES:6.3.0一、Coprocessor代码开发协处理器类packagewiki.hadoop.coprocessor;importorg.apache.hadoop.hbase.Cell;importorg.apache.hadoop.hbase.CellUtil;importorg.ap......
  • Springboot 系列 (30) - Springboot+HBase 大数据存储(八)| Springboot Client/Server
    Kerberos(SecureNetworkAuthenticationSystem,网络安全认证系统),是一种网络认证协议,其设计目标是通过密钥系统为Client/Server提供强大的认证服务。该认证过程的实现不依赖于主机操作系统的认证,无需基于的信任,不要求网络上所有主机的物理安全,并假定网络上传送的数据包可以被......
  • 【图文详解】一文全面彻底搞懂HBase、LevelDB、RocksDB等NoSQL背后的存储原理:LSM-tree
    LSM树广泛用于数据存储,例如RocksDB、ApacheAsterixDB、Bigtable、HBase、LevelDB、ApacheAccumulo、SQLite4、Tarantool、WiredTiger、ApacheCassandra、InfluxDB和ScyllaDB等。在这篇文章中,我们将深入探讨LogStructuredMergeTree,又名LSM树:许多高度可扩展的NoSQL分......
  • HBase初步学习与性能测试
    1、HBase定义HBase(HadoopDatabase)是一个分布式、可扩展的NoSQL数据库。基于BigTable,为Hadoop框架当中的结构化数据提供存储服务,是面向列的分布式数据库。这一点与HDFS是不一样的,HDFS是分布式文件系统,管理的是存放在多个硬盘上的数据文件,不支持随机修改,而Hbase管理的是类似于k......
  • HBase
    HBasehttps://www.cnblogs.com/zhh567/p/17275625.html用于存储数十亿行数百万列的大数据的kv数据库,基于Google的bigtable论文。Bigtable是一个稀疏的、分布式的、持久的多维排序map。该map以行键、列键、时间戳作为索引,对应的值为一个序列化的字节数组。数据存储的逻辑架构:......
  • 08.hbase创建表
    [root@ecs-0001bin]#hbaseshell查看所有表hbase(main):001:0>listcreate'SAAS:DWS_MCHT_SHOP_PORTRAYS','BASE','DATA'查看表详情desc 'SAAS:DWS_MCHT_SHOP_PORTRAYS'查看所有命名空间hbase(main):008:0>list_namespaceNAMESPACE       ......
  • Hbase:namespace异常处理
       Hbase集群部署启动后几秒自动退出异常处理,1.hadoop与hbase版本不兼容,会导致此异常。2.log为 org.apache.hadoop.hbase.TableExistsException:hbase:namespace异常,很可能是更换了Hbase的版本过后zookeeper还保留着上一次的Hbase设置,所以造成了冲突。  解决方案:(这里......
  • Spark+HBase数据处理与存储实验部分内容
    0.Scala+Spark+HBase的IDEA环境配置需要下载的内容:Scala、Java,注意两者之间版本是否匹配。环境:Win10,Scala2.10.6,JDK1.7,IDEA2022.3.1创建maven工程。下载Scala插件。右键项目,添加Scala框架支持。项目结果如图所示:scala添加为源目录,下存scala代码添加依赖包。将property的......
  • 轻松应对亿级数据,HBase Scan读取速度翻倍!
    HBase是一种基于Hadoop的分布式列存储数据库,它支持大规模结构化数据的存储和随机访问。在HBase中,扫描(Scan)是一种读取表中数据的方式,它可以返回表中满足条件的一部分或全部数据。本文将介绍HBase中扫描的概念、使用方法和性能优化。1扫描的概念扫描是一种读取表中数据的方式,它可以......
  • MongoDB、Redis、HBase、Cassandra、Elasticsearch、ClickHouse等NoSQL数据库简介及优
    MongoDBMongoDB是一个基于文档的NoSQL数据库,它使用BSON(二进制JSON)格式存储数据。MongoDB支持动态查询,可以轻松地处理非结构化数据。它还支持水平扩展,可以在多个节点上分布数据。优点:灵活性高,支持非结构化数据存储。支持水平扩展,可以在多个节点上分布数据。支持动态查询,可......