缓存和刷新是比较重要的问题,它涉及到lucene如何管理内存和磁盘。前面提到索引的结果是缓存在内存中的,等到一定时候才会将其刷新到硬盘上去。缓存在这里的目的无非是缓解高速设备到低速设备的不匹配。下面这些问题都比较重要:调用增删改索引后此时索引时已经写入磁盘还是仍然驻留内存,即索引的刷新时间是什么?其次,缓存会占用多少内存?另外,刷新的效率如何?最后,lucene允许多个线程并发刷新索引,具体实现是怎么做的?
flush的一个总入口是DocumentWriter中的doFlush,随后严格按照索引链层层向下传递,直到FreqProxTermWriter的flush方法,FreqProxTermWriter中最后会调用如下语句,将所有对索引文件的操作交给codec去做。
1. state.segmentInfo.getCodec().postingsFormat().fieldsConsumer(state).write(fields);
那么到底哪些地方会调用flush呢?实际上,下面这5个地方都会最终调用到flush,可以发现,这5个地方已经涵盖了对索引的大部分操作,可见flush的重要。
从源码上看,addIndexes, forceMerge, close和commit4个操作都是进行full flush,入口为DocumentWriter.flushAllThreads,他们会对所有active的DocumentsWriterPerThread进行flush;而updateDocument则只会flush一个DocumentsWriterPerThread。
在lucene中,flush是由DocumentsWriterFlushControl来统一控制的,好处就是统一入口,便于维护和管理。这个控制器用了很多方式来保持其扩展灵活性以及健壮性。
刷新时机策略
DocumentsWriterFlushControl中有一个FlushPolicy。我们每次更新文档都需要将缓存内容写入磁盘吗?如果每次索引的内容只有几十字节,那么频繁写入其实是不划算的,会带来极大的开销,也就失去使用缓存的意义了。FlushPolicy就是定义了这个刷新时机的策略,默认策略在FlushByRamOrCountsPolicy中定义。如下图,我们可以看出来,FlushByRamOrCountsPolicy中实际上有3种方式来控制缓存的刷新。
其中flushOnRAM会在删除或者插入时被应用,也就是内存达到设定值时候才允许刷新;flushOnDocCount则只在插入时生效,即文档数达到设定值生效;flushOnDeleteTerms在删除时的缓存大于设定值时生效;而这3个设定值分别对应indexWriterConfig中的RAMBufferSizeMB,MaxBufferedDocs, MaxBufferedDeleteTerms。也就是说,我们可以通过控制这几个值来控制刷新的方式。因而一旦设定了RAMBufferSizeMB这种刷新方式,则缓存占用的最大内存就是RAMBufferSizeMB
1. public void onDelete(DocumentsWriterFlushControl control, ThreadState state) {
2. if (flushOnDeleteTerms()) {
3. // Flush this state by num del terms
4. final int maxBufferedDeleteTerms = indexWriterConfig
5. .getMaxBufferedDeleteTerms();
6. if (control.getNumGlobalTermDeletes() >= maxBufferedDeleteTerms) {
7. control.setApplyAllDeletes();
8. }
9. }
10. if ((flushOnRAM() &&
11. 1024*1024*indexWriterConfig.getRAMBufferSizeMB()))) {
12. control.setApplyAllDeletes();
13. if (infoStream.isEnabled("FP")) {
14. "FP", "force apply deletes bytesUsed=" + control.getDeleteBytesUsed() + " vs ramBuffer=" + (1024*1024*indexWriterConfig.getRAMBufferSizeMB()));
15. }
16. }
17. }
18.
19. public void onInsert(DocumentsWriterFlushControl control, ThreadState state) {
20. if (flushOnDocCount()
21. && state.dwpt.getNumDocsInRAM() >= indexWriterConfig
22. .getMaxBufferedDocs()) {
23. // Flush this state by num docs
24. control.setFlushPending(state);
25. else if (flushOnRAM()) {// flush by RAM
26. final long limit = (long) (indexWriterConfig.getRAMBufferSizeMB() * 1024.d * 1024.d);
27. final long totalRam = control.activeBytes() + control.getDeleteBytesUsed();
28. if (totalRam >= limit) {
29. if (infoStream.isEnabled("FP")) {
30. "FP", "flush: activeBytes=" + control.activeBytes() + " deleteBytes=" + control.getDeleteBytesUsed() + " vs limit=" + limit);
31. }
32. markLargestWriterPending(control, state, totalRam);
33. }
34. }
35. }
健康管理
在多线程下,有可能出现索引线程的速度快于刷新线程的速度,积累下去很容易导致JVM内存耗尽。
DocumentsWriterFlushControl中的DocumentsWriterStallControl便是用来做健康管理的。所谓的健康管理,就是在索引线程速度过快时阻塞索引线程,从而保证内存的占用不会持续上涨。判断是否需要健康管理的程序如下
1. final boolean stall = ((activeBytes + flushBytes) > limit) &&
2. (activeBytes < limit) &&
3. !closed;
FlushAllThreads
具体到一次完全刷新具体做了什么事,可以参考下图,实际上主要就是3件事,首先做一些准备工作,因为刷新是非常复杂的过程,当中涉及多线程操作以及状态的变化,所以需要mark,接下来对每个pending状态的DocumentsWriterPerThread做flush,最后是等待所有flush结束。
markForFullFlush这个方法实际上有如下这些步骤。 实际上这个方法有许多多线程处理的技巧,都是为了尽可能的少用锁。
1. 确定删除队列。这里会新建一个generation加1的删除队列来替换原来的删除队列,而后续操作则使用老的删除队列,这么做无非是想在fullFlush时可以同时做其他操作比如删除而不用整个锁住。
2. 改变每个待刷新的ThreadState的状态,从active变为flushing。这步其实还有一个最重要的操作,就是internalTryCheckOutForFlush,他的机制就是将所有checkout的ThreadState放到一个Map(flushingWriters)中,这样就类似上了锁,只要在Map中的都是已经加锁的,比每次加锁判断要高效。
3. 对blockedFlushes阻塞的每个DocumentWriterPerThread,将其状态设置为flushing并从blockedFlushes中删除。
4. 准备好flushQueue,主要就是将上面步骤中fullFlushBuffer搜集到的DocumentWriterPerThread全部填充到flushQueue中
5. 更新健康状态,这步就是去判断是否需要健康管理。
如图,实际的刷新工作是多个线程并发的做,他们都会调用doFlush,很有意思的一点是它会把每次刷新作为一个ticket放到ticketQueue中来管理,这样会有一个统一地方获知正在刷新的ticket,也便于上锁等操作。在flush结束时会进入doAfterFlush,这里会将前面的flushingWriters删除本次flush用的DocumentsWriterPerThread,然后更新健康管理的状态并且使用notifyAll来通知其他等待的线程(主要就是waitForFlush的等待线程)。至于doFlush中实际如何顺着索引链向下的调用,这里就不细细说明了,整体流程到这里已经比较清楚,需要细节可以自行查阅相关代码。
waitForFlush是主线程与刷新线程同步的过程,每次都判断flushingWriters这个Map是否没有可用的threadState(即前面的flush有没有完成),如果没有完成,则继续等待。这正是《java多线程设计模式》中的Balking的使用。
标签:control,缓存,flush,索引,源码,线程,刷新,lucene4.5 From: https://blog.51cto.com/u_2650279/6151322