首页 > 其他分享 >讲清楚 Elasticsearch scroll 的底层原理

讲清楚 Elasticsearch scroll 的底层原理

时间:2024-06-07 17:27:05浏览次数:18  
标签:讲清楚 search ReaderContext final Elasticsearch id null scroll

Elasticsearch 的 Scroll 主要用于高效地分批检索大量数据记录,适用于那些数据量过大而不能一次性通过标准搜索请求获取所有结果的场景。Scroll 机制的工作原理类似于数据库中的游标(cursor),它允许用户发起一次搜索请求后,通过维护一个持续的上下文(context)来分批次获取所有匹配的文档,而不是一次性全部返回。滚动操作并不适用于实时的用户请求,而是旨在处理大量数据,例如为了根据不同的配置将一个数据流或索引中的内容重新索引到新的数据流或索引中。

上面一段话来自于 AI 和 ES 文档翻译,总结下来即:scroll 用于深分页;scroll search 时无法查询到后续修改的数据,非实时。

本文一步一步阐述 scroll 是如何实现深分页查询的。

scroll 使用步骤如下:

# book 是示例索引
# search 时指定 scroll 时间参数,排序字段,页大小,响应会返回一个 _scroll_id,
GET book/_search?scroll=5m
{
    "query": {
        "match_all": {}
    },
    "sort": [{
        "created_at": {
            "order": "desc"
        }
    }],
    "size": 2
}

# 设置 scroll_id 
GET _search/scroll
{
  "scroll": "5m",
  "scroll_id": "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFk9FYzdMc25wUkZhTDJlTkdTQ1RjaVEAAAAAAAAAZBZXQjRoME0xU1Q3SzdkcUltVTJxOGFB"
}

# 删除 scroll
DELETE _search/scroll
{
  "scroll_id": ["FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFlRHT2Z5VEl6VG1ldmY1ajJBR3lsQVEAAAAAAAAALBZXQjRoME0xU1Q3SzdkcUltVTJxOGFB"]
}

那么 scroll 是怎样实现的呢,为什么在查询过程中,数据视图会保持不变?

当我们发起 search 请求时,根据是否携带 scroll 参数创建 ReaderContext,如果携带了 scroll 参数,则创建 LegacyReaderContext,普通 search,则创建 ReaderContext。

// org.elasticsearch.search.SearchService#createAndPutReaderContext
final ReaderContext createAndPutReaderContext(
    ShardSearchRequest request,
    IndexService indexService,
    IndexShard shard,
    Engine.SearcherSupplier reader,
    long keepAliveInMillis
) {
    ReaderContext readerContext = null;
    Releasable decreaseScrollContexts = null;
    try {
        if (request.scroll() != null) {
            decreaseScrollContexts = openScrollContexts::decrementAndGet;
            if (openScrollContexts.incrementAndGet() > maxOpenScrollContext) {
                throw new ElasticsearchException(
                    "Trying to create too many scroll contexts. Must be less than or equal to: ["
                        + maxOpenScrollContext
                        + "]. "
                        + "This limit can be set by changing the ["
                        + MAX_OPEN_SCROLL_CONTEXT.getKey()
                        + "] setting."
                );
            }
        }
        final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
        if (request.scroll() != null) {
            readerContext = new LegacyReaderContext(id, indexService, shard, reader, request, keepAliveInMillis);
            if (request.scroll() != null) {
                readerContext.addOnClose(decreaseScrollContexts);
                decreaseScrollContexts = null;
            }
        } else {
            readerContext = new ReaderContext(id, indexService, shard, reader, keepAliveInMillis, true);
        }
        reader = null;
        final ReaderContext finalReaderContext = readerContext;
        final SearchOperationListener searchOperationListener = shard.getSearchOperationListener();
        searchOperationListener.onNewReaderContext(finalReaderContext);
        if (finalReaderContext.scrollContext() != null) {
            searchOperationListener.onNewScrollContext(finalReaderContext);
        }
        readerContext.addOnClose(() -> {
            try {
                if (finalReaderContext.scrollContext() != null) {
                    searchOperationListener.onFreeScrollContext(finalReaderContext);
                }
            } finally {
                searchOperationListener.onFreeReaderContext(finalReaderContext);
            }
        });
        putReaderContext(finalReaderContext);
        readerContext = null;
        return finalReaderContext;
    } finally {
        Releasables.close(reader, readerContext, decreaseScrollContexts);
    }
}

那么 LegacyReaderContext 和 ReaderContext 的区别在哪呢?LegacyReaderContext 是 ReaderContext  的子类,它重写了 acquireSearcher 方法,复用了 Searcher 对象和底层的 lucene DirectoryReader 对象,而这是保证 scroll search 数据视图不变的关键一步!

public class ReaderContext implements Releasable {
    private final ShardSearchContextId id;
    private final IndexService indexService;
    private final IndexShard indexShard;
    protected final Engine.SearcherSupplier searcherSupplier;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final boolean singleSession;

    private final AtomicLong keepAlive;
    private final AtomicLong lastAccessTime;
    // For reference why we use RefCounted here see https://github.com/elastic/elasticsearch/pull/20095.
    private final AbstractRefCounted refCounted;

    private final List<Releasable> onCloses = new CopyOnWriteArrayList<>();

    private final long startTimeInNano = System.nanoTime();

    private Map<String, Object> context;

    public boolean singleSession() {
        return singleSession;
    }

    public Engine.Searcher acquireSearcher(String source) {
        return searcherSupplier.acquireSearcher(source);
    }

}


public class LegacyReaderContext extends ReaderContext {
    private final ShardSearchRequest shardSearchRequest;
    private final ScrollContext scrollContext;
    private final Engine.Searcher searcher;

    private AggregatedDfs aggregatedDfs;
    private RescoreDocIds rescoreDocIds;

    public LegacyReaderContext(
        ShardSearchContextId id,
        IndexService indexService,
        IndexShard indexShard,
        Engine.SearcherSupplier reader,
        ShardSearchRequest shardSearchRequest,
        long keepAliveInMillis
    ) {
        super(id, indexService, indexShard, reader, keepAliveInMillis, false);
        assert shardSearchRequest.readerId() == null;
        assert shardSearchRequest.keepAlive() == null;
        assert id.getSearcherId() == null : "Legacy reader context must not have searcher id";
        this.shardSearchRequest = Objects.requireNonNull(shardSearchRequest, "ShardSearchRequest must be provided");
        if (shardSearchRequest.scroll() != null) {
            // Search scroll requests are special, they don't hold indices names so we have
            // to reuse the searcher created on the request that initialized the scroll.
            // This ensures that we wrap the searcher's reader with the user's permissions
            // when they are available.
            final Engine.Searcher delegate = searcherSupplier.acquireSearcher("search");
            addOnClose(delegate);
            // wrap the searcher so that closing is a noop, the actual closing happens when this context is closed
            this.searcher = new Engine.Searcher(
                delegate.source(),
                delegate.getDirectoryReader(),
                delegate.getSimilarity(),
                delegate.getQueryCache(),
                delegate.getQueryCachingPolicy(),
                () -> {}
            );
            this.scrollContext = new ScrollContext();
        } else {
            this.scrollContext = null;
            this.searcher = null;
        }
    }

    @Override
    public Engine.Searcher acquireSearcher(String source) {
        if (scrollContext != null) {
            assert Engine.SEARCH_SOURCE.equals(source) : "scroll context should not acquire searcher for " + source;
            return searcher;
        }
        return super.acquireSearcher(source);
    }

    @Override
    public boolean singleSession() {
        return scrollContext == null || scrollContext.scroll == null;
    }
}

创建 ReaderContext 后,会保存在 activeReaders 中,当 ES 收到 scroll search 时,则从 activeReaders 中取出 ReaderContext ,执行查询时复用 ReaderContext 中的 Engine.Searcher。

private final Map<Long, ReaderContext> activeReaders = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

// org.elasticsearch.search.SearchService#putReaderContext
protected void putReaderContext(ReaderContext context) {
    final ReaderContext previous = activeReaders.put(context.id().getId(), context);
    assert previous == null;
    // ensure that if we race against afterIndexRemoved, we remove the context from the active list.
    // this is important to ensure store can be cleaned up, in particular if the search is a scroll with a long timeout.
    final Index index = context.indexShard().shardId().getIndex();
    if (indicesService.hasIndex(index) == false) {
        removeReaderContext(context.id().getId());
        throw new IndexNotFoundException(index);
    }
}

// org.elasticsearch.search.SearchService#findReaderContext
private ReaderContext findReaderContext(ShardSearchContextId id, TransportRequest request) throws SearchContextMissingException {
    if (id.getSessionId().isEmpty()) {
        throw new IllegalArgumentException("Session id must be specified");
    }
    if (sessionId.equals(id.getSessionId()) == false) {
        throw new SearchContextMissingException(id);
    }
    final ReaderContext reader = activeReaders.get(id.getId());
    if (reader == null) {
        throw new SearchContextMissingException(id);
    }
    try {
        reader.validate(request);
    } catch (Exception exc) {
        processFailure(reader, exc);
        throw exc;
    }
    return reader;
}

另外普通 search 所对应的 ReaderContext 对象,singleSession = true,而 scroll search 则是 false,这影响着 ReaderContext 的清除逻辑,从下面代码中可以看到,singleSession 为 true,ReaderContext 用后即从 map 中删除了。

// org.elasticsearch.search.SearchService#executeFetchPhase(org.elasticsearch.search.internal.ReaderContext, org.elasticsearch.search.internal.SearchContext, long)
private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
    try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)) {
        shortcutDocIdsToLoad(context);
        fetchPhase.execute(context);
        if (reader.singleSession()) {
            freeReaderContext(reader.id());
        }
        executor.success();
    }
    return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}

// org.elasticsearch.search.SearchService#freeReaderContext
    public boolean freeReaderContext(ShardSearchContextId contextId) {
    if (sessionId.equals(contextId.getSessionId())) {
        try (ReaderContext context = removeReaderContext(contextId.getId())) {
            return context != null;
        }
    }
    return false;
}

// org.elasticsearch.search.SearchService#removeReaderContext
protected ReaderContext removeReaderContext(long id) {
    return activeReaders.remove(id);
}

而 scroll search 会从 activeReaders 取出 ReaderContext 继续使用,直到过期。

//org.elasticsearch.search.SearchService#executeFetchPhase(org.elasticsearch.search.internal.InternalScrollSearchRequest, org.elasticsearch.action.search.SearchShardTask, org.elasticsearch.action.ActionListener<org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult>)
public void executeFetchPhase(
    InternalScrollSearchRequest request,
    SearchShardTask task,
    ActionListener<ScrollQueryFetchSearchResult> listener
) {
    final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
    final Releasable markAsUsed;
    try {
        markAsUsed = readerContext.markAsUsed(getScrollKeepAlive(request.scroll()));
    } catch (Exception e) {
        // We need to release the reader context of the scroll when we hit any exception (here the keep_alive can be too large)
        freeReaderContext(readerContext.id());
        throw e;
    }
    runAsync(getExecutor(readerContext.indexShard()), () -> {
        final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
        try (
            SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
            SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
        ) {
            searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
            searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
            processScroll(request, readerContext, searchContext);
            QueryPhase.execute(searchContext);
            final long afterQueryTime = executor.success();
            QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime);
            return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget());
        } catch (Exception e) {
            assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
            logger.trace("Fetch phase failed", e);
            // we handle the failure in the failure listener below
            throw e;
        }
    }, wrapFailureListener(listener, readerContext, markAsUsed));
}

铺垫这么多,引出一条链,LegacyReaderContext -> Engine.Searcher -> IndexReader,scroll 查询会复用 LegacyReaderContext,进而复用 IndexReader。而众所周知,ES 的搜索不是即时的,本质是 lucene 的 IndexReader(DirectoryReader)不是即时的,它需要重新 open 之后,才能看到新修改的数据。而这个特点正是 scroll 数据视图一致性的根本所在。

那么实现分页的游标又体现在哪里呢?scroll search 执行后,会保存当前页的最后一个文档。下次执行 scroll 时,基于这个最后文档进行查询。

 

标签:讲清楚,search,ReaderContext,final,Elasticsearch,id,null,scroll
From: https://www.cnblogs.com/allenwas3/p/18237401

相关文章

  • RainBond 制作应用并上架【以ElasticSearch为例】
    文章目录安装ElasticSearch集群第1步:添加组件第2步:查看组件第3步:访问组件制作ElasticSearch组件准备工作ElasticSearch集群原理尝试Helm安装ES集群RainBond制作ES思路源代码Dockerfiledocker-entrypoint.shelasticsearch.yml......
  • Elasticsearch强制重置未分配的分片(unassigned)
    强制重置未分片的分片,这个问题源自于Elasticsearch维护中,Node意外退出的场景。意外退出后Elasticsearch由于网络原因或者jvm性能压力,未能短时间内分配分片。看一下分片的状态。可以看到有一些分片处于未分配状态。代码语言:javascript复制curlhttp://10.93.21.2......
  • 分布式搜索引擎ElasticSearch学习笔记
    一、Elasticsearch介绍什么是elasticsearch?一个开源的分布式搜索引擎,可以用来实现搜索、日志统计、分析、系统监控等功能什么是elasticstack(ELK)?是以elasticsearch为核心的技术栈,包括beats、Logstash、kibana、elasticsearch什么是Lucene?是Apache的开源搜索引擎类库,提......
  • 控制台警告:[Violation] Added non-passive event listener to a scroll-blocking 'mou
    控制台警告:[Violation]Addednon-passiveeventlistenertoascroll-blocking'mousewheel'event.Considermarkingeventhandleras'passive'tomakethepagemoreresponsive.Seehttps://www.chromestatus.com/feature/5745543795965952[Viola......
  • WPF datagrid scrolldown and change the marked the location in canvas
    <Windowx:Class="WpfApp134.MainWindow"xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"xmlns:d="http://schemas.microsoft......
  • ElasticSearch性能原理拆解
    逐层拆分ElasticSearch的概念Cluster:集群,Es是一个可以横向扩展的检索引擎(部分时候当作存储数据库使用),一个Es集群由一个唯一的名字标识,默认为“elasticsearch”。在配置文件中指定相同的集群名,Es会将相同集群名的节点组成一个集群。Node:节点,集群中的任意一个实例对象,是一个节......
  • Unity ScrollRect中,拖拽移动Item的顺序
    *UnityScrollRect中,拖拽移动Item的顺序*目标需求制作方法完整代码最终效果备注unitypackage目标需求Unity在制作下拉菜单时,用户可通过拖拽Item,替换当前Item的位置。本教程使用UGUI制作方法创建脚本ScrollRectManager,用于管理整个ScrollRect,我是将脚本Scro......
  • 城市之旅:使用 LLM 和 Elasticsearch 简化地理空间搜索(二)
    我们在之前的文章“城市之旅:使用LLM和Elasticsearch简化地理空间搜索(一)”,在今天的练习中,我将使用本地部署来做那里面的Jupyternotebook。安装Elasticsearch及Kibana如果你还没有安装好自己的Elasticsearch及Kibana,请参考如下的链接来进行安装:如何在Linux,Mac......
  • Elasticsearch8.4安装及Java Api Client的使用
    目录简介一、ElasticSearch安装二、可视化界面(elasticserach-head)插件安装三、Kibana的安装四、ES核心概念五、IK分词器六、Rest风格说明:ES推荐使用的七、关于索引的操作1、PUT命令2、GET命令3、POST命令4、DELETE命令八、关于文档的操作九、整合SpringBoot,基于......
  • 面试专区|【69道Elasticsearch高频题整理(附答案背诵版)】
    简述什么是Elasticsearch?Elasticsearch是一个基于Lucene的搜索服务器,它提供了一个分布式、多用户能力的全文搜索引擎,基于RESTfulweb接口。Elasticsearch是用Java语言开发的,并作为Apache许可条款下的开放源码发布,是一种流行的企业级搜索引擎。它用于云计算中,能够达到实时......