
HBase Coprocessor: Saving Data to Elasticsearch (Secondary Index)



This post uses an HBase Coprocessor to mirror every row written to an HBase table into Elasticsearch, giving the table a secondary index.
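The flow is straightforward: every Put or Delete on the table fires the observer's postPut/postDelete hook, which converts the mutation into an equivalent ES upsert or delete and appends it to a shared bulk request; the bulk request is flushed to ES whenever it crosses a size threshold or a periodic timer fires.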

Versions:

HBase: 2.1

ES: 6.3.0

1. Coprocessor Development

The coprocessor class:

package wiki.hadoop.coprocessor;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.log4j.Logger;

import wiki.hadoop.es.ESClient;
import wiki.hadoop.es.ElasticSearchBulkOperator;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;

public class HbaseDataSyncEsObserver implements RegionObserver, RegionCoprocessor {

    private static final Logger LOG = Logger.getLogger(HbaseDataSyncEsObserver.class);

    private String index = "user_test";
    private String type = "user_test_type";
    @Override
    public Optional<RegionObserver> getRegionObserver() {
        // required in HBase 2.x: return ourselves so the framework registers the observer hooks
        return Optional.of(this);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        // init ES client
        ESClient.initEsClient();
        LOG.info("****init start*****");
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        ESClient.closeEsClient();
        // shutdown time task
        ElasticSearchBulkOperator.shutdownScheduEx();
        LOG.info("****end*****");
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        String indexId = Bytes.toString(put.getRow());
        try {
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String, Object> json = new HashMap<>();
            // flatten every cell of the Put into a qualifier -> value map
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                }
            }
            LOG.info(json.toString());
            // upsert so repeated Puts on the same rowkey update the same ES document;
            // the rowkey becomes the ES document id
            ElasticSearchBulkOperator.addUpdateBuilderToBulk(ESClient.client.prepareUpdate(index, type, indexId).setDocAsUpsert(true).setDoc(json));
            LOG.info("**** postPut success ****");
        } catch (Exception ex) {
            LOG.error("observer put a doc, index [" + index + "] indexId [" + indexId + "] error : " + ex.getMessage());
        }
    }
    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        String indexId = Bytes.toString(delete.getRow());
        try {
            ElasticSearchBulkOperator.addDeleteBuilderToBulk(ESClient.client.prepareDelete(index, type, indexId));
            LOG.info("**** postDelete success ****");
        } catch (Exception ex) {
            LOG.error(ex);
            LOG.error("observer delete a doc, index [" + index + "] indexId [" + indexId + "] error : " + ex.getMessage());
        }
    }
}
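Note that in HBase 2.x an observer is only picked up if the class also implements RegionCoprocessor and returns itself from getRegionObserver(); implementing RegionObserver alone (as was enough in HBase 1.x) compiles fine, but the hooks are never invoked.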

The ES utility classes — a bulk buffer and a client wrapper:

package wiki.hadoop.es;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class ElasticSearchBulkOperator {

    private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);

    private static final int MAX_BULK_COUNT = 10000;

    private static BulkRequestBuilder bulkRequestBuilder = null;

    private static final Lock commitLock = new ReentrantLock();

    private static ScheduledExecutorService scheduledExecutorService = null;
    static {
        // initialize the shared bulk request builder
        bulkRequestBuilder = ESClient.client.prepareBulk();
        bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

        // single-threaded scheduler for the periodic flush
        scheduledExecutorService = Executors.newScheduledThreadPool(1);

        // periodic task: flush any buffered actions to ES, guarded by commitLock for thread safety
        final Runnable beeper = () -> {
            commitLock.lock();
            try {
                LOG.info("Before submission bulkRequest size : " + bulkRequestBuilder.numberOfActions());
                // threshold 0: submit whatever is buffered
                bulkRequest(0);
                LOG.info("After submission bulkRequest size : " + bulkRequestBuilder.numberOfActions());
            } catch (Exception ex) {
                LOG.error("bulk flush error : " + ex.getMessage());
            } finally {
                commitLock.unlock();
            }
        };

        // first flush after 10s, then every 30s
        scheduledExecutorService.scheduleAtFixedRate(beeper, 10, 30, TimeUnit.SECONDS);
    }
    public static void shutdownScheduEx() {
        if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
            scheduledExecutorService.shutdown();
        }
    }
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();
            // only reset the builder on success; on failure the buffered actions
            // stay in place and are retried on the next flush
            if (!bulkItemResponse.hasFailures()) {
                bulkRequestBuilder = ESClient.client.prepareBulk();
            }
        }
    }

    /**
     * Add an update builder to the bulk buffer.
     * Uses commitLock to keep the shared bulk builder thread-safe.
     * @param builder the prepared update request
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error(" update bulk index error : " + ex.getMessage());
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Add a delete builder to the bulk buffer.
     * Uses commitLock to keep the shared bulk builder thread-safe.
     * @param builder the prepared delete request
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error(" delete bulk index error : " + ex.getMessage());
        } finally {
            commitLock.unlock();
        }
    }
}
The ES client class:

package wiki.hadoop.es;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;


import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * ES client class
 */
public class ESClient {

	public static Client client;

	private static final Log log = LogFactory.getLog(ESClient.class);

	/**
	 * Initialize the ES client.
	 */
	public static void initEsClient() throws UnknownHostException {
		log.info("Initializing ES client");

		// keep netty from probing available processors twice inside the RegionServer JVM
		System.setProperty("es.set.netty.runtime.available.processors", "false");

		Settings esSettings = Settings.builder()
				.put("cluster.name", "log_cluster")   // name of the target ES cluster
				.put("client.transport.sniff", true)  // discover the remaining cluster nodes
				.build();

		client = new PreBuiltTransportClient(esSettings)
				.addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
				.addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300))
				.addTransportAddress(new TransportAddress(InetAddress.getByName("host3"), 9300));

		log.info("ES client initialized");
	}

	/**
	 * Close the ES client.
	 */
	public static void closeEsClient() {
		client.close();
		log.info("ES client closed");
	}
}
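A quick way to sanity-check the client after initEsClient() — a minimal sketch (this checker class is an illustration, not part of the original project; the host names above are placeholders):

package wiki.hadoop.es;

import java.util.List;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;

public class ESClientCheck {
    public static void main(String[] args) throws Exception {
        ESClient.initEsClient();
        // with client.transport.sniff enabled, this should list every data node
        // in the cluster, not just the three seed hosts
        List<DiscoveryNode> nodes = ((TransportClient) ESClient.client).connectedNodes();
        for (DiscoveryNode node : nodes) {
            System.out.println(node.getName() + " @ " + node.getAddress());
        }
        ESClient.closeEsClient();
    }
}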

2. Creating the ES Index

Create a simple test index:

PUT user_test
{
  "settings": {
    "number_of_replicas": 1,
    "number_of_shards": 5
  }
}


PUT user_test/_mapping/user_test_type
{
  "user_test_type": {
    "properties": {
      "name":            {"type": "text"},
      "city":            {"type": "text"},
      "province":        {"type": "text"},
      "followers_count": {"type": "long"},
      "friends_count":   {"type": "long"},
      "statuses_count":  {"type": "long"}
    }
  }
}
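Note that Elasticsearch 6.x allows only a single mapping type per index, so user_test_type must be the only type under user_test.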

3. Installing the Coprocessor

Upload the jar to HDFS, then run the following HBase shell commands to install the coprocessor (the trailing 1001 is the coprocessor load priority):

disable 'es_test'
alter 'es_test', METHOD => 'table_att', 'coprocessor' => '/es-coprocessor-0.0.4-jar-with-dependencies.jar|wiki.hadoop.coprocessor.HbaseDataSyncEsObserver|1001'
enable 'es_test'
desc 'es_test'
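If you rebuild the jar later, unload the old coprocessor first (e.g. alter 'es_test', METHOD => 'table_att_unset', NAME => 'coprocessor$1') and then re-run the alter with the new jar; otherwise the RegionServers keep serving the previously loaded class.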

4. Inserting Data to Test

After inserting rows into the table, querying ES from Kibana shows the documents were indexed successfully, and the RegionServer log contains the messages printed by our observer.
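To generate test writes from code rather than the shell, here is a minimal sketch using the standard HBase Java client (the ZooKeeper quorum, the "info" column family, and the sample values are assumptions for illustration, not from the original post):

package wiki.hadoop.coprocessor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutTestData {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "host1,host2,host3"); // assumed quorum
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("es_test"))) {
            Put put = new Put(Bytes.toBytes("user_0001"));
            // the column family must exist on 'es_test'; "info" is assumed here
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("test user"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes("Beijing"));
            table.put(put);
            // postPut fires on the RegionServer; the document reaches ES on the next bulk flush
        }
    }
}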


5. Possible Issues

5.1 Netty jar conflict

2020-08-12 09:24:55,842 INFO wiki.hadoop.es.ESClient: Initializing ES client
2020-08-12 09:24:56,145 INFO org.apache.hadoop.metrics2.impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2020-08-12 09:24:56,146 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2020-08-12 09:24:56,146 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: HBase metrics system started
2020-08-12 09:24:56,260 ERROR org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost: Failed to load coprocessor wiki.hadoop.coprocessor.HbaseDataSyncEsObserver
java.lang.NoSuchMethodError: io.netty.util.AttributeKey.newInstance(Ljava/lang/String;)Lio/netty/util/AttributeKey;
        at org.elasticsearch.transport.netty4.Netty4Transport.<clinit>(Netty4Transport.java:232)
        at org.elasticsearch.transport.Netty4Plugin.getSettings(Netty4Plugin.java:56)
        at org.elasticsearch.plugins.PluginsService.lambda$getPluginSettings$0(PluginsService.java:89)
        at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
        at org.elasticsearch.plugins.PluginsService.getPluginSettings(PluginsService.java:89)
        at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:144)
        at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:280)
        at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:128)
        at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:114)
        at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:104)
        at wiki.hadoop.es.ESClient.initEsClient(ESClient.java:35)
        at wiki.hadoop.coprocessor.HbaseDataSyncEsObserver.start(HbaseDataSyncEsObserver.java:85)
        at org.apache.hadoop.hbase.coprocessor.BaseEnvironment.startup(BaseEnvironment.java:72)
        at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.checkAndLoadInstance(CoprocessorHost.java:263)
        at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.load(CoprocessorHost.java:226)
        at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.load(CoprocessorHost.java:185)
        at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.loadTableCoprocessors(RegionCoprocessorHost.java:378)
        at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.<init>(RegionCoprocessorHost.java:274)
        at org.apache.hadoop.hbase.regionserver.HRegion.<init>(HRegion.java:800)
        at org.apache.hadoop.hbase.regionserver.HRegion.<init>(HRegion.java:702)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.hbase.regionserver.HRegion.newHRegion(HRegion.java:6785)
        at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:6985)

Inspecting the dependencies shows that hbase-server pulls in its own netty-all jar, which clashes with the netty version ES needs. Excluding it from the hbase-server dependency resolves the conflict:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
</dependency>
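You can confirm which modules drag in netty, before and after adding the exclusion, with mvn dependency:tree -Dincludes=io.netty.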

After repackaging and reinstalling, the RegionServer logs show the observer running normally, and the data can be queried in ES:

2020-08-12 16:08:28,215 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: { name=跟我一起学知识}
2020-08-12 16:08:28,216 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: **** postPut success*****
2020-08-12 16:08:33,222 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: { name=教你做点心}
2020-08-12 16:08:33,222 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: **** postPut success*****


The full source code is available at:

https://github.com/zhangshenghang/hbase-coprocessors


