首页 > 其他分享 >Elasticsearch 近实时搜索的底层原理

Elasticsearch 近实时搜索的底层原理

时间:2024-06-17 17:35:26浏览次数:20  
标签:engine index return indexService refresh Elasticsearch 实时 false 底层

我们都知道 Elasticsearch 的搜索是近实时的,数据写入后,立即搜索(不通过 id)文档是搜不到的。这一切的原因要归于 lucene 所提供的 API,因为 lucene 的 API 就是非实时的,Elasticsearch 在 lucene 之上盖房子,通过一些增强,实现了查询的近实时和 id 查询的实时性。本文就来看看这个近实时的原理。

对应每个索引分片,ES 会创建一个定时任务,很显然 AsyncRefreshTask 是这个定时任务

 1 // org.elasticsearch.index.IndexService.AsyncRefreshTask
 2 static final class AsyncRefreshTask extends BaseAsyncTask {
 3 
 4     AsyncRefreshTask(IndexService indexService) {
 5         super(indexService, indexService.getIndexSettings().getRefreshInterval());
 6     }
 7 
 8     @Override
 9     protected void runInternal() {
10         indexService.maybeRefreshEngine(false);
11     }
12 
13     @Override
14     protected String getThreadPool() {
15         return ThreadPool.Names.REFRESH;
16     }
17 
18     @Override
19     public String toString() {
20         return "refresh";
21     }
22 }

那么这个定时任务的执行间隔是多少呢,是 1 秒钟

1 // org.elasticsearch.index.IndexSettings
2     public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
3     public static final Setting<TimeValue> INDEX_REFRESH_INTERVAL_SETTING = Setting.timeSetting(
4         "index.refresh_interval",
5         DEFAULT_REFRESH_INTERVAL,
6         new TimeValue(-1, TimeUnit.MILLISECONDS),
7         Property.Dynamic,
8         Property.IndexScope
9     );

定时任务的触发执行在哪呢,下面代码第 31 行,线程池放入任务

 1    abstract static class BaseAsyncTask extends AbstractAsyncTask {
 2 
 3         protected final IndexService indexService;
 4 
 5         BaseAsyncTask(final IndexService indexService, final TimeValue interval) {
 6             super(indexService.logger, indexService.threadPool, interval, true);
 7             this.indexService = indexService;
 8             rescheduleIfNecessary();
 9         }
10 
11         @Override
12         protected boolean mustReschedule() {
13             // don't re-schedule if the IndexService instance is closed or if the index is closed
14             return indexService.closed.get() == false
15                 && indexService.indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN;
16         }
17     }
18 
19     // org.elasticsearch.common.util.concurrent.AbstractAsyncTask#rescheduleIfNecessary
20     public synchronized void rescheduleIfNecessary() {
21         if (isClosed()) {
22             return;
23         }
24         if (cancellable != null) {
25             cancellable.cancel();
26         }
27         if (interval.millis() > 0 && mustReschedule()) {
28             if (logger.isTraceEnabled()) {
29                 logger.trace("scheduling {} every {}", toString(), interval);
30             }
31             cancellable = threadPool.schedule(this, interval, getThreadPool());
32             isScheduledOrRunning = true;
33         } else {
34             logger.trace("scheduled {} disabled", toString());
35             cancellable = null;
36             isScheduledOrRunning = false;
37         }
38     }

具体的执行,是否执行 refresh 需要满足一系列条件,这里着重看 getEngine().refreshNeeded()

// org.elasticsearch.index.IndexService#maybeRefreshEngine
private void maybeRefreshEngine(boolean force) {
    if (indexSettings.getRefreshInterval().millis() > 0 || force) {
        for (IndexShard shard : this.shards.values()) {
            try {
                shard.scheduledRefresh();
            } catch (IndexShardClosedException | AlreadyClosedException ex) {
                // fine - continue;
            }
        }
    }
}

// org.elasticsearch.index.shard.IndexShard#scheduledRefresh
/**
 * Executes a scheduled refresh if necessary.
 *
 * @return <code>true</code> iff the engine got refreshed otherwise <code>false</code>
 */
public boolean scheduledRefresh() {
    verifyNotClosed();
    boolean listenerNeedsRefresh = refreshListeners.refreshNeeded();
    if (isReadAllowed() && (listenerNeedsRefresh || getEngine().refreshNeeded())) {
        if (listenerNeedsRefresh == false // if we have a listener that is waiting for a refresh we need to force it
            && isSearchIdle()
            && indexSettings.isExplicitRefresh() == false
            && active.get()) { // it must be active otherwise we might not free up segment memory once the shard became inactive
            // lets skip this refresh since we are search idle and
            // don't necessarily need to refresh. the next searcher access will register a refreshListener and that will
            // cause the next schedule to refresh.
            final Engine engine = getEngine();
            engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
            setRefreshPending(engine);
            return false;
        } else {
            if (logger.isTraceEnabled()) {
                logger.trace("refresh with source [schedule]");
            }
            return getEngine().maybeRefresh("schedule");
        }
    }
    final Engine engine = getEngine();
    engine.maybePruneDeletes(); // try to prune the deletes in the engine if we accumulated some
    return false;
}

是否需要 refresh,最终调用的是 lucene 中 DirectoryReader 的 isCurrent() 方法,通过方法签名可以看出,当索引发生了新的变化后,该方法返回 true

 1 // org.elasticsearch.index.engine.Engine#refreshNeeded
 2     public boolean refreshNeeded() {
 3         if (store.tryIncRef()) {
 4             /*
 5               we need to inc the store here since we acquire a searcher and that might keep a file open on the
 6               store. this violates the assumption that all files are closed when
 7               the store is closed so we need to make sure we increment it here
 8              */
 9             try {
10                 try (Searcher searcher = acquireSearcher("refresh_needed", SearcherScope.EXTERNAL)) {
11                     return searcher.getDirectoryReader().isCurrent() == false;
12                 }
13             } catch (IOException e) {
14                 logger.error("failed to access searcher manager", e);
15                 failEngine("failed to access searcher manager", e);
16                 throw new EngineException(shardId, "failed to access searcher manager", e);
17             } finally {
18                 store.decRef();
19             }
20         }
21         return false;
22     }
23     
24     
25 
26 // org.apache.lucene.index.DirectoryReader#isCurrent
27 /**
28 **Check whether any new changes have occurred to the index since this reader was opened.
29   If this reader was created by calling open, then this method checks if any further commits (see IndexWriter.commit) have occurred in the directory.
30   If instead this reader is a near real-time reader (ie, obtained by a call to open(IndexWriter), or by calling openIfChanged on a near real-time reader), then this method checks if either a new commit has occurred, or any new uncommitted changes have taken place via the writer. Note that even if the writer has only performed merging, this method will still return false.
31   In any event, if this returns false, you should call openIfChanged to get a new reader that sees the changes.
32 **/
33 public abstract boolean isCurrent() throws IOException;

写惯了 CRUD 业务代码的我,看到 IndexService 想当然以为它管理着所有的索引,仔细阅读了下源码,ES 中一个索引对应一个 IndexService 实例,一个 Engine 实例。好,接下来我们看刷新操作到底做了什么,最终调用的是 lucene 中 DirectoryReader 的 openIfChanged 方法,调用该方法后,返回的新 reader 可以搜索到新文档。

// org.elasticsearch.index.engine.InternalEngine#maybeRefresh
@Override
public boolean maybeRefresh(String source) throws EngineException {
    return refresh(source, SearcherScope.EXTERNAL, false);
}

// org.elasticsearch.index.engine.InternalEngine#refresh
final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException {
    // both refresh types will result in an internal refresh but only the external will also
    // pass the new reader reference to the external reader manager.
    final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
    boolean refreshed;
    try {
        // refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
        if (store.tryIncRef()) {
            // increment the ref just to ensure nobody closes the store during a refresh
            try {
                // even though we maintain 2 managers we really do the heavy-lifting only once.
                // the second refresh will only do the extra work we have to do for warming caches etc.
                ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
                // it is intentional that we never refresh both internal / external together
                if (block) {
                    referenceManager.maybeRefreshBlocking();
                    refreshed = true;
                } else {
                    refreshed = referenceManager.maybeRefresh();
                }
            } finally {
                store.decRef();
            }
            if (refreshed) {
                lastRefreshedCheckpointListener.updateRefreshedCheckpoint(localCheckpointBeforeRefresh);
            }
        } else {
            refreshed = false;
        }
    } catch (AlreadyClosedException e) {
        failOnTragicEvent(e);
        throw e;
    } catch (Exception e) {
        try {
            failEngine("refresh failed source[" + source + "]", e);
        } catch (Exception inner) {
            e.addSuppressed(inner);
        }
        throw new RefreshFailedEngineException(shardId, e);
    }
    assert refreshed == false || lastRefreshedCheckpoint() >= localCheckpointBeforeRefresh
        : "refresh checkpoint was not advanced; "
            + "local_checkpoint="
            + localCheckpointBeforeRefresh
            + " refresh_checkpoint="
            + lastRefreshedCheckpoint();
    // TODO: maybe we should just put a scheduled job in threadPool?
    // We check for pruning in each delete request, but we also prune here e.g. in case a delete burst comes in and then no more deletes
    // for a long time:
    maybePruneDeletes();
    mergeScheduler.refreshConfig();
    return refreshed;
}

//org.elasticsearch.index.engine.ElasticsearchReaderManager#refreshIfNeeded
class ElasticsearchReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
    @Override
    protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryReader referenceToRefresh) throws IOException {
        return (ElasticsearchDirectoryReader) DirectoryReader.openIfChanged(referenceToRefresh);
    }
}

代码很长,结论很简单,ES 通过定时任务,定期对索引进行 refresh,将非实时的搜索增强为近实时。

标签:engine,index,return,indexService,refresh,Elasticsearch,实时,false,底层
From: https://www.cnblogs.com/allenwas3/p/18252857

相关文章

  • AOP切面的实现原理【底层源码】
    AOP是基于IOC的Bean加载来实现的,将切面类的所有切面方法根据使用的注解生成对应的Advice,并将Advice连同切入点匹配器和切面类等信息一并封装到Advisor,为后续交给代理增强实现做准备这里我们可以很明确的知道,AOP也是在Bean容器中被Spring管理的,根据初始化过程打断点定位......
  • AOP代理的创建【底层源码】
    代理的创建(源码)创建代理的方法是postProcessAfterInitialization:如果Bean被子类标识为代理,则使用配置的拦截器创建一个代理源码参考:AOP切面底层原理【底层源码】-postProcessAfterInitialization源码部分wrapIfNecessary方法主要用于判断是否需要创建代理,如果bean能......
  • 使用OpenCV进行实时性别和年龄识别
            在计算机视觉领域,使用深度学习技术进行实时性别和年龄识别是一项具有挑战性和实用性的任务。本文将深入解析一个使用OpenCV和预训练模型实现的实时性别和年龄识别代码,并逐行进行详细的注释解析,帮助读者理解代码的工作原理和实现细节。importcv2importnumpy......
  • 2024 最新谷歌邮箱 Gmail 账号注册完整指南 (多种方法 实时更新)
    Gmail是目前国内外是最常见、使用最广泛的邮箱,基本上持有谷歌邮箱的人可以”横行互联网“。针对很多人反映自己在注册谷歌账号时总是失败,本文整理了截止2024年6月亲测可用的所有注册方法,以图文结合的形式详细手把手带你注册Gmail新账户。 本文将包括:注册Gmail的主......
  • 1、docker-安装-阿里云镜像加速-docker工作流程和底层原理
    1、访问官网:https://docs.docker.com/get-docker/2、卸载旧版本:yumremovedocker\docker-client\docker-client-latest\docker-common\docker-latest\docker-latest-lo......
  • 5、docker-部署ES(elasticsearch)+kibana
    #es暴露的端口多#es十分消耗内存#es的数据一般需要放置到安全目录、挂载=========================================安装es=========================1、下载启动es(建议启动前把其它容器停止,不然会很卡)·dockerrun-d--nameelasticsearch-p9200:9200-p9300:9300......
  • 超级底层:10WQPS/PB级海量存储HBase/RocksDB,底层LSM结构是什么?
    文章很长,且持续更新,建议收藏起来,慢慢读!疯狂创客圈总目录博客园版为您奉上珍贵的学习资源:免费赠送:《尼恩Java面试宝典》持续更新+史上最全+面试必备2000页+面试必备+大厂必备+涨薪必备免费赠送:《尼恩技术圣经+高并发系列PDF》,帮你实现技术自由,完成职业升级,薪......
  • 微服务开发与实战Day09 - Elasticsearch
    一、DSL查询Elasticsearch提供了DSL(DomainSpecificLanguage)查询,就是以JSON格式来定义查询条件。类似这样:DSL查询可以分为两大类:叶子查询(Leafqueryclauses):一般是在特定的字段里查询特定值,属于简单查询,很少单独使用。复合查询(Compoundqueryclauses):以逻辑方式组合多个叶......
  • 双列集合 HashMap以及TreeMap底层原理
    双列集合 特点:    双列集合一次需要存一对数据,分别为键和值    键不能重复,值可以重复    键和值是一一对应的,每个键只能找到自己对应的值        键和值这个整体在Java中叫做“Entry对象”Map的常见API    Map是双列集合的顶......
  • C++双端队列deque源码的深度学习(stack,queue的默认底层容器)
    什么是deque?deque是C++标准模板库(STL)中的一个容器,代表“双端队列”(double-endedqueue)。deque支持在其前端(front)和后端(back)进行快速插入和删除操作,并且它在序列的中间插入和删除元素时通常比vector或list更高效。deque的特点双端插入和删除:你可以在deque的头部和尾部快速......