Elasticsearch's Scroll API is designed to efficiently retrieve large result sets in batches, for scenarios where the data volume is too large to fetch in a single standard search request. The scroll mechanism works much like a database cursor: after an initial search request, it maintains a persistent context so that all matching documents can be fetched batch by batch instead of all at once. Scrolling is not intended for real-time user requests; it is meant for processing large amounts of data, for example reindexing the contents of a data stream or index into a new one with a different configuration.
The paragraph above is drawn from AI output and a translation of the ES documentation. In summary: scroll is used for deep pagination, and a scroll search cannot see data modified after it starts; it is not real-time.
This article walks step by step through how scroll implements deep pagination.
The steps for using scroll are as follows:
```
# "book" is a sample index.
# The initial search specifies a scroll keep-alive, a sort field, and a page size;
# the response returns a _scroll_id.
GET book/_search?scroll=5m
{
  "query": { "match_all": {} },
  "sort": [{ "created_at": { "order": "desc" } }],
  "size": 2
}

# Fetch the next page by passing the scroll_id
GET _search/scroll
{
  "scroll": "5m",
  "scroll_id": "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFk9FYzdMc25wUkZhTDJlTkdTQ1RjaVEAAAAAAAAAZBZXQjRoME0xU1Q3SzdkcUltVTJxOGFB"
}

# Delete the scroll when done
DELETE _search/scroll
{
  "scroll_id": ["FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFlRHT2Z5VEl6VG1ldmY1ajJBR3lsQVEAAAAAAAAALBZXQjRoME0xU1Q3SzdkcUltVTJxOGFB"]
}
```
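A client typically drives these three requests as a loop: issue the initial search, keep calling `_search/scroll` with the returned `_scroll_id` until a page comes back empty, then clear the scroll. The sketch below models that loop in Java; the `ScrollTransport` interface and `Page` record are hypothetical stand-ins for the real HTTP calls, not an actual ES client API.

```java
import java.util.ArrayList;
import java.util.List;

public class ScrollLoop {
    /** One page of results: the scroll id to continue with, plus this page's hits. */
    public record Page(String scrollId, List<String> hits) {}

    /** Hypothetical transport; in reality these would be HTTP requests to ES. */
    public interface ScrollTransport {
        Page initialSearch(String index, String keepAlive, int size);
        Page nextPage(String scrollId, String keepAlive);
        void clearScroll(String scrollId);
    }

    /** Drain all hits by scrolling until an empty page comes back. */
    public static List<String> scrollAll(ScrollTransport transport, String index, String keepAlive, int size) {
        List<String> all = new ArrayList<>();
        Page page = transport.initialSearch(index, keepAlive, size);
        try {
            while (!page.hits().isEmpty()) {
                all.addAll(page.hits());
                page = transport.nextPage(page.scrollId(), keepAlive);
            }
        } finally {
            // Free the server-side context as soon as we are done (or on failure),
            // instead of waiting for the keep-alive to expire.
            transport.clearScroll(page.scrollId());
        }
        return all;
    }
}
```

Clearing the scroll eagerly matters because, as shown later, the server pins resources for each open scroll context until it expires.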
So how is scroll implemented, and why does the data view stay unchanged for the duration of the query?
When we issue a search request, ES creates a ReaderContext whose type depends on whether the request carries a scroll parameter: with scroll it creates a LegacyReaderContext; a regular search gets a plain ReaderContext.
```java
// org.elasticsearch.search.SearchService#createAndPutReaderContext
final ReaderContext createAndPutReaderContext(
    ShardSearchRequest request,
    IndexService indexService,
    IndexShard shard,
    Engine.SearcherSupplier reader,
    long keepAliveInMillis
) {
    ReaderContext readerContext = null;
    Releasable decreaseScrollContexts = null;
    try {
        if (request.scroll() != null) {
            decreaseScrollContexts = openScrollContexts::decrementAndGet;
            if (openScrollContexts.incrementAndGet() > maxOpenScrollContext) {
                throw new ElasticsearchException(
                    "Trying to create too many scroll contexts. Must be less than or equal to: ["
                        + maxOpenScrollContext
                        + "]. "
                        + "This limit can be set by changing the ["
                        + MAX_OPEN_SCROLL_CONTEXT.getKey()
                        + "] setting."
                );
            }
        }
        final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
        if (request.scroll() != null) {
            readerContext = new LegacyReaderContext(id, indexService, shard, reader, request, keepAliveInMillis);
            if (request.scroll() != null) {
                readerContext.addOnClose(decreaseScrollContexts);
                decreaseScrollContexts = null;
            }
        } else {
            readerContext = new ReaderContext(id, indexService, shard, reader, keepAliveInMillis, true);
        }
        reader = null;
        final ReaderContext finalReaderContext = readerContext;
        final SearchOperationListener searchOperationListener = shard.getSearchOperationListener();
        searchOperationListener.onNewReaderContext(finalReaderContext);
        if (finalReaderContext.scrollContext() != null) {
            searchOperationListener.onNewScrollContext(finalReaderContext);
        }
        readerContext.addOnClose(() -> {
            try {
                if (finalReaderContext.scrollContext() != null) {
                    searchOperationListener.onFreeScrollContext(finalReaderContext);
                }
            } finally {
                searchOperationListener.onFreeReaderContext(finalReaderContext);
            }
        });
        putReaderContext(finalReaderContext);
        readerContext = null;
        return finalReaderContext;
    } finally {
        Releasables.close(reader, readerContext, decreaseScrollContexts);
    }
}
```
So what distinguishes LegacyReaderContext from ReaderContext? LegacyReaderContext is a subclass of ReaderContext that overrides the acquireSearcher method to reuse a single Searcher object, and with it the underlying Lucene DirectoryReader. This is the key step that keeps the scroll search's data view unchanged!
```java
public class ReaderContext implements Releasable {
    private final ShardSearchContextId id;
    private final IndexService indexService;
    private final IndexShard indexShard;
    protected final Engine.SearcherSupplier searcherSupplier;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final boolean singleSession;
    private final AtomicLong keepAlive;
    private final AtomicLong lastAccessTime;
    // For reference why we use RefCounted here see https://github.com/elastic/elasticsearch/pull/20095.
    private final AbstractRefCounted refCounted;
    private final List<Releasable> onCloses = new CopyOnWriteArrayList<>();
    private final long startTimeInNano = System.nanoTime();
    private Map<String, Object> context;

    public boolean singleSession() {
        return singleSession;
    }

    public Engine.Searcher acquireSearcher(String source) {
        return searcherSupplier.acquireSearcher(source);
    }
}

public class LegacyReaderContext extends ReaderContext {
    private final ShardSearchRequest shardSearchRequest;
    private final ScrollContext scrollContext;
    private final Engine.Searcher searcher;
    private AggregatedDfs aggregatedDfs;
    private RescoreDocIds rescoreDocIds;

    public LegacyReaderContext(
        ShardSearchContextId id,
        IndexService indexService,
        IndexShard indexShard,
        Engine.SearcherSupplier reader,
        ShardSearchRequest shardSearchRequest,
        long keepAliveInMillis
    ) {
        super(id, indexService, indexShard, reader, keepAliveInMillis, false);
        assert shardSearchRequest.readerId() == null;
        assert shardSearchRequest.keepAlive() == null;
        assert id.getSearcherId() == null : "Legacy reader context must not have searcher id";
        this.shardSearchRequest = Objects.requireNonNull(shardSearchRequest, "ShardSearchRequest must be provided");
        if (shardSearchRequest.scroll() != null) {
            // Search scroll requests are special, they don't hold indices names so we have
            // to reuse the searcher created on the request that initialized the scroll.
            // This ensures that we wrap the searcher's reader with the user's permissions
            // when they are available.
            final Engine.Searcher delegate = searcherSupplier.acquireSearcher("search");
            addOnClose(delegate);
            // wrap the searcher so that closing is a noop, the actual closing happens when this context is closed
            this.searcher = new Engine.Searcher(
                delegate.source(),
                delegate.getDirectoryReader(),
                delegate.getSimilarity(),
                delegate.getQueryCache(),
                delegate.getQueryCachingPolicy(),
                () -> {}
            );
            this.scrollContext = new ScrollContext();
        } else {
            this.scrollContext = null;
            this.searcher = null;
        }
    }

    @Override
    public Engine.Searcher acquireSearcher(String source) {
        if (scrollContext != null) {
            assert Engine.SEARCH_SOURCE.equals(source) : "scroll context should not acquire searcher for " + source;
            return searcher;
        }
        return super.acquireSearcher(source);
    }

    @Override
    public boolean singleSession() {
        return scrollContext == null || scrollContext.scroll == null;
    }
}
```
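The essential difference boils down to a tiny model: the base context hands out a fresh searcher on every call, while the scroll variant acquires one searcher up front and returns that same instance every time. The classes below are simplified stand-ins for illustration, not the real ES types.

```java
import java.util.function.Supplier;

public class SearcherReuse {
    /** Stand-in for Engine.Searcher. */
    public static class Searcher {}

    /** Base context: a new searcher per call (like ReaderContext). */
    public static class Context {
        protected final Supplier<Searcher> supplier;
        public Context(Supplier<Searcher> supplier) { this.supplier = supplier; }
        public Searcher acquireSearcher() { return supplier.get(); }
    }

    /** Scroll context: acquire once, then return the same instance for every call
     *  (like LegacyReaderContext's override of acquireSearcher). */
    public static class ScrollingContext extends Context {
        private final Searcher searcher;
        public ScrollingContext(Supplier<Searcher> supplier) {
            super(supplier);
            this.searcher = supplier.get(); // pinned for the lifetime of the scroll
        }
        @Override
        public Searcher acquireSearcher() { return searcher; }
    }
}
```

Because the scroll variant never goes back to the supplier, every page of the scroll runs against the exact same searcher instance.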
After a ReaderContext is created, it is stored in activeReaders. When ES receives a scroll search, it retrieves the ReaderContext from activeReaders and reuses that context's Engine.Searcher to execute the query.
```java
private final Map<Long, ReaderContext> activeReaders = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();

// org.elasticsearch.search.SearchService#putReaderContext
protected void putReaderContext(ReaderContext context) {
    final ReaderContext previous = activeReaders.put(context.id().getId(), context);
    assert previous == null;
    // ensure that if we race against afterIndexRemoved, we remove the context from the active list.
    // this is important to ensure store can be cleaned up, in particular if the search is a scroll with a long timeout.
    final Index index = context.indexShard().shardId().getIndex();
    if (indicesService.hasIndex(index) == false) {
        removeReaderContext(context.id().getId());
        throw new IndexNotFoundException(index);
    }
}

// org.elasticsearch.search.SearchService#findReaderContext
private ReaderContext findReaderContext(ShardSearchContextId id, TransportRequest request) throws SearchContextMissingException {
    if (id.getSessionId().isEmpty()) {
        throw new IllegalArgumentException("Session id must be specified");
    }
    if (sessionId.equals(id.getSessionId()) == false) {
        throw new SearchContextMissingException(id);
    }
    final ReaderContext reader = activeReaders.get(id.getId());
    if (reader == null) {
        throw new SearchContextMissingException(id);
    }
    try {
        reader.validate(request);
    } catch (Exception exc) {
        processFailure(reader, exc);
        throw exc;
    }
    return reader;
}
```
Note also that the ReaderContext for a regular search has singleSession = true, while for a scroll search it is false. This flag drives the cleanup logic: as the code below shows, when singleSession is true the ReaderContext is removed from the map as soon as it has been used.
```java
// org.elasticsearch.search.SearchService#executeFetchPhase(org.elasticsearch.search.internal.ReaderContext,
//     org.elasticsearch.search.internal.SearchContext, long)
private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
    try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)) {
        shortcutDocIdsToLoad(context);
        fetchPhase.execute(context);
        if (reader.singleSession()) {
            freeReaderContext(reader.id());
        }
        executor.success();
    }
    return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
}

// org.elasticsearch.search.SearchService#freeReaderContext
public boolean freeReaderContext(ShardSearchContextId contextId) {
    if (sessionId.equals(contextId.getSessionId())) {
        try (ReaderContext context = removeReaderContext(contextId.getId())) {
            return context != null;
        }
    }
    return false;
}

// org.elasticsearch.search.SearchService#removeReaderContext
protected ReaderContext removeReaderContext(long id) {
    return activeReaders.remove(id);
}
```
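The lifecycle around activeReaders can be sketched with a toy service: contexts are registered in a map; a fetch looks the context up, and if it is single-session it is removed right after use, while a scroll context stays registered until it is explicitly freed or expires. All names here are illustrative, not the real SearchService API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class ToySearchService {
    public static class Context {
        final long id;
        final boolean singleSession;
        Context(long id, boolean singleSession) { this.id = id; this.singleSession = singleSession; }
    }

    private final Map<Long, Context> activeReaders = new ConcurrentHashMap<>();
    private final AtomicLong idGenerator = new AtomicLong();

    /** Register a context; scroll searches pass singleSession = false. */
    public long openContext(boolean singleSession) {
        long id = idGenerator.incrementAndGet();
        activeReaders.put(id, new Context(id, singleSession));
        return id;
    }

    /** Simulate a fetch phase: use the context, then free it only if it is single-session. */
    public boolean fetch(long id) {
        Context ctx = activeReaders.get(id);
        if (ctx == null) return false; // analogous to SearchContextMissingException
        if (ctx.singleSession) activeReaders.remove(id);
        return true;
    }

    public boolean isActive(long id) { return activeReaders.containsKey(id); }
}
```

A regular search's context disappears after one fetch; a scroll context survives and can serve the next page.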
A scroll search, by contrast, keeps fetching its ReaderContext from activeReaders and reusing it until the context expires.
```java
// org.elasticsearch.search.SearchService#executeFetchPhase(org.elasticsearch.search.internal.InternalScrollSearchRequest,
//     org.elasticsearch.action.search.SearchShardTask,
//     org.elasticsearch.action.ActionListener<org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult>)
public void executeFetchPhase(
    InternalScrollSearchRequest request,
    SearchShardTask task,
    ActionListener<ScrollQueryFetchSearchResult> listener
) {
    final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId(), request);
    final Releasable markAsUsed;
    try {
        markAsUsed = readerContext.markAsUsed(getScrollKeepAlive(request.scroll()));
    } catch (Exception e) {
        // We need to release the reader context of the scroll when we hit any exception (here the keep_alive can be too large)
        freeReaderContext(readerContext.id());
        throw e;
    }
    runAsync(getExecutor(readerContext.indexShard()), () -> {
        final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(null);
        try (
            SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false);
            SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)
        ) {
            searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null));
            searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null));
            processScroll(request, readerContext, searchContext);
            QueryPhase.execute(searchContext);
            final long afterQueryTime = executor.success();
            QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime);
            return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget());
        } catch (Exception e) {
            assert TransportActions.isShardNotAvailableException(e) == false : new AssertionError(e);
            logger.trace("Fetch phase failed", e);
            // we handle the failure in the failure listener below
            throw e;
        }
    }, wrapFailureListener(listener, readerContext, markAsUsed));
}
```
All of this groundwork builds up to one chain: LegacyReaderContext -> Engine.Searcher -> IndexReader. A scroll query reuses the LegacyReaderContext, and therefore reuses the IndexReader. As is well known, ES search is not real-time; fundamentally this is because Lucene's IndexReader (DirectoryReader) is not real-time: it must be reopened before newly modified data becomes visible. And that property is exactly what guarantees the consistency of the scroll data view.
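The point-in-time behavior of a DirectoryReader can be imitated with a toy "engine" whose reader captures an immutable snapshot of the documents at open time; writes made afterwards only become visible after reopening. This is a model of the semantics only, not Lucene itself.

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotEngine {
    private final List<String> docs = new ArrayList<>();

    /** Analogous to a DirectoryReader: a fixed view of the index as of open time. */
    public static class Reader {
        private final List<String> snapshot;
        Reader(List<String> snapshot) { this.snapshot = snapshot; }
        public List<String> search() { return snapshot; }
    }

    public synchronized void index(String doc) { docs.add(doc); }

    /** "Opening" a reader copies the current state; later writes are invisible to it. */
    public synchronized Reader openReader() { return new Reader(List.copyOf(docs)); }
}
```

Because scroll pins one reader for its entire lifetime, every page sees the same snapshot, while a fresh search (a new reader) sees the latest data.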
So where does the pagination cursor come in? After each scroll page executes, ES saves the last document of the current page in the scroll context. The next scroll request then continues the query starting after that last document.
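Paging off the last document can be sketched as a cursor over a sorted list: each page starts strictly after the previously returned last sort value, which is conceptually how the scroll's internal last-document cursor works (and how `search_after` works at the API level). The helper below is illustrative and assumes a descending sort over distinct values.

```java
import java.util.List;

public class CursorPaging {
    /** Return up to `size` values sorting strictly after `lastSortValue`
     *  in a descending-sorted list (null = start from the beginning). */
    public static List<Integer> nextPage(List<Integer> sortedDesc, Integer lastSortValue, int size) {
        int from = 0;
        if (lastSortValue != null) {
            // skip everything up to and including the cursor value (descending order)
            while (from < sortedDesc.size() && sortedDesc.get(from) >= lastSortValue) from++;
        }
        return sortedDesc.subList(from, Math.min(from + size, sortedDesc.size()));
    }
}
```

Each page's last value becomes the cursor for the next request, so no page ever needs to re-score or skip over all preceding documents the way `from`/`size` deep pagination does.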
From: https://www.cnblogs.com/allenwas3/p/18237401