首页 > 其他分享 >ES transport client底层是netty实现,netty本质上是异步方式,但是netty自身可以使用sync或者await(future超时机制)来实现类似同步调用!因此,ES

ES transport client底层是netty实现,netty本质上是异步方式,但是netty自身可以使用sync或者await(future超时机制)来实现类似同步调用!因此,ES

时间:2023-05-31 15:06:38浏览次数:49  
标签:netty sync bulk field client new bulkRequest ES

ES transport client底层是netty实现,netty本质上是异步方式,但是netty自身可以使用sync或者await(future超时机制)来实现类似同步调用!

因此,ES transport client可以同步调用也可以异步(不过底层的socket必然是异步实现)。

发送端例子

对于java client的数据发送(这里以bulk为例),写过的人都知道,其实是很简单的,因为大部分事情都已经被client做掉了,那么我们先给出例子感知一下:

client初始化

Settings settings = Settings.settingsBuilder()
            .put("cluster.name", "myClusterName")
            .put("client.transport.sniff", true).build();
client=new TransportClient.builder().settings(settings).build()
    .addTransportAddress(new InetSocketTransportAddress("host1",9300))
    .addTransportAddress(new InetSocketTransportAddress("host2",9300));

bulk数据发送

对于数据的发送ES提供了两种方式:

第一种bulk api:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

可以看到这种方式是由client端自己添加数据,然后调用BulkResponse bulkResponse = bulkRequest.get();来完成数据的发送。

第二种叫做Bulk Processor:

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
        .setBulkActions(10000) 
        .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) 
        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
        .setConcurrentRequests(1) 
        .build();

初始化bulk processor之后,客户端只需要往bulkProcessor添加数据即可bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));,你可以先配置好bulk的size、interval等,其他的事情就交给processor自己去做吧。

两种方式各有利弊,第一种要自己控制bulk size和interval,但是有利于对发送失败的处理;而第二种简单易用,用户只管add数据就好,但是对于使用回调函数来处理异常会不那么方便,如何选择就看使用场景的了。

 

部分内容摘自:http://www.opscoder.info/es_javaclient.html

标签:netty,sync,bulk,field,client,new,bulkRequest,ES
From: https://blog.51cto.com/u_11908275/6386869

相关文章

  • sklearn的train_test_split,果然很好用啊!
    sklearn的train_test_split train_test_split函数用于将矩阵随机划分为训练子集和测试子集,并返回划分好的训练集测试集样本和训练集测试集标签。格式:X_train,X_test,y_train,y_test=cross_validation.train_test_split(train_data,train_target,test_size=0.3,random_state=0)......
  • bitsandbytes通过源码安装后调用报错AttributeError: module 'bitsandbytes.nn' has n
    通过github下载的源码使用pipinstall-e.方式安装的时候会出现题目中的问题。这个时候先卸载掉bitsandbytes,然后重新使用pipinstallbitsandbytes安装,这种方式直接从仓库中安装,问题就解决了。目前尚不清楚问题出现原因,虽然两种方式的安装版本都是0.38.1......
  • uniapp onShareAppMessage里面请求后调分享(微信小程序)
    onShareAppMessage(){constpromise=newPromise(resolve=>{request({api:'请求名',method:'POST',data:{"data":{}}}).then(res=>{resolve({......
  • 前端 React + vite + Typescript 后端 java + springmvc + jwt 跨域 解决方案
    首先后端配置跨域:web.xml文件: <!--配置跨域--><filter><filter-name>header</filter-name><filter-class>org.zhiyi.config.Cross</filter-class></filter><filter-mapping><......
  • 根据视频帧率,使用requestAnimationFrame播放动画
    当时使用webRTC进行视频通话时,通常会设置视频流的帧率,行业内一般默认帧数为15或者30,一般每秒只需要渲染15或30次当要需要对本地视频或者远端视频流进行特殊处理时,通常会使用requestAnimationFrame方法进行再次渲染requestAnimationFrame,这个方法是用来在页面重绘之前,通知浏览器......
  • Pytest
    pytest1.插件pytest-html生成html格式的自动化测试报告pytest--html=report/report.htmltest_sdk2_1.pypytest-xdist测试用例分布式执行,多CPU分发pytest-ordering用于改变测试用例的执行顺寻pytest-rerunfailures用例失败后重跑allure-pytest用于生成美观的测试报告......
  • 使用wireshark抓包RTP流
    最近笔者在疯狂使用wireshar抓包RTP包进行分析,偶然发现别人的wireshark可以把RTP协议进一步解析成H.264协议,就很馋。经过一番了解,发现通过如下操作可以实现想要的效果。一开始笔者的wireshark的效果是如下图这样子的 但是别人是如下图这样子的, 很明显可以......
  • pip install时遇到subprocess-exited-with-error错误
    当我使用容器在外网下载的peft源码包中使用pipinstall-e.命令进行安装时,安装顺利进行。而当我在公司内网使用相同容器进行安装时,报出题目中的错误,因为是离线安装,所以我猜测是不是网络问题,因为这两个区别就是一个联网一个没有联网,于是我在内网pipinstall-e.命令后面加上了......
  • Could not autowire. No beans of 'AddressBookService' type found.
    错误:错误原因:Service实现类未继承Service接口解决方法: ......
  • SimpleAdmin手摸手教学之:基于Ant Design Tree组件实现树形结构数据的异步加载
    一、说明当有一个树形结构的数据有非常多个节点的时候,一次性加载所有节点会显得过于臃肿,可能会对性能造成影响,正好AntDesign的树(Tree)组件支持异步加载,于是我就想把异步加载封装为一个组件,可以减少接口数据返回,点击展开节点,动态加载数据。非常好用!二、前端实现需要接收一些......