首页 > 其他分享 >Elasticsearch数据同步优化

Elasticsearch数据同步优化

时间:2024-02-20 14:34:52浏览次数:33  
标签:同步 批量 数据 request param executionId Elasticsearch BulkProcessor 优化

Elasticsearch数据同步优化

背景

为了满足项目需求,需要将大量数据的数据写入到ES进行检索,预估数据量是40亿左右,目前需要同步进去的是2亿左右。

ES集群配置

  • 三台128G的国产服务器 国产linux系统 CPU主频低的拉跨
  • JDK8的版本
  • 机械硬盘

遇到的问题

后端使用Java调用es的bulk api进行数据同步,数据同步特别慢,在测试环境做同步的时候速度是很快的,但是在正式环境下速度出奇的慢。

直接上优化方案

  • 升级JDK版本 将JDK的版本升级到JDK17(中途先升级到JDK11的),升级之后速度提升明显 JDK8的垃圾回收器到底是比不过JDK17
  • ES索引的副本数据在数据同步阶段设置为0 多个副本就意味着要多写几份数据
  • ES索引的分片数量设置为3 和集群数量一致
  • 调整Java调用ES bulk api的代码 使用异步批量调用的方式,后面会详细介绍
    经过一阵鼓捣 数据同步速度极大提升,

Java调用ES bulk api

首先es是有一个bulk的批量接口的,一般来说做批量数据同步的时候是使用的这个api,实际上还有一种更加灵活的api,在ES7里面是BulkProcessor这个类,在ES8里面是BulkIngester类,两者功能基本一致。
先说一下这两个api的工作原理
bulk api 接收到批量数据之后 会立即将数据提交给es集群,es集群如果在使用默认写入配置的情况下,会很快将数据进行落盘的,数据落盘的这个过程是比较耗时的。
BulkProcessor BulkIngester 这两个类中是可以动态配置数据提交给es的机制,总体来说就是 数据会在内存中暂存起来,等数据的指标达到我们配置的值的时候 api就会异步的将数据提交给es集群,从而减少es集群数据落盘的次数

代码 ES7版本

    @Bean
    public BulkProcessor bulkProcessor(RestHighLevelClient restHighLevelClient) {

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                LOGGER.info("【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (!response.hasFailures()) {
                    LOGGER.info("【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
                } else {
                    BulkItemResponse[] items = response.getItems();
                    for (BulkItemResponse item : items) {
                        if (item.isFailed()) {
                            LOGGER.info("afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
                            break;
                        }
                    }
                }
            }

            @Override
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
                throwable.printStackTrace();
            }
        };

        BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
            restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
        }), listener);
        //到达指定条数时刷新  -1则禁用该配置
        builder.setBulkActions(bulkActions);
        //内存到达指定大小时刷新
        builder.setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB));
        //设置的刷新间隔 单位是s  -1则禁用该配置
        builder.setFlushInterval(TimeValue.timeValueSeconds(flushInterval));
        //设置允许执行的并发请求数
        builder.setConcurrentRequests(concurrentRequests);
        //设置重试策略
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), maxNumberOfRetries));
        return builder.build();
    }

这里将该类配置为spring bean,在使用的时候直接注入使用即可,剩下的交给BulkProcessor即可

    @Resource
    private BulkProcessor bulkProcessor;

    IndexRequest request = new IndexRequest();
    request.id(id);
    request.index(tableToEs.getIndexName());
    request.source(JSON.toJSONString(esTopicCollectModel, serializeConfig), XContentType.JSON);
    bulkProcessor.add(request);

代码 ES8版本

    @Bean
    public BulkIngester<String> bulkIngester() throws Exception {

        BulkListener<String> listener = new BulkListener<String>() {

            /**
             *
             * @param executionId 此请求的id
             * @param request 将发送的批量请求
             * @param contexts 数据集
             */
            @Override
            public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
                LOGGER.info("【beforeBulk】批次[{}】 携带 【{}】 请求数量", executionId, contexts.size());
            }

            /**
             * 批量请求之后调用
             * @param executionId 此请求的id
             * @param request 将发送的批量请求
             * @param contexts 数据集
             * @param response 返回值
             */
            @Override
            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
                LOGGER.info("【afterBulk】批次[{}】 提交数据量【{}】 提交结果【{}】", executionId,contexts.size(),response.errors()?"失败":"成功");
            }

            /**
             * 当批量请求无法发送到Elasticsearch时调用
             * @param executionId 此请求的id
             * @param request 将发送的批量请求
             * @param contexts 数据集
             * @param failure 异常信息
             */
            @Override
            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
                LOGGER.error("Bulk request " + executionId + " failed", failure);
            }
        };

        ElasticsearchClient elasticsearchClient = elasticsearchClient();

        BulkIngester<String> ingester = BulkIngester.of(b -> b
                .client(elasticsearchClient)
                .maxOperations(-1)
                .maxSize(bulkSize)
                .maxConcurrentRequests(concurrentRequests)
                .flushInterval(flushInterval, TimeUnit.SECONDS)
                .listener(listener)
        );

        return ingester;
    }

使用方式

    @Resource
    private BulkIngester<String> bulkIngester;

    IndexOperation<EsTopicCollectModel> indexOperation = new IndexOperation.Builder<EsTopicCollectModel>()
        // 索引
        .index(tableToEs.getIndexName())
        // 文档id
        .id(tableToEs.getTableName() + "_" + data.getOrDefault(StrUtil.toCamelCase(tableToEs.getPkColumn()), ""))
        // 文档内容
        .document(esTopicCollectModel)
        .build();

    BulkOperation bulkOperation = new BulkOperation.Builder()
        .index(indexOperation)
        .build();

    bulkIngester.add(bulkOperation);
```java

标签:同步,批量,数据,request,param,executionId,Elasticsearch,BulkProcessor,优化
From: https://www.cnblogs.com/sxxs/p/18022972

相关文章

  • BOSHIDA DC电源模块在太阳能系统中的应用及优化
    BOSHIDADC电源模块在太阳能系统中的应用及优化BOSHIDADC电源模块在太阳能系统中有广泛的应用,主要用于转换太阳能电池板产生的直流电能为可用的电源。太阳能系统的优化主要集中在提高转换效率、稳定输出电压和延长电源模块的寿命等方面。 在太阳能系统中,DC电源模块通常用于......
  • 15期中同步测试卷a卷
    2:二元可微概念判断回顾12:原来f(xy)对x求导和对y求导f()的形式都是一样的,不区分x一撇和y一撇13:方向导数为各分量xyz求导再乘以对应方向余弦;定义法之间把z变成z+▲z,不用关心z是什么,代完之后再把z具体的值带进去(正负号弄反了)14:极坐标,圆的切线(不是四分之pai,是二分之pai)15:三角函数碰上......
  • 【DBSyncer】用于数据同步的工具尝试
    1 前言DBSyncer,不知道大家用没用过,我之前看过,今儿空了,来试试。地址:https://gitee.com/ghi/dbsyncer地址如上,主要是用于数据库层面的同步,废话不多说,我简单玩了玩先,还真不错,空了我再细看看。2 数据同步例子2.1 定义目标这是我两个数据库的两张表,我们来看看如何用这个工......
  • 常规代码性能优化的总结
    今天同事发开中遇到了一个代码性能优化的问题,原本需求是:从一个数据库中查询某个表数据,存放到datatable中,然后遍历datatable,看这些数据在另一个数据库的表中是否存在,存在的话就要更新,不存在就要插入。就这个需求本身来说很简单,但是随着数据量的增大,之前通过循环遍历的方......
  • 关于代码性能优化的总结
    今天同事发开中遇到了一个代码性能优化的问题,原本需求是:从一个数据库中查询某个表数据,存放到datatable中,然后遍历datatable,看这些数据在另一个数据库的表中是否存在,存在的话就要更新,不存在就要插入。就这个需求本身来说很简单,但是随着数据量的增大,之前通过循环遍历的方式......
  • Elasticsearch
    1,Elasticsearch简介1)分布式实时文件存储,可以将每一个字段都编入索引,使其可以被检索2)可以作为一个大型分布式集群(数百台服务器)技术,处理PB级数据3)Elasticsearch不是什么新技术,主要是将全文检索、数据分析以及分布式技术,合并在了一起,才形成了独一无二的ES2,基本概念1)Node(节点):Ela......
  • CentOS上如何配置手动和定时任务自动进行时间同步
    场景Linux(Centos)上使用crontab实现定时任务(定时执行脚本):https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/134576630Winserver上如何配置和开启NTP客户端进行时间同步:https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/135220767在Centos上如何进行......
  • Unity UGUI的DrawCall优化
    UnityUGUI是一种强大的用户界面设计工具,它可以帮助开发者快速创建各种界面元素,从按钮和文本到滑块和面板等。然而,在使用UGUI时,一个常见的性能瓶颈就是DrawCall过多导致的性能下降。在本文中,我们将深入探讨UGUI的DrawCall优化方法,并给出对应的代码实现。什么是DrawCall?在Unity......
  • 学习笔记#5:单调队列优化&斜率优化
    学习笔记#5:单调队列优化&斜率优化单调队列首先要搞懂什么是单调队列。单调队列是一种求区间最值问题的一种方式,与其他RSQ问题的求解方法不同的是,它更善于解决滑动窗口式的RSQ问题,一般来说,假设我们要维护最大值,则需维护一个单调递减的队列,这样队首最大,每次取队首即可。而当......
  • 【ElasticSearch】入门-ES的选主流程
    一、ES集群模式ES使用主从模式,因为ES的典型场景中的另一个简化是集群中没有那么多节点。通常节点数量远远小于单个节点能够维护的连接数,并且网络环境并不需要经常处理节点的加入和离开。1、选举算法ES中主要使用Bully算法作为选举算法(优点是易于实现)Bully算法:假定所有的节......