RegionObserver
注:每次更新协处理器方法,最好加上版本更新,否则可能会出现更新失败
- 协处理器安装-表级别安装
disable 'wechat_article'
alter 'wechat_article' , METHOD =>'table_att','coprocessor'=>'hdfs://test111:8020/coprocessor/hbase-coprocessor-0.0.6-SNAPSHOT.jar|com.izhonghong.hbase.coprocessor.WechatUserObserver|1001'
enable 'wechat_article'
- 卸载协处理器:
disable 'wechat_article'
alter 'wechat_article', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
enable
- 查看是否成功
hbase(main):002:0> desc 'wechat_article'
Table wechat_article is ENABLED
wechat_article, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://test111:8020/coprocessor/hbase-coprocessor-0.0.14-SNAPSHOT.jar|com.izhonghong.hbase.coproces
sor.WechatUserObserver|1001'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'fn', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER',
COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
1 row(s) in 0.3030 seconds
代码开发
代码查看(RegionObserverDemo.java):http://note.youdao.com/noteshare?id=9e8b53139c00840d00308356dda0f203&sub=B241C0BD8E46423EBB48DDC285EC5BC2
- prePut-插入前处理数据
插入数据前判断download_type类型是否在200-300范围中,如果在直接将用户id插入到另外一个表中,并做关联
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Put put, final WALEdit edit, final Durability durability)
throws IOException {
LOG.warn("###########################################");
// 获取列簇为FAMAILLY_NAME,列名为DOWNLOAD_TYPE的数据
List<Cell> cells = put.get(Bytes.toBytes(FAMAILLY_NAME),
Bytes.toBytes(DOWNLOAD_TYPE));
//判断列名为DOWNLOAD_TYPE是否包含数据,不包含直接return,退出处理
if (cells == null || cells.size() == 0) {
LOG.warn("download_type 字段不存在退出过滤");
return;
}
// 列名为DOWNLOAD_TYPE已包含数据,进行处理
byte[] aValue = null;
for (Cell cell : cells) {
try {
//DOWNLOAD_TYPE转换为数字
aValue = CellUtil.cloneValue(cell);
Integer valueOf = Integer.valueOf(Bytes.toString(aValue));
if(valueOf>=200 &&valueOf<=300) {
//如果DOWNLOAD_TYPE范围在200-300之间,获取用户UID信息
List<Cell> list = put.get(Bytes.toBytes(FAMAILLY_NAME),
Bytes.toBytes(UID));
//判断用户是否包含UID
if (list == null || list.size() == 0) {
LOG.warn("用户BIZ不存在,或者为空");
return;
}
//获取用户UID
Cell cell2 = list.get(0);
LOG.warn("UID--->"+Bytes.toString(CellUtil.cloneValue(cell2)));
//将用户UID设置为rowkey,原数据的rowkey当作列名,download_type当作列值
Put put2 = new Put(CellUtil.cloneValue(cell2));
put2.addColumn(Bytes.toBytes(FAMAILLY_NAME), put.getRow(), aValue);
//插入数据到table中
table.put(put2);
table.close();
}else {
LOG.warn("Download type is not in range.");
}
} catch (Exception e1) {
LOG.error("异常------->>>>>> "+e1.getMessage());
return ;
}
}
}
- preGetOp-处理返回结果只对get有效
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
//判断查询的rowkey是否等于test
if(Bytes.equals(get.getRow(),Bytes.toBytes("test"))) {
//新增返回数据fn:time,值为当前时间戳给客户端
KeyValue kv = new KeyValue(get.getRow(),Bytes.toBytes("fn"),Bytes.toBytes("time"),1535555555555l ,Bytes.toBytes(String.valueOf(System.currentTimeMillis())));
results.add(kv);
}
}
- preScannerOpen-在客户端打开新扫描仪之前过滤,此方法会覆盖原有filter
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e, final Scan scan,
final RegionScanner s) throws IOException {
//查询rowkey等于test的行进行过滤(显示不等test的数据),会覆盖原有filter
Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(Bytes.toBytes("test")));
scan.setFilter(filter);
return s;
}
- postScannerNext,对返回结果进行过滤
@Override
public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e, final InternalScanner s,
final List<Result> results, final int limit, final boolean hasMore) throws IOException {
Result result = null;
//获取返回结果,如果rowkey等于test则过滤掉
Iterator<Result> iterator = results.iterator();
while (iterator.hasNext()) {
result = iterator.next();
if (Bytes.equals(result.getRow(), Bytes.toBytes("test2"))) {
iterator.remove();
break;
}
}
return hasMore;
}
- preDelete,,删除列前进行判断该列是否可以删除
@Override
public void preDelete(
final ObserverContext<RegionCoprocessorEnvironment> e,
final Delete delete, final WALEdit edit, final Durability durability)
throws IOException {
// 判断是否对列簇FAMAILLY_NAME操作
List<Cell> cells = delete.getFamilyCellMap().get(
Bytes.toBytes(FAMAILLY_NAME));
if (cells == null || cells.size() == 0) {
//如果没有对指定列簇操作则跳过判断,直接执行语句
return;
}
byte[] qualifierName = null;
boolean aDeleteFlg = false;
for (Cell cell : cells) {
//获取带操作的列名
qualifierName = CellUtil.cloneQualifier(cell);
// 如果列名等于DOWNLOAD_TYPE ,则抛出异常。注:
//需查看该值在集群中配置多少,否则重试好几百次性能会很差。
//conf.set("hbase.client.retries.number", "1");//client失败重试次数
if (Arrays.equals(qualifierName, Bytes.toBytes(DOWNLOAD_TYPE))) {
throw new IOException("You cannot delete read-only columns.");
}
// 检查是否存在对UID列进行删除
if (Arrays.equals(qualifierName, Bytes.toBytes(UID))) {
aDeleteFlg = true;
}
}
// 如果对于UID列有删除,则需要对DOWNLOAD_TYPE列也要删除
if (aDeleteFlg)
{
delete.addColumn(Bytes.toBytes(FAMAILLY_NAME), Bytes.toBytes(DOWNLOAD_TYPE));
}
}
标签:RegionObserver,NAME,Bytes,toBytes,DOWNLOAD,Hbase,协处理器,TYPE,final
From: https://blog.51cto.com/u_13721902/6249921