如果一个事务没有进行INSERT、UPDATE、DELETE操作,那么就步会分配事务ID,但事务仍然用一个虚拟事务ID代表自己。虚拟事务ID由两部分组成,第一部分是Backend ID,另一个是每个会话自己维护的本地事务ID计数器。通过两部分组合,能保证这个虚拟事务ID的唯一性。在PostgreSQL数据库IPC——SI Message Queue中描述了Backend ID和local transaction id的产生流程。
StartTransaction流程中获取virtual transaction id
StartTransaction作为底层事务状态机的驱动函数,前半段函数主要是初始化TransactionState和VirtualTransactionId,其vxid.backendId就是取值自MyBackendId,vxid.localTransactionId取值自GetNextLocalTransactionId函数。VirtualXactLockTableInsert函数修改PGPROC结构体中的fpVXIDLock为true【are we holding a fast-path VXID lock?】;fpLocalTransactionId为vxid.localTransactionId【lxid for fast-path VXID lock】。
static void StartTransaction(void) {
TransactionState s = &TopTransactionStateData; CurrentTransactionState = s; /* Let's just make sure the state stack is empty */
/* Set the current transaction state information appropriately during start processing. Note that once the transaction status is switched this process cannot fail until the user ID and the security context flags are fetched below. */
s->state = TRANS_START; s->transactionId = InvalidTransactionId; /* until assigned */
/* initialize current transaction state fields note: prevXactReadOnly is not used at the outermost level */
s->nestingLevel = 1; s->gucNestLevel = 1; s->childXids = NULL; s->nChildXids = 0; s->maxChildXids = 0;
GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); /* Once the current user ID and the security context flags are fetched, both will be properly reset even if transaction startup fails. */
/* Make sure we've reset xact state variables
* If recovery is still in progress, mark this transaction as read-only. We have lower level defences in XLogInsert and elsewhere to stop us from modifying data during recovery, but this gives the normal indication to the user that the transaction is read-only. */
if (RecoveryInProgress()){ s->startedInRecovery = true; XactReadOnly = true;
} else { s->startedInRecovery = false; XactReadOnly = DefaultXactReadOnly; }
XactDeferrable = DefaultXactDeferrable;XactIsoLevel = DefaultXactIsoLevel;forceSyncCommit = false;
/* Disabled in GPDB as per comment in PrepareTransaction(). */
seqXlogWrite = false;
/* reinitialize within-transaction counters */
s->subTransactionId = TopSubTransactionId;
currentSubTransactionId = TopSubTransactionId; currentCommandId = FirstCommandId;
currentCommandIdUsed = false; currentSavepointTotal = 0;
fastNodeCount = 0; previousFastLink = NULL;
/* initialize reported xid accounting */
nUnreportedXids = 0;
s->didLogXid = false;
TopXactexecutorDidWriteXLog = false;
/* must initialize resource-management stuff first */
AtStart_Memory(); AtStart_ResourceOwner();
VirtualTransactionId vxid;
/* Assign a new LocalTransactionId, and combine it with the backendId to form a virtual transaction id. */
vxid.backendId = MyBackendId;
vxid.localTransactionId = GetNextLocalTransactionId();
/*
* Lock the virtual transaction id before we announce it in the proc array
*/
VirtualXactLockTableInsert(vxid);
/*
* Advertise it in the proc array. We assume assignment of
* LocalTransactionID is atomic, and the backendId should be set already.
*/
Assert(MyProc->backendId == vxid.backendId);
MyProc->lxid = vxid.localTransactionId;
TRACE_POSTGRESQL_TRANSACTION_START(vxid.localTransactionId);
VirtualXactLock
平时查询锁信息时,执行select * from pg_locks
,如果会话已经没有当前打开的事务,则连接不会创建事务,而是隐式发出。查询select * from pg_locks
不能是中立的观察者,因为它需要自己使用事务和锁定pg_locks。因此,此即使数据库没有其他活跃连接,该查询始终报告至少两个条目,如下例所示。我们看第二个条码其锁类型就是virtualxid,锁模式是ExclusiveLock。
test=> \x
test=> SELECT relation::regclass AS relname, * FROM pg_locks;
-[ RECORD 1 ]------+----------------
relname | pg_locks
locktype | relation
database | 113270
relation | 11000
page |
tuple |
virtualxid |
transactionid |
classid |
objid |
objsubid |
virtualtransaction | 2/5789
pid | 31376
mode | AccessShareLock
granted | t
-[ RECORD 2 ]------+----------------
relname |
locktype | virtualxid
database |
relation |
page |
tuple |
virtualxid | 2/5789
transactionid |
classid |
objid |
objsubid |
virtualtransaction | 2/5789
pid | 31376
mode | ExclusiveLock
granted | t
VirtualXactLockTableInsert函数通过fast-path获取vxid锁(Take vxid lock via the fast-path)。 There can’t be any pre-existing lockers, as we haven’t advertised this vxid via the ProcArray yet. Since MyProc->fpLocalTransactionId will normally contain the same data as MyProc->lxid, you might wonder if we really need both. The difference is that MyProc->lxid is set and cleared unlocked, and examined by procarray.c, while fpLocalTransactionId is protected by backendLock and is used only by the locking subsystem. Doing it this way makes it easier to verify that there are no funny race conditions. We don’t bother recording this lock in the local lock table, since it’s only ever released at the end of a transaction. Instead, LockReleaseAll() calls VirtualXactLockTableCleanup(). 不可能有任何预先存在的lockers,因为我们还没有通过ProcArray声明这个vxid。由于MyProc->fpLocalTransactionId通常包含与MyProc->lxid相同的数据,您可能会怀疑我们是否真的需要两者。不同的是,MyProc->lxid被设置并清除为解锁,并由procarray.c检查,而fpLocalTransactionId受backendLock保护,仅由锁定子系统(the locking subsystem)使用。这样做可以更容易地验证是否存在有趣的比赛条件。我们不需要在本地锁表中记录这个锁,因为它只在事务结束时释放。相反,LockReleaseAll()调用VirtualXactLockTableCleanup()。
void VirtualXactLockTableInsert(VirtualTransactionId vxid) {
LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
MyProc->fpVXIDLock = true; MyProc->fpLocalTransactionId = vxid.localTransactionId;
LWLockRelease(MyProc->backendLock);
}
VirtualXactLock函数将形式上的vxid锁被转换成了main lock table。如果wait为true,则函数会等待直到给定的VXID被释放,然后返回true;如果wait为false,仅仅检查VXID是否还在运行,然后返回true或者false。该函数执行的流程如下:
- 首先通过vxid.backendId获取其对应的PGPROC,如果PGPROC为null,说明该后端进程已经结束,直接返回true。
- 获取proc的backendLock,检查请求的vxid是否为查找到的proc对应的vxid。如果不同,说明该后端进程已经结束,直接返回true。
- 如果指定wait为false,就是上述流程,仅仅检查VXID对应的后端进程是否还在运行,返回false,说明还在运行。
- 如果指定wait为true,需要将fast-path lock形式锁转换为常规锁【we’re going to need to sleep on the VXID. But first, we must set up the primary lock table entry, if needed (ie, convert the proc’s fast-path lock on its VXID to a regular lock)】,并将proc->fpVXIDLock清理。利用
LockAcquire(&tag, ShareLock, false, false)
函数抢占该常规锁,抢占到后即进行释放,最后直接返回true,说明该进程不在了。
/* VirtualXactLock
* If wait = true, wait until the given VXID has been released, and then return true.
* If wait = false, just check whether the VXID is still running, and return true or false. */
bool VirtualXactLock(VirtualTransactionId vxid, bool wait) {
LOCKTAG tag; SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
/* If a lock table entry must be made, this is the PGPROC on whose behalf it must be done. Note that the transaction might end or the PGPROC might be reassigned to a new backend before we get around to examining it, but it doesn't matter. If we find upon examination that the relevant lxid is no longer running here, that's enough to prove that it's no longer running anywhere. */
PGPROC *proc = BackendIdGetProc(vxid.backendId);
if (proc == NULL) return true;
/* We must acquire this lock before checking the backendId and lxid against the ones we're waiting for. The target backend will only set or clear lxid while holding this lock. */
LWLockAcquire(proc->backendLock, LW_EXCLUSIVE);
/* If the transaction has ended, our work here is done. */
if (proc->backendId != vxid.backendId || proc->fpLocalTransactionId != vxid.localTransactionId) {
LWLockRelease(proc->backendLock); return true;
}
/* If we aren't asked to wait, there's no need to set up a lock table entry. The transaction is still in progress, so just return false. */
if (!wait) {
LWLockRelease(proc->backendLock); return false;
}
/* OK, we're going to need to sleep on the VXID. But first, we must set up the primary lock table entry, if needed (ie, convert the proc's fast-path lock on its VXID to a regular lock). */
if (proc->fpVXIDLock){
PROCLOCK *proclock;uint32 hashcode;LWLock *partitionLock;
hashcode = LockTagHashCode(&tag);partitionLock = LockHashPartitionLock(hashcode);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc, &tag, hashcode, ExclusiveLock);
if (!proclock){
LWLockRelease(partitionLock);LWLockRelease(proc->backendLock);
ereport(ERROR,(errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"), errhint("You might need to increase max_locks_per_transaction.")));
}
GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);
LWLockRelease(partitionLock);
proc->fpVXIDLock = false;
}
/* Done with proc->fpLockBits */
LWLockRelease(proc->backendLock);
/* Time to wait. */
(void) LockAcquire(&tag, ShareLock, false, false);
LockRelease(&tag, ShareLock, false);
return true;
}
VirtualXactLockTableCleanup函数Check whether a VXID lock has been materialized; if so, release it, unblocking waiters. 首先清理VirtualXactLockTableInsert设置的两个成员;如果fastpath为false,lxid为有效的本地事务id,说明形式上的vxid锁被转换成了main lock table,需要调用LockRefindAndRelease函数解锁。
void VirtualXactLockTableCleanup(void) {
/* Clean up shared memory state. */
LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE);
bool fastpath = MyProc->fpVXIDLock; LocalTransactionId lxid = MyProc->fpLocalTransactionId;
MyProc->fpVXIDLock = false; MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
LWLockRelease(MyProc->backendLock);
/* If fpVXIDLock has been cleared without touching fpLocalTransactionId, that means someone transferred the lock to the main lock table. */
if (!fastpath && LocalTransactionIdIsValid(lxid)) {
VirtualTransactionId vxid; LOCKTAG locktag;
vxid.backendId = MyBackendId; vxid.localTransactionId = lxid;
SET_LOCKTAG_VIRTUALTRANSACTION(locktag, vxid);
LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc, &locktag, ExclusiveLock, false);
}
}
#define SET_LOCKTAG_VIRTUALTRANSACTION(locktag,vxid) \
((locktag).locktag_field1 = (vxid).backendId, (locktag).locktag_field2 = (vxid).localTransactionId, (locktag).locktag_field3 = 0, (locktag).locktag_field4 = 0, (locktag).locktag_type = LOCKTAG_VIRTUALTRANSACTION, (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD)
VirtualXactLockTableInsert在StartTransaction和StarupXLOG处调用,就是为了获取virtual transaction id,然后设置fast-path lock形式上的锁。从下图可以看出GetLockStatusData也是依据fast-path lock来组装LockInstanceData,然后可以通过查询pg_locks来查看该条LockInstanceData。
调用VirtualXactLock的函数,其中WaitForLockersMultiple和DefineIndex是为了确保用于指定vxid的进程已经退出,而ResolveRecoveryConflictWithVirtualXIDs仅仅是为了检查指定vxid进程是否退出。DefineIndex函数使用VirtualXactLock是为了The index is now valid in the sense that it contains all currently interesting tuples. But since it might not contain tuples deleted just before the reference snap was taken, we have to wait out any transactions that might have older snapshots. Obtain a list of VXIDs of such transactions, and wait for them individually。WaitForLockersMultiple函数主要用于等待直到指定锁类型和锁模式的锁上没其他事务抢占【Wait until no transaction holds locks that conflict with the given locktags at the given lockmode.】,使用VirtualXactLock实现等待指定vxid对应后端进程结束的功能。ResolveRecoveryConflictWithVirtualXIDs函数使用VirtualXactLock函数用于确保和recovery进程冲突的后端进程都已经结束。
StartTransaction --> VirtualXactLockTableInsert
StarupXLOG --> InitRecoveryTransactionEnvironment --> VirtualXactLockTableInsert
WaitForLockers --> WaitForLockersMultiple --> VirtualXactLock(xxx, true)
ResolveRecoveryConflictWithSnapshot/ResolveRecoveryConflictWithTablespace/ResolveRecoveryConflictWithLock --> ResolveRecoveryConflictWithVirtualXIDs --> VirtualXactLock(xxx, false)
DefineIndex --> VirtualXactLock(xxx, true)
LockReleaseAll --> VirtualXactLockTableCleanup
StarupXLOG --> ShutdownRecoveryTransactionEnvironment --> VirtualXactLockTableCleanup