首页 > 其他分享 >ES7.3版本,批量添加,索引创建,索引判断

ES7.3版本,批量添加,索引创建,索引判断

时间:2024-01-10 09:35:17浏览次数:33  
标签:ES7.3 return 批量 request 索引 elasticsearch import org logger

import com.link.risk.model.RiskTradeDetail;
import com.link.util.BeanBuilder;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 *
 * @className Es工具类
 * @author ysq
 * @date 2018-08-29
 */
public class EsUtils {

    protected static Logger logger = LoggerFactory.getLogger(EsUtils.class);

    private RestHighLevelClient restHighLevelClient;

    public EsUtils(RestHighLevelClient restHighLevelClient){
        this.restHighLevelClient = restHighLevelClient;
    }


    /**
     * 判断索引是否存在
     * @param restHighLevelClient
     * @param esIndex
     * @return true表示存在,false表示不存在
     */
    public Boolean checkIndexExists(String esIndex){
        try {
            return restHighLevelClient.indices().exists(new GetIndexRequest(esIndex), RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * ES 批量插入工具方法
     * @param restHighLevelClient
     * @return BulkProcessor
     */
    public BulkProcessor createBulkProcessor() {

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                logger.info("【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (!response.hasFailures()) {
                    logger.info("【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
                } else {
                    BulkItemResponse[] items = response.getItems();
                    for (BulkItemResponse item : items) {
                        if (item.isFailed()) {
                            logger.info("afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
                            break;
                        }
                    }
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,Throwable failure) {
                List<DocWriteRequest<?>> requests = request.requests();
                List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
                logger.error("【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);
            }
        };

        BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
            restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
        }), listener);
        //到达10000条时刷新
        builder.setBulkActions(10000);
        //内存到达8M时刷新
        builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
        //设置的刷新间隔10s
        builder.setFlushInterval(TimeValue.timeValueSeconds(10));
        //设置允许执行的并发请求数。
        builder.setConcurrentRequests(8);
        //设置重试策略
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
        return builder.build();
    }

    /**
     * 批量添加数据
     * @param list
     * @param esIndex
     * @param <T>
     * @return
     * @throws Exception
     */
    public <T> Integer instBulkList(List<T> list,String esIndex) throws Exception {

        //确认索引存在开始写入数据
        if(this.checkIndexExists(esIndex)){
            List<IndexRequest> indexRequests = new ArrayList<>();
            try {
                for (T t : list) {

                    Class<?> clazz = t.getClass();

                    Field nameField = clazz.getDeclaredField("id");
                    // 设置name字段可访问
                    nameField.setAccessible(true);
                    // 获取name字段的值
                    String idValue = (String) nameField.get(t);

                    if(StringUtils.isBlank(idValue)){
                        throw new Exception("写入文档的值必须指定id");
                    }

                    IndexRequest request = new IndexRequest();
                    Map<String,Object> map = com.link.util.BeanBuilder.beanToMap(t);
                    request.id(idValue); //文档唯一id
                    request.index(esIndex);
                    request.source(map);
                    indexRequests.add(request);
                }
                indexRequests.forEach(this.createBulkProcessor()::add);
            }catch (Exception e){
                logger.info("--------交易数据添加异常-------------");
                logger.info(e.getMessage());
                logger.debug("交易数据添加异常,错误:{}",e.getMessage());
                throw new Exception(e.getMessage());
            }
            return list.size();
        }
        return 0;
    }

    /**
     * 创建索引
     * @param esIndex 索引名称
     * @param properties 文档json
     * @return
     * @throws IOException
     */
    public boolean careteIndex(String esIndex,String properties) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(esIndex);

        //设置分片和副本数
        request.settings(Settings.builder()
                .put("index.number_of_shards", 5)
                .put("index.number_of_replicas", 1)
                .put("index.codec","best_compression"));

        request.mapping(properties, XContentType.JSON);

        CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);

        boolean acknowledged = response.isAcknowledged();
        boolean shardsAcknowledged = response.isShardsAcknowledged();

        if(acknowledged && shardsAcknowledged){
            logger.info("交易索引创建成功,acknowledged:{},shardsAcknowledged:{}",acknowledged,shardsAcknowledged);
            return true;
        }else{
            logger.info("交易索引创建失败,acknowledged:{},shardsAcknowledged:{}",acknowledged,shardsAcknowledged);
            return false;
        }
    }
}

 

标签:ES7.3,return,批量,request,索引,elasticsearch,import,org,logger
From: https://www.cnblogs.com/M87-A/p/17955796

相关文章

  • shell命令mac怎么批量重命名某个目录下的文件
    在Mac上批量重命名文件很简单,你可以使用“终端”应用程序或者Finder中的“批量重命名”功能。下面是两种方法:使用终端:打开终端应用程序。使用cd命令切换到目标目录,例如:cd/路径/到/你的/目标目录使用mv命令进行批量重命名,例如:forfilein*;domv"$file""新名称${file##*_}";do......
  • 速卖通跨境智星:解决IP及环境问题,实现批量注册轻松搞定
    如果想要注册大批量的速卖通买家号,关键问题之一就是IP及浏览环境的管理。为了确保每个账号都能独立运行,使用独立的IP是必不可少的。近期,速卖通跨境智星备受关注,支持绑定代理IP,并内置反指纹技术,为用户提供了解决IP及环境问题的便捷途径。首先,使用代理IP可以模拟国外的运行环境,使每个......
  • oracle复合索引怎么建立
    在Oracle中,可以使用以下语法来创建复合索引:CREATEINDEXindex_nameONtable_name(column1,column2,...);其中,index_name是你给索引起的名称,table_name是要在其上创建索引的表名,column1,column2,...是要包含在索引中的列名(按照你希望的顺序)。以下是一个示例,展示如何创建......
  • 文件批量拷贝的脚本(尤其适用于大小写敏感向不敏感的磁盘拷贝时发生冲突的情形)
    在Linux系统下,NTFS可以支持文件名大小写区分;但在MaxOS的exFAT格式中,则无法区分大小写。当从NTFS向exFAT拷贝文件时,当同一个目录下而在文件名相同但大小写不同的两个及以上文件时,向exFAT写入会中断,使得文件拷贝操作无法完成。因此,特别编写了下面这个脚本,用来解决这个问题。它可......
  • 批量爬取百度图片(异步+网络请求解析)
     4、分析百度图片搜索返回结果的HTML代码,或找一图片网站,编写爬虫抓取图片并下载形成专题图片。#########分析#########  #使用网络工具查看百度图片的组成,我们可以发现他的分类中的模块是  #通过一个a标签包揽的,这就表明,我们可以设置两层循环(由于此时下载的东西会......
  • MySQL中的索引:深入理解与案例解析
    引言在数据库中,索引是提高查询速度的关键。特别是在MySQL这样的关系型数据库中,索引的作用尤为重要。本文将深入探讨MySQL中的索引,通过案例解析帮助您更好地理解其工作原理和应用。一、索引的基本概念索引是什么?:简而言之,索引是数据库中用于快速查找数据的数据结构。它类似于书籍......
  • 数据库索引
    一、索引的基本概念数据库索引是一种数据结构,用于快速定位到表中的数据记录。通过创建索引,数据库系统可以快速找到需要的数据,避免全表扫描,从而大大提高查询速度。索引的创建和使用需要占用额外的存储空间,并会影响数据插入、更新和删除操作的性能。因此,索引的使用需要权衡利弊,根据实......
  • MyBatis批量插入数据优化
    背景介绍我们使用了mybatis-plus框架,并采用其中的saveBatch方法进行批量数据插入。然而,通过深入研究源码,我发现这个方法并没有如我期望的那样高效这是因为最终在执行的时候还是通过for循环一条条执行insert,然后再一批的进行flush,默认批的消息为1000为了找到更优秀的解决方案......
  • 【技术探讨】用户使用其他厂家433MHz无线模块时,购买样品OK,小批量100个就会出现偶尔无
    许多用户使用其他厂家的433M透传无线模块反馈这样的问题:前期购买几个样品测试,在无线信号覆盖半径内,收发包测试都很稳定,但是小批量购买100个模块收发就会出现无法收发的情况。这是什么原因呢?首先科普一下,无线电波,在同一个信道同一时刻只允许一个节点发射行为。433M的无线透传模块,没......
  • JdbcTemplate的基本使用-批量新增
    JdbcTemplate的基本配置参考我的上一篇文章:JdbcTemplate的基本使用-新增批量增加可以使用jdbcTemplate.batchUpdate()方法,示例如下:UserServiceImpl增加批量增加方法:packageservice;importdao.UserDao;importentity.User;importorg.springframework.beans.factory.annotat......