Elasticsearch's Transaction Log

Date: 2024-06-19 15:01:30

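The translog is a key component that Elasticsearch relies on for data durability and crash recovery. Each index shard has its own translog, and every operation that adds, updates, or deletes a document in the index is recorded in it.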

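At its core, the translog is a rolling log file. Compared with a Lucene write, appending to a log file is a relatively lightweight operation. The translog is synced to disk either on every request or periodically, depending on the configured durability.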

Writing to the translog:

// org.elasticsearch.index.translog.Translog#add
/**
 * Adds an operation to the transaction log.
 *
 * @param operation the operation to add
 * @return the location of the operation in the translog
 * @throws IOException if adding the operation to the translog resulted in an I/O exception
 */
public Location add(final Operation operation) throws IOException {
    final ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
    try {
        final long start = out.position();
        out.skip(Integer.BYTES);
        writeOperationNoSize(new BufferedChecksumStreamOutput(out), operation);
        final long end = out.position();
        final int operationSize = (int) (end - Integer.BYTES - start);
        out.seek(start);
        out.writeInt(operationSize);
        out.seek(end);
        final BytesReference bytes = out.bytes();
        try (ReleasableLock ignored = readLock.acquire()) {
            ensureOpen();
            if (operation.primaryTerm() > current.getPrimaryTerm()) {
                assert false
                    : "Operation term is newer than the current term; "
                        + "current term["
                        + current.getPrimaryTerm()
                        + "], operation term["
                        + operation
                        + "]";
                throw new IllegalArgumentException(
                    "Operation term is newer than the current term; "
                        + "current term["
                        + current.getPrimaryTerm()
                        + "], operation term["
                        + operation
                        + "]"
                );
            }
            return current.add(bytes, operation.seqNo());
        }
    } catch (final AlreadyClosedException | IOException ex) {
        closeOnTragicEvent(ex);
        throw ex;
    } catch (final Exception ex) {
        closeOnTragicEvent(ex);
        throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", ex);
    } finally {
        Releasables.close(out);
    }
}
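
The size handling in `add` (skip four bytes, write the body, then seek back and fill in the size) can be illustrated in isolation. The following is a minimal sketch, not Elasticsearch code: the class name `FramedRecord` and the layout `[size][payload][CRC32]` are assumptions made for this example, and the real serialization done by `writeOperationNoSize` is not reproduced here.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Hypothetical illustration of a size-prefixed, checksummed record. */
final class FramedRecord {

    /** Encodes payload as [int size][payload][int crc32]; size covers payload + checksum. */
    static byte[] encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + payload.length + Integer.BYTES);
        int start = buf.position();
        buf.position(start + Integer.BYTES);            // reserve room for the size field
        buf.put(payload);
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        buf.putInt((int) crc.getValue());
        int end = buf.position();
        buf.putInt(start, end - Integer.BYTES - start); // back-fill the size (absolute put)
        return buf.array();
    }

    /** Decodes one frame and verifies its checksum. */
    static byte[] decode(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        int size = buf.getInt();
        byte[] payload = new byte[size - Integer.BYTES];
        buf.get(payload);
        int expected = buf.getInt();
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        if ((int) crc.getValue() != expected) {
            throw new IllegalStateException("checksum mismatch");
        }
        return payload;
    }
}
```

Writing the size last means the record can be serialized in a single pass without knowing its length up front, while a reader can still learn the record length from the first four bytes.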

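Operation is an interface, and Index and Delete are classes that implement it. Every create, update, and delete of indexed data in ES can be expressed as an Index or a Delete.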

public interface Operation 
public static class Index implements Operation
public static class Delete implements Operation 
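
As a rough illustration of why two operation types are enough, here is a simplified sketch. All names in it (`OperationSketch`, the `id`/`source` fields, `apply`) are hypothetical and far smaller than the real `Translog.Operation` hierarchy: an update is modelled as re-indexing the same id.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model; the real operations carry more fields. */
final class OperationSketch {

    interface Operation {
        String id();
    }

    /** Creates the document if the id is new, or overwrites it (an update). */
    static final class Index implements Operation {
        private final String id;
        private final String source;

        Index(String id, String source) {
            this.id = id;
            this.source = source;
        }

        @Override
        public String id() {
            return id;
        }

        String source() {
            return source;
        }
    }

    /** Removes the document with the given id. */
    static final class Delete implements Operation {
        private final String id;

        Delete(String id) {
            this.id = id;
        }

        @Override
        public String id() {
            return id;
        }
    }

    /** Applies operations in order and returns the resulting id -> source view. */
    static Map<String, String> apply(List<Operation> ops) {
        Map<String, String> docs = new HashMap<>();
        for (Operation op : ops) {
            if (op instanceof Index) {
                docs.put(op.id(), ((Index) op).source());
            } else if (op instanceof Delete) {
                docs.remove(op.id());
            }
        }
        return docs;
    }

    public static void main(String[] args) {
        List<Operation> ops = Arrays.asList(
            new Index("1", "a"),  // create
            new Index("1", "b"),  // update: re-index the same id
            new Index("2", "c"),  // create
            new Delete("2")       // delete
        );
        System.out.println(apply(ops)); // prints {1=b}
    }
}
```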

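current is a TranslogWriter. Its add method does not fsync: the operation is first appended to an in-memory output stream, and the buffered bytes are written and fsynced to disk later, for example by the scheduled task shown further down.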

// org.elasticsearch.index.translog.TranslogWriter#add
/**
 * Add the given bytes to the translog with the specified sequence number; returns the location the bytes were written to.
 *
 * @param data  the bytes to write
 * @param seqNo the sequence number associated with the operation
 * @return the location the bytes were written to
 * @throws IOException if writing to the translog resulted in an I/O exception
 */
public Translog.Location add(final BytesReference data, final long seqNo) throws IOException {
    long bufferedBytesBeforeAdd = this.bufferedBytes;
    if (bufferedBytesBeforeAdd >= forceWriteThreshold) {
        writeBufferedOps(Long.MAX_VALUE, bufferedBytesBeforeAdd >= forceWriteThreshold * 4);
    }

    final Translog.Location location;
    synchronized (this) {
        ensureOpen();
        if (buffer == null) {
            buffer = new ReleasableBytesStreamOutput(bigArrays);
        }
        assert bufferedBytes == buffer.size();
        final long offset = totalOffset;
        totalOffset += data.length();
        data.writeTo(buffer);

        assert minSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;
        assert maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED || operationCounter == 0;

        minSeqNo = SequenceNumbers.min(minSeqNo, seqNo);
        maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo);

        nonFsyncedSequenceNumbers.add(seqNo);

        operationCounter++;

        assert assertNoSeqNumberConflict(seqNo, data);

        location = new Translog.Location(generation, offset, data.length());
        bufferedBytes = buffer.size();
    }

    return location;
}
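
The bookkeeping in `TranslogWriter#add` can be reduced to a small standalone sketch. The class below is hypothetical (`BufferedLogWriter` and its accessors are names made up for this example), it buffers into a plain `ByteArrayOutputStream`, and it uses `Long.MAX_VALUE` / `Long.MIN_VALUE` as "no operations yet" markers instead of the `SequenceNumbers` sentinels used by Elasticsearch.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical sketch: append to an in-memory buffer and hand back a location. */
final class BufferedLogWriter {

    /** Where a record lives: which file (generation), at which offset, and how long it is. */
    static final class Location {
        final long generation;
        final long offset;
        final int size;

        Location(long generation, long offset, int size) {
            this.generation = generation;
            this.offset = offset;
            this.size = size;
        }
    }

    private final long generation;
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private long totalOffset;                  // offset the next record will get
    private long minSeqNo = Long.MAX_VALUE;    // simplification of the real sentinel handling
    private long maxSeqNo = Long.MIN_VALUE;
    private int operationCounter;

    BufferedLogWriter(long generation) {
        this.generation = generation;
    }

    /** Buffers the bytes (no disk I/O here) and returns where they will live in the file. */
    synchronized Location add(byte[] data, long seqNo) {
        final long offset = totalOffset;
        totalOffset += data.length;
        buffer.write(data, 0, data.length);
        minSeqNo = Math.min(minSeqNo, seqNo);
        maxSeqNo = Math.max(maxSeqNo, seqNo);
        operationCounter++;
        return new Location(generation, offset, data.length);
    }

    synchronized int bufferedBytes() {
        return buffer.size();
    }

    synchronized long minSeqNo() {
        return minSeqNo;
    }

    synchronized long maxSeqNo() {
        return maxSeqNo;
    }

    synchronized int operationCount() {
        return operationCounter;
    }
}
```

The point of returning the location immediately is that the caller can remember where the record will be found later, even though nothing has reached the disk yet.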

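Below is the scheduled task for asynchronous fsync. Writing a file durably follows the same pattern everywhere: either fsync asynchronously in the background, or fsync on every request (that is, a synchronous write). The default interval for asynchronous fsync is 5 s (the `index.translog.sync_interval` setting), and as maybeFSyncTranslogs shows, the task only does work when the durability is ASYNC.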

// org.elasticsearch.index.IndexService.AsyncTranslogFSync
/**
 * FSyncs the translog for all shards of this index in a defined interval.
 */
static final class AsyncTranslogFSync extends BaseAsyncTask {

    AsyncTranslogFSync(IndexService indexService) {
        super(indexService, indexService.getIndexSettings().getTranslogSyncInterval());
    }

    @Override
    protected String getThreadPool() {
        return ThreadPool.Names.FLUSH;
    }

    @Override
    protected void runInternal() {
        indexService.maybeFSyncTranslogs();
    }

    void updateIfNeeded() {
        final TimeValue newInterval = indexService.getIndexSettings().getTranslogSyncInterval();
        if (newInterval.equals(getInterval()) == false) {
            setInterval(newInterval);
        }
    }

    @Override
    public String toString() {
        return "translog_sync";
    }
}

// org.elasticsearch.index.IndexService#maybeFSyncTranslogs
private void maybeFSyncTranslogs() {
    if (indexSettings.getTranslogDurability() == Translog.Durability.ASYNC) {
        for (IndexShard shard : this.shards.values()) {
            try {
                if (shard.isSyncNeeded()) {
                    shard.sync();
                }
            } catch (AlreadyClosedException ex) {
                // fine - continue;
            } catch (IOException e) {
                logger.warn("failed to sync translog", e);
            }
        }
    }
}

// org.elasticsearch.index.shard.IndexShard#sync()
public void sync() throws IOException {
    verifyNotClosed();
    getEngine().syncTranslog();
}


// org.elasticsearch.index.engine.InternalEngine#syncTranslog
@Override
public void syncTranslog() throws IOException {
    translog.sync();
    revisitIndexDeletionPolicyOnTranslogSynced();
}

// org.elasticsearch.index.translog.TranslogWriter#syncUpTo
/**
 * Syncs the translog up to at least the given offset unless already synced
 *
 * @return <code>true</code> if this call caused an actual sync operation
 */
final boolean syncUpTo(long offset) throws IOException {
    if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
        synchronized (syncLock) { // only one sync/checkpoint should happen concurrently but we wait
            if (lastSyncedCheckpoint.offset < offset && syncNeeded()) {
                // double checked locking - we don't want to fsync unless we have to and now that we have
                // the lock we should check again since if this code is busy we might have fsynced enough already
                final Checkpoint checkpointToSync;
                final List<Long> flushedSequenceNumbers;
                final ReleasableBytesReference toWrite;
                try (ReleasableLock toClose = writeLock.acquire()) {
                    synchronized (this) {
                        ensureOpen();
                        checkpointToSync = getCheckpoint();
                        toWrite = pollOpsToWrite();
                        if (nonFsyncedSequenceNumbers.isEmpty()) {
                            flushedSequenceNumbers = null;
                        } else {
                            flushedSequenceNumbers = nonFsyncedSequenceNumbers;
                            nonFsyncedSequenceNumbers = new ArrayList<>(64);
                        }
                    }

                    try {
                        // Write ops will release operations.
                        writeAndReleaseOps(toWrite);
                        assert channel.position() == checkpointToSync.offset;
                    } catch (final Exception ex) {
                        closeWithTragicEvent(ex);
                        throw ex;
                    }
                }
                // now do the actual fsync outside of the synchronized block such that
                // we can continue writing to the buffer etc.
                try {
                    assert lastSyncedCheckpoint.offset != checkpointToSync.offset || toWrite.length() == 0;
                    if (lastSyncedCheckpoint.offset != checkpointToSync.offset) {
                        channel.force(false);
                    }
                    writeCheckpoint(checkpointChannel, checkpointPath, checkpointToSync);
                } catch (final Exception ex) {
                    closeWithTragicEvent(ex);
                    throw ex;
                }
                if (flushedSequenceNumbers != null) {
                    flushedSequenceNumbers.forEach(persistedSequenceNumberConsumer::accept);
                }
                assert lastSyncedCheckpoint.offset <= checkpointToSync.offset
                    : "illegal state: " + lastSyncedCheckpoint.offset + " <= " + checkpointToSync.offset;
                lastSyncedCheckpoint = checkpointToSync; // write protected by syncLock
                return true;
            }
        }
    }
    return false;
}
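
The "append now, fsync later" pattern, together with the double-checked locking used in `syncUpTo`, can be sketched with plain JDK classes. Everything here is hypothetical (`AsyncFsyncLog`, `openTemp`, `startPeriodicSync` are names invented for the example), the sketch writes straight to the channel instead of keeping a separate in-memory buffer, it has no checkpoint file, and I/O errors are rethrown as `UncheckedIOException` only to keep it short.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a log that is appended eagerly and fsynced lazily. */
final class AsyncFsyncLog implements AutoCloseable {
    private final FileChannel channel;
    private final Object syncLock = new Object();
    private long writtenOffset;              // guarded by "this"
    private volatile long lastSyncedOffset;  // written only while holding syncLock

    AsyncFsyncLog(Path path) {
        try {
            channel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Convenience factory for experiments: opens a log backed by a temporary file. */
    static AsyncFsyncLog openTemp() {
        try {
            Path path = Files.createTempFile("translog-sketch", ".log");
            path.toFile().deleteOnExit();
            return new AsyncFsyncLog(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends without fsync and returns the offset just past the written bytes. */
    synchronized long append(byte[] data) {
        try {
            ByteBuffer buf = ByteBuffer.wrap(data);
            while (buf.hasRemaining()) {
                channel.write(buf);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        writtenOffset += data.length;
        return writtenOffset;
    }

    synchronized boolean syncNeeded() {
        return lastSyncedOffset < writtenOffset;
    }

    /** Returns true only if this call actually performed an fsync. */
    boolean syncUpTo(long offset) {
        if (lastSyncedOffset < offset && syncNeeded()) {
            synchronized (syncLock) {
                // re-check under the lock: another thread may have synced far enough already
                if (lastSyncedOffset < offset && syncNeeded()) {
                    final long target;
                    synchronized (this) {
                        target = writtenOffset;
                    }
                    try {
                        channel.force(false); // the slow part runs outside the append lock
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                    lastSyncedOffset = target;
                    return true;
                }
            }
        }
        return false;
    }

    /** Starts a background task that syncs everything written so far at a fixed delay. */
    ScheduledExecutorService startPeriodicSync(long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> syncUpTo(Long.MAX_VALUE),
            intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    @Override
    public void close() {
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller that wants per-request durability would call `syncUpTo` with the offset returned by `append` before acknowledging the request; a caller that accepts a small window of possible loss would rely on `startPeriodicSync` alone and shut the returned scheduler down when done.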

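Next, let us look at how the translog is read. When does that happen? One case is crash recovery: if ES stops before the Lucene data has been persisted, then after a restart the data is recovered from the translog (provided the translog itself was persisted). The other case is a real-time lookup by id: if we write a new document and immediately search for it by some condition other than its id, the document is not found, but a lookup by id does find it, because the latest document data can be located through the translog at that point.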

Reading from the translog:

// org.elasticsearch.index.translog.Translog#readOperation(org.elasticsearch.index.translog.Translog.Location)
/**
 * Reads and returns the operation from the given location if the generation it references is still available. Otherwise
 * this method will return <code>null</code>.
 */
public Operation readOperation(Location location) throws IOException {
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        if (location.generation < getMinFileGeneration()) {
            return null;
        }
        if (current.generation == location.generation) {
            // no need to fsync here the read operation will ensure that buffers are written to disk
            // if they are still in RAM and we are reading onto that position
            return current.read(location);
        } else {
            // read backwards - it's likely we need to read on that is recent
            for (int i = readers.size() - 1; i >= 0; i--) {
                TranslogReader translogReader = readers.get(i);
                if (translogReader.generation == location.generation) {
                    return translogReader.read(location);
                }
            }
        }
    } catch (final Exception ex) {
        closeOnTragicEvent(ex);
        throw ex;
    }
    return null;
}
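
The lookup logic of `readOperation` (reject generations that were already trimmed, check the current generation first, then walk the older readers from newest to oldest) can be tried out with an in-memory model. This is a sketch under simplifying assumptions: `GenerationLookup`, `roll`, and `trimBelow` are hypothetical names, each translog file is just a byte array, and no locking is done.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory model of "find the generation, then read at the offset". */
final class GenerationLookup {

    static final class Location {
        final long generation;
        final int offset;
        final int size;

        Location(long generation, int offset, int size) {
            this.generation = generation;
            this.offset = offset;
            this.size = size;
        }
    }

    /** One translog file, modelled as a byte array. */
    static final class Generation {
        final long generation;
        final byte[] bytes;

        Generation(long generation, byte[] bytes) {
            this.generation = generation;
            this.bytes = bytes;
        }

        byte[] read(Location location) {
            return Arrays.copyOfRange(bytes, location.offset, location.offset + location.size);
        }
    }

    private final List<Generation> readers = new ArrayList<>(); // older generations, oldest first
    private Generation current;

    GenerationLookup(Generation first) {
        this.current = first;
    }

    /** Rolls over: the current file becomes a read-only reader and a new file takes its place. */
    void roll(Generation next) {
        readers.add(current);
        current = next;
    }

    /** Drops readers whose generation is below the given minimum. */
    void trimBelow(long minGeneration) {
        readers.removeIf(g -> g.generation < minGeneration);
    }

    long minGeneration() {
        return readers.isEmpty() ? current.generation : readers.get(0).generation;
    }

    /** Returns the bytes at the location, or null if that generation is no longer available. */
    byte[] read(Location location) {
        if (location.generation < minGeneration()) {
            return null;
        }
        if (current.generation == location.generation) {
            return current.read(location);
        }
        for (int i = readers.size() - 1; i >= 0; i--) { // newest reader first
            Generation reader = readers.get(i);
            if (reader.generation == location.generation) {
                return reader.read(location);
            }
        }
        return null;
    }
}
```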

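This is where the Translog.Location object returned by the translog write method above comes into play: the Location is used to read the data at the specified position in the translog and turn it back into an Operation.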

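Now look at the method behind get-by-id in ES. Line 16 fetches the versionValue, which is an IndexVersionValue for newly indexed data, and line 45 reads the translog at the Location it holds. Note that in this listing the translog read is guarded by get.isReadFromTranslog(); when that is false, the engine calls refreshIfNeeded and serves the document from a searcher instead.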

 1 // org.elasticsearch.index.engine.InternalEngine#get
 2 @Override
 3 public GetResult get(
 4     Get get,
 5     MappingLookup mappingLookup,
 6     DocumentParser documentParser,
 7     Function<Engine.Searcher, Engine.Searcher> searcherWrapper
 8 ) {
 9     assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
10     try (ReleasableLock ignored = readLock.acquire()) {
11         ensureOpen();
12         if (get.realtime()) {
13             final VersionValue versionValue;
14             try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
15                 // we need to lock here to access the version map to do this truly in RT
16                 versionValue = getVersionFromMap(get.uid().bytes());
17             }
18             if (versionValue != null) {
19                 if (versionValue.isDelete()) {
20                     return GetResult.NOT_EXISTS;
21                 }
22                 if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
23                     throw new VersionConflictEngineException(
24                         shardId,
25                         "[" + get.id() + "]",
26                         get.versionType().explainConflictForReads(versionValue.version, get.version())
27                     );
28                 }
29                 if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
30                     && (get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term)) {
31                     throw new VersionConflictEngineException(
32                         shardId,
33                         get.id(),
34                         get.getIfSeqNo(),
35                         get.getIfPrimaryTerm(),
36                         versionValue.seqNo,
37                         versionValue.term
38                     );
39                 }
40                 if (get.isReadFromTranslog()) {
41                     // this is only used for updates - API _GET calls will always read form a reader for consistency
42                     // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
43                     if (versionValue.getLocation() != null) {
44                         try {
45                             final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
46                             if (operation != null) {
47                                 return getFromTranslog(get, (Translog.Index) operation, mappingLookup, documentParser, searcherWrapper);
48                             }
49                         } catch (IOException e) {
50                             maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
51                             throw new EngineException(shardId, "failed to read operation from translog", e);
52                         }
53                     } else {
54                         trackTranslogLocation.set(true);
55                     }
56                 }
57                 assert versionValue.seqNo >= 0 : versionValue;
58                 refreshIfNeeded("realtime_get", versionValue.seqNo);
59             }
60             return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false);
61         } else {
62             // we expose what has been externally expose in a point in time snapshot via an explicit refresh
63             return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper), false);
64         }
65     }
66 }

// org.elasticsearch.index.engine.IndexVersionValue
final class IndexVersionValue extends VersionValue {

    private static final long RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(IndexVersionValue.class);

    private final Translog.Location translogLocation;

    IndexVersionValue(Translog.Location translogLocation, long version, long seqNo, long term) {
        super(version, seqNo, term);
        this.translogLocation = translogLocation;
    }

    @Override
    public long ramBytesUsed() {
        return RAM_BYTES_USED + RamUsageEstimator.shallowSizeOf(translogLocation);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        if (super.equals(o) == false) return false;
        IndexVersionValue that = (IndexVersionValue) o;
        return Objects.equals(translogLocation, that.translogLocation);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), translogLocation);
    }

    @Override
    public String toString() {
        return "IndexVersionValue{" + "version=" + version + ", seqNo=" + seqNo + ", term=" + term + ", location=" + translogLocation + '}';
    }

    @Override
    public Translog.Location getLocation() {
        return translogLocation;
    }
}
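
To see why a lookup by id can return a document that a search cannot yet see, consider the toy model below. It is only a sketch of the idea, with hypothetical names throughout (`RealtimeGetSketch`, `index`, `refresh`, `searchBySource`, `getById`): a list stands in for the translog, a map from id to list position stands in for the version map with its translog location, and a second map stands in for the refreshed, searchable view.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy model of a version map that points into a log. */
final class RealtimeGetSketch {
    private final List<String> translog = new ArrayList<>();         // appended on every write
    private final Map<String, Integer> versionMap = new HashMap<>(); // id -> log position, unrefreshed docs only
    private final Map<String, String> searchable = new HashMap<>();  // what a search can see

    /** Writes a document: it goes to the log and the version map, but is not yet searchable. */
    void index(String id, String source) {
        translog.add(source);
        versionMap.put(id, translog.size() - 1);
    }

    /** Makes all pending documents searchable and clears the version map. */
    void refresh() {
        for (Map.Entry<String, Integer> entry : versionMap.entrySet()) {
            searchable.put(entry.getKey(), translog.get(entry.getValue()));
        }
        versionMap.clear();
    }

    /** A search by content only sees refreshed documents. */
    List<String> searchBySource(String source) {
        List<String> ids = new ArrayList<>();
        for (Map.Entry<String, String> entry : searchable.entrySet()) {
            if (entry.getValue().equals(source)) {
                ids.add(entry.getKey());
            }
        }
        return ids;
    }

    /** A lookup by id checks the version map first and falls back to the searchable view. */
    String getById(String id) {
        Integer position = versionMap.get(id);
        if (position != null) {
            return translog.get(position);
        }
        return searchable.get(id);
    }
}
```

Right after `index("1", "hello")`, `searchBySource("hello")` is empty while `getById("1")` already returns the document; after `refresh()` both paths agree.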

Finally, there is the checkpoint file. It records the metadata of the translog and tells ES from which position in the translog it should start replaying operations.

// org.elasticsearch.index.engine.InternalEngine#recoverFromTranslogInternal
private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
    final int opsRecovered;
    final long localCheckpoint = getProcessedLocalCheckpoint();
    if (localCheckpoint < recoverUpToSeqNo) {
        try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
            opsRecovered = translogRecoveryRunner.run(this, snapshot);
        } catch (Exception e) {
            throw new EngineException(shardId, "failed to recover from translog", e);
        }
    } else {
        opsRecovered = 0;
    }
    // flush if we recovered something or if we have references to older translogs
    // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
    assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
    pendingTranslogRecovery.set(false); // we are good - now we can commit
    logger.trace(
        () -> format(
            "flushing post recovery from translog: ops recovered [%s], current translog generation [%s]",
            opsRecovered,
            translog.currentFileGeneration()
        )
    );
    flush(false, true);
    translog.trimUnreferencedReaders();
}
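
The selection of operations to replay can be shown with a small standalone sketch. It assumes that the snapshot range is inclusive at both ends, that is, operations with `localCheckpoint + 1 <= seqNo <= recoverUpToSeqNo` are replayed; the names `ReplaySketch`, `Op`, and `opsToReplay` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: pick the logged operations that still need to be replayed. */
final class ReplaySketch {

    static final class Op {
        final long seqNo;
        final String payload;

        Op(long seqNo, String payload) {
            this.seqNo = seqNo;
            this.payload = payload;
        }
    }

    /**
     * Returns the operations with localCheckpoint < seqNo <= recoverUpToSeqNo in log order.
     * Operations at or below the local checkpoint are assumed to be persisted already.
     */
    static List<Op> opsToReplay(List<Op> translog, long localCheckpoint, long recoverUpToSeqNo) {
        List<Op> result = new ArrayList<>();
        if (localCheckpoint >= recoverUpToSeqNo) {
            return result; // nothing to recover
        }
        for (Op op : translog) {
            if (op.seqNo >= localCheckpoint + 1 && op.seqNo <= recoverUpToSeqNo) {
                result.add(op);
            }
        }
        return result;
    }
}
```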

 

From: https://www.cnblogs.com/allenwas3/p/18256244
