Using an HBase Coprocessor to replicate rows written to HBase into Elasticsearch, giving HBase a secondary index.
Versions:
HBase: 2.1
ES: 6.3.0
一、Coprocessor code development
The coprocessor class:
package wiki.hadoop.coprocessor;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.log4j.Logger;
import wiki.hadoop.es.ESClient;
import wiki.hadoop.es.ElasticSearchBulkOperator;

import java.io.IOException;
import java.util.*;

public class HbaseDataSyncEsObserver implements RegionObserver, RegionCoprocessor {

    private static final Logger LOG = Logger.getLogger(HbaseDataSyncEsObserver.class);

    private String index = "user_test";
    private String type = "user_test_type";

    // HBase 2.x loads observers through RegionCoprocessor, so this must return the observer itself
    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        // init ES client
        ESClient.initEsClient();
        LOG.info("****init start*****");
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        ESClient.closeEsClient();
        // shut down the scheduled bulk-commit task
        ElasticSearchBulkOperator.shutdownScheduEx();
        LOG.info("****end*****");
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        String indexId = Bytes.toString(put.getRow());
        try {
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String, Object> json = new HashMap<>();
            // flatten every cell of the Put into qualifier -> value pairs
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                }
            }
            LOG.info(json.toString());
            // upsert the document: the HBase row key doubles as the ES document id
            ElasticSearchBulkOperator.addUpdateBuilderToBulk(ESClient.client.prepareUpdate(index, type, indexId).setDocAsUpsert(true).setDoc(json));
            LOG.info("**** postPut success*****");
        } catch (Exception ex) {
            LOG.error("observer put a doc, index [ " + index + " ] indexId [ " + indexId + " ] error : " + ex.getMessage());
        }
    }

    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        String indexId = Bytes.toString(delete.getRow());
        try {
            ElasticSearchBulkOperator.addDeleteBuilderToBulk(ESClient.client.prepareDelete(index, type, indexId));
            LOG.info("**** postDelete success*****");
        } catch (Exception ex) {
            LOG.error(ex);
            LOG.error("observer delete a doc, index [ " + index + " ] indexId [ " + indexId + " ] error : " + ex.getMessage());
        }
    }
}
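The index and type names are hard-coded above. They could instead be passed as coprocessor arguments: key/value pairs appended after the priority in the alter command end up in the environment's configuration. A sketch of such a start() (the keys es.index and es.type are hypothetical):

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        // hypothetical: read index/type from the coprocessor args, falling back to the defaults
        this.index = env.getConfiguration().get("es.index", "user_test");
        this.type = env.getConfiguration().get("es.type", "user_test_type");
        ESClient.initEsClient();
        LOG.info("****init start*****");
    }

The install string in step 三 would then end with something like |1001|es.index=user_test,es.type=user_test_type.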
The ES utility classes (a bulk buffer and the client holder):
package wiki.hadoop.es;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ElasticSearchBulkOperator {

    private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);

    private static final int MAX_BULK_COUNT = 10000;

    private static BulkRequestBuilder bulkRequestBuilder = null;
    private static final Lock commitLock = new ReentrantLock();
    private static ScheduledExecutorService scheduledExecutorService = null;

    static {
        // initialize the shared bulk request builder
        bulkRequestBuilder = ESClient.client.prepareBulk();
        bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        // single-threaded scheduled pool
        scheduledExecutorService = Executors.newScheduledThreadPool(1);
        // a Runnable that submits the buffered actions, guarded by commitLock for thread safety
        final Runnable beeper = () -> {
            commitLock.lock();
            try {
                LOG.info("Before submission bulkRequest size : " + bulkRequestBuilder.numberOfActions());
                // submit the buffered actions to ES
                bulkRequest(0);
                LOG.info("After submission bulkRequest size : " + bulkRequestBuilder.numberOfActions());
            } catch (Exception ex) {
                LOG.error(ex.getMessage());
            } finally {
                commitLock.unlock();
            }
        };
        // first run after 10s, then every 30s
        scheduledExecutorService.scheduleAtFixedRate(beeper, 10, 30, TimeUnit.SECONDS);
    }

    public static void shutdownScheduEx() {
        if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
            scheduledExecutorService.shutdown();
        }
    }

    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            if (bulkResponse.hasFailures()) {
                // keep the builder so the buffered actions are retried on the next tick
                LOG.error("bulk submission had failures : " + bulkResponse.buildFailureMessage());
            } else {
                bulkRequestBuilder = ESClient.client.prepareBulk();
            }
        }
    }

    /**
     * Add an update builder to the bulk; commitLock keeps the shared bulk thread-safe.
     *
     * @param builder
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error("update bulk error : " + ex.getMessage());
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Add a delete builder to the bulk; commitLock keeps the shared bulk thread-safe.
     *
     * @param builder
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error("delete bulk error : " + ex.getMessage());
        } finally {
            commitLock.unlock();
        }
    }
}
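One caveat: shutdownScheduEx() only stops the timer, so anything still buffered in bulkRequestBuilder when the region closes is lost. A hypothetical variant that flushes the remaining actions first could look like this:

    // Flush whatever is still buffered, then stop the scheduler (hypothetical, not in the original class).
    public static void flushAndShutdown() {
        commitLock.lock();
        try {
            bulkRequest(0);
        } finally {
            commitLock.unlock();
        }
        shutdownScheduEx();
    }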
package wiki.hadoop.es;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * ES client class
 */
public class ESClient {

    public static Client client;

    private static final Log log = LogFactory.getLog(ESClient.class);

    /**
     * init ES client
     */
    public static void initEsClient() throws UnknownHostException {
        log.info("Initializing the ES client");
        // avoid netty setting the runtime processor count twice inside the RegionServer JVM
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        Settings esSettings = Settings.builder()
                .put("cluster.name", "log_cluster") // the ES cluster name
                .put("client.transport.sniff", true)
                .build();
        client = new PreBuiltTransportClient(esSettings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
                .addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300))
                .addTransportAddress(new TransportAddress(InetAddress.getByName("host3"), 9300));
        log.info("ES client initialized");
    }

    /**
     * Close ES client
     */
    public static void closeEsClient() {
        if (client != null) {
            client.close();
        }
        log.info("ES client closed");
    }
}
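Because the cluster name and the transport addresses above are placeholders, a quick connectivity check right after initEsClient() can save a debugging round-trip. A hypothetical helper (requires importing org.elasticsearch.client.transport.TransportClient; not part of the original class):

    // Sanity check: with sniffing enabled, an empty node list means the client failed to connect.
    public static void checkConnected() {
        TransportClient tc = (TransportClient) client;
        if (tc.connectedNodes().isEmpty()) {
            log.error("no ES nodes connected - check cluster.name and the transport addresses");
        } else {
            log.info("connected ES nodes: " + tc.connectedNodes());
        }
    }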
二、Creating the ES index
Create a simple test index:
PUT user_test
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
  }
}
PUT user_test/_mapping/user_test_type
{
  "user_test_type": {
    "properties": {
      "name":            { "type": "text" },
      "city":            { "type": "text" },
      "province":        { "type": "text" },
      "followers_count": { "type": "long" },
      "friends_count":   { "type": "long" },
      "statuses_count":  { "type": "long" }
    }
  }
}
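The result can also be checked from Java with the same transport client the coprocessor uses; a small hypothetical check, assuming ESClient.initEsClient() has already been called:

    // Hypothetical check: confirm the index created above is visible to the client.
    boolean exists = ESClient.client.admin().indices()
            .prepareExists("user_test")
            .get()
            .isExists();
    System.out.println("user_test exists: " + exists);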
三、Installing the coprocessor
Upload the jar to HDFS, then run the following HBase shell commands to install the coprocessor (a jar path without a scheme is resolved against the default filesystem, i.e. HDFS here):
disable 'es_test'
alter 'es_test', METHOD => 'table_att', 'coprocessor' => '/es-coprocessor-0.0.4-jar-with-dependencies.jar|wiki.hadoop.coprocessor.HbaseDataSyncEsObserver|1001'
enable 'es_test'
desc 'es_test'
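Note: to upgrade the coprocessor later, the usual sequence is to disable the table, drop the attribute with alter 'es_test', METHOD => 'table_att_unset', NAME => 'coprocessor$1', then re-add the new jar and enable again; renaming the jar (e.g. bumping its version) also helps avoid stale classloader caches on the RegionServers.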
四、Insert data to test
After writing a few rows into 'es_test', the documents show up when querying ES from Kibana, and the RegionServer log contains the lines printed by the coprocessor.
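For generating test data, here is a minimal sketch using the HBase Java client (the Zookeeper quorum and the column family name "info" are assumptions; adjust them to the actual table):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutTestData {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "host1,host2,host3"); // assumption: adjust to the real quorum
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("es_test"))) {
            Put put = new Put(Bytes.toBytes("row-0001"));
            // qualifiers match the fields declared in the user_test mapping
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("test user"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes("Beijing"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("followers_count"), Bytes.toBytes("100"));
            table.put(put);
        }
    }
}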
五、Possible problems
5.1 Netty jar conflict
2020-08-12 09:24:55,842 INFO wiki.hadoop.es.ESClient: Initializing the ES client
2020-08-12 09:24:56,145 INFO org.apache.hadoop.metrics2.impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2020-08-12 09:24:56,146 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2020-08-12 09:24:56,146 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: HBase metrics system started
2020-08-12 09:24:56,260 ERROR org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost: Failed to load coprocessor wiki.hadoop.coprocessor.HbaseDataSyncEsObserver
java.lang.NoSuchMethodError: io.netty.util.AttributeKey.newInstance(Ljava/lang/String;)Lio/netty/util/AttributeKey;
at org.elasticsearch.transport.netty4.Netty4Transport.<clinit>(Netty4Transport.java:232)
at org.elasticsearch.transport.Netty4Plugin.getSettings(Netty4Plugin.java:56)
at org.elasticsearch.plugins.PluginsService.lambda$getPluginSettings$0(PluginsService.java:89)
at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.elasticsearch.plugins.PluginsService.getPluginSettings(PluginsService.java:89)
at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:144)
at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:280)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:128)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:114)
at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:104)
at wiki.hadoop.es.ESClient.initEsClient(ESClient.java:35)
at wiki.hadoop.coprocessor.HbaseDataSyncEsObserver.start(HbaseDataSyncEsObserver.java:85)
at org.apache.hadoop.hbase.coprocessor.BaseEnvironment.startup(BaseEnvironment.java:72)
at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.checkAndLoadInstance(CoprocessorHost.java:263)
at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.load(CoprocessorHost.java:226)
at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.load(CoprocessorHost.java:185)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.loadTableCoprocessors(RegionCoprocessorHost.java:378)
at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.<init>(RegionCoprocessorHost.java:274)
at org.apache.hadoop.hbase.regionserver.HRegion.<init>(HRegion.java:800)
at org.apache.hadoop.hbase.regionserver.HRegion.<init>(HRegion.java:702)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hbase.regionserver.HRegion.newHRegion(HRegion.java:6785)
at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:6985)
The stack trace shows that hbase-server pulls in its own netty, which clashes with the netty version the ES transport client needs, so exclude it from hbase-server:
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
</dependency>
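One more build detail: the install command in step 三 references a jar-with-dependencies artifact, so the project needs to bundle the (now netty-excluded) dependency tree into a single jar, e.g. via the maven-assembly-plugin's jar-with-dependencies descriptor.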
After repackaging and reinstalling, the RegionServer runs normally and queries against ES succeed:
2020-08-12 16:08:28,215 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: { name=跟我一起学知识}
2020-08-12 16:08:28,216 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: **** postPut success*****
2020-08-12 16:08:33,222 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: { name=教你做点心}
2020-08-12 16:08:33,222 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: **** postPut success*****