Postgresql源码(131)行锁的原理分析

发布于:2024-05-24 ⋅ 阅读:(28) ⋅ 点赞:(0)

相关
《Postgresql源码(124)两个事务更新同一行数据时的行为和原理分析》

0 简介&全文总结

行锁是一种用于控制并发访问的机制,可以确保同一时间只有一个事务可以修改或删除特定的行数据。本文对行锁的原理做一些分析。

总结

  • ExecInitLockRows为要加锁的每一张表拼一个ExecAuxRowMark结构,主要记录了哪张表、ctid在哪一列这两个信息。
  • 执行时,对每一个元组执行ExecLockRows,ExecLockRows拿到元组后,遍历ExecAuxRowMark结构,找到ctid开始加行锁。
  • 持锁者:通过ctid指向的行执行HeapTupleSatisfiesUpdate拿到行没有人更新过xmax,也就是xmax是干净的,加锁者会添加字的xid到xmax同时增加标记HEAP_XMAX_LOCK_ONLY。
  • 等锁者:通过ctid只想的行执行HeapTupleSatisfiesUpdate拿到的行发现有人更新了xmax,先去拿到tuple锁保证没人在更新了,然后再去拿xid锁开始等待,等持锁事务结束后,这里继续执行。

1 行锁的用法

Postgresql中行锁的冲突矩阵:Conflicting Row-Level Locks

Requested Lock Mode FOR KEY SHARE FOR SHARE FOR NO KEY UPDATE FOR UPDATE
FOR KEY SHARE X
FOR SHARE X X
FOR NO KEY UPDATE X X X
FOR UPDATE X X X X

下面分享两种用法:

  • 表连接+行锁。
  • 带子查询+行锁。

1.1 实例一:表连接+行锁

  • 注意表连接情况下,表ot和表it相关的行都会被锁住。
  • 看执行计划来判断。
drop table ot;
create table ot(a int primary key, b int);
insert into ot values (1,1),(2,1),(3,2);

drop table it;
create table it(b int);
insert into it values (1);

begin;

explain select * from ot, it where ot.b = it.b for update;
                               QUERY PLAN
-------------------------------------------------------------------------
 LockRows  (cost=338.29..1069.96 rows=28815 width=24)
   ->  Merge Join  (cost=338.29..781.81 rows=28815 width=24)
         Merge Cond: (ot.b = it.b)
         ->  Sort  (cost=158.51..164.16 rows=2260 width=14)
               Sort Key: ot.b
               ->  Seq Scan on ot  (cost=0.00..32.60 rows=2260 width=14)
         ->  Sort  (cost=179.78..186.16 rows=2550 width=10)
               Sort Key: it.b
               ->  Seq Scan on it  (cost=0.00..35.50 rows=2550 width=10)
(9 rows)

select * from ot, it where ot.b = it.b for update;
 a | b | b
---+---+---
 1 | 1 | 1
 2 | 1 | 1

1.2 实例二:带子查询+行锁

  • 注意:子查询的表it在独立的plan中(InitPlan 1),不会加行锁。
  • 看执行计划来判断有没有加行锁。
drop table ot;
create table ot(a int primary key, b int);
insert into ot values (1,1),(2,1),(3,2);

drop table it;
create table it(b int);
insert into it values (1);

begin;

explain select * from ot where b = (select b from it where b = 1) for update;
                         QUERY PLAN
------------------------------------------------------------
 LockRows  (cost=41.88..80.23 rows=11 width=14)
   InitPlan 1
     ->  Seq Scan on it  (cost=0.00..41.88 rows=13 width=4)
           Filter: (b = 1)
   ->  Seq Scan on ot  (cost=0.00..38.25 rows=11 width=14)
         Filter: (b = (InitPlan 1).col1)
(6 rows)

select * from ot where b = (select b from it where b = 1) for update;
 a | b
---+---
 1 | 1
 2 | 1

2 如何排查拿不到行锁?

以下面操作为例:

步骤 事务一 事务二 结果
1 begin; begin; 执行成功
2 select a+1 from ot where b = 1 for update; 执行成功
3 select a+1 from ot where a = 2 for update; 事务二等锁

查询锁视图:事务一持锁

select locktype,relation::regclass,page,tuple,virtualxid,transactionid,virtualtransaction,pid,mode,granted from pg_locks where pid = 941126 order by pid;
   locktype    | relation | page | tuple | virtualxid | transactionid | virtualtransaction |  pid   |     mode      | granted
---------------+----------+------+-------+------------+---------------+--------------------+--------+---------------+---------
 relation      | ot_pkey  |      |       |            |               | 13/2               | 941126 | RowShareLock  | t
 relation      | ot       |      |       |            |               | 13/2               | 941126 | RowShareLock  | t
 virtualxid    |          |      |       | 13/2       |               | 13/2               | 941126 | ExclusiveLock | t
 transactionid |          |      |       |            |           791 | 13/2               | 941126 | ExclusiveLock | t

查询锁视图:事务二等锁

select locktype,relation::regclass,page,tuple,virtualxid,transactionid,virtualtransaction,pid,mode,granted from pg_locks where pid = 941433 order by pid;
   locktype    | relation | page | tuple | virtualxid | transactionid | virtualtransaction |  pid   |        mode         | granted
---------------+----------+------+-------+------------+---------------+--------------------+--------+---------------------+---------
 relation      | ot_pkey  |      |       |            |               | 14/3               | 941433 | RowShareLock        | t
 relation      | ot       |      |       |            |               | 14/3               | 941433 | RowShareLock        | t
 virtualxid    |          |      |       | 14/3       |               | 14/3               | 941433 | ExclusiveLock       | t
 tuple         | ot       |    0 |     2 |            |               | 14/3               | 941433 | AccessExclusiveLock | t
 transactionid |          |      |       |            |           791 | 14/3               | 941433 | ShareLock           | f
  • 事务二在等锁,为什么事务二拿着tuple锁?2.2中解答。
  • 为什么不是tuple锁granted==false?因为事务中的所有锁的冲突,最终实现都是用transactionid来互斥的。

2 行锁的源码分析

两表连接为例分析行锁的执行流程。

explain select * from ot, it where ot.b = it.b for update;
                               QUERY PLAN
-------------------------------------------------------------------------
 LockRows  (cost=0.00..86806.90 rows=28815 width=24)
   ->  Nested Loop  (cost=0.00..86518.75 rows=28815 width=24)
         Join Filter: (ot.b = it.b)
         ->  Seq Scan on it  (cost=0.00..35.50 rows=2550 width=10)
         ->  Materialize  (cost=0.00..43.90 rows=2260 width=14)
               ->  Seq Scan on ot  (cost=0.00..32.60 rows=2260 width=14)

begin;
select * from ot, it where ot.b = it.b for update;
 a | b | b
---+---+---
 1 | 1 | 1
 2 | 1 | 1

2.1 ExecInitLockRows

功能:

  • 计划阶段的PlanRowMark转换为运行时的ExecRowMark。注意ExecRowMark是在InitPlan初始阶段生成的。
  • 然后继续生成ExecAuxRowMark,其中汇总记录了ExecRowMark和ctid列的列号等。
  • 最后将ExecAuxRowMark信息记录在链表中LockRowsState→lr_arowMarks,每个表放一个ExecAuxRowMark。
LockRowsState *
ExecInitLockRows(LockRows *node, EState *estate, int eflags)
{
	...
	ExecInitResultTypeTL(&lrstate->ps);
	
	...
	outerPlanState(lrstate) = ExecInitNode(outerPlan, estate, eflags);
	
	...
	foreach(lc, node->rowMarks)
	{
		PlanRowMark *rc = lfirst_node(PlanRowMark, lc);
		ExecRowMark *erm;
		ExecAuxRowMark *aerm;
	
		...
		
		erm = ExecFindRowMark(estate, rc->rti, false);
		aerm = ExecBuildAuxRowMark(erm, outerPlan->targetlist);

  • 注意这里会把四种行锁标记的markType记录到lr_arowMarks中。
  • 两种非行锁标记的类型传递给EvalPlanQual机制,这里就不关注了。
		if (RowMarkRequiresRowShareLock(erm->markType))
			lrstate->lr_arowMarks = lappend(lrstate->lr_arowMarks, aerm);
		else
			epq_arowmarks = lappend(epq_arowmarks, aerm);
	}

	...
	return lrstate;
}

当执行select ot.a from ot, it where ot.b = it.b for update;时:

  • 结果targetlist会有三列(a,ctid1,ctid2)
  • rti表明了在rangetable中的位置。
表ot的ExecRowMark
{ rowmark = {
    relation = ..., relid = 16384,    // ot表
    rti = 1, prti = 1, rowmarkId = 1, 
    markType = ROW_MARK_EXCLUSIVE, 
    strength = LCS_FORUPDATE, waitPolicy = LockWaitBlock,
    ermActive = false, 
    curCtid = {ip_blkid = {bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, 
    ermExtra = 0x0}, 
  ctidAttNo = 2,    // ctid列的位置在第二列上
  toidAttNo = 0, 
  wholeAttNo = 0}

表it的ExecRowMark
{ rowmark = {
    relation = ..., relid = 16389,    // ot表
    rti = 2, prti = 2, rowmarkId = 2, 
    markType = ROW_MARK_EXCLUSIVE, 
    strength = LCS_FORUPDATE, waitPolicy = LockWaitBlock,
    ermActive = false, 
    curCtid = {ip_blkid = {bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, 
    ermExtra = 0x0}, 
  ctidAttNo = 3,    // ctid列的位置在第三列上
  toidAttNo = 0, 
  wholeAttNo = 0}

当执行:select a,a,a,a,b from ot where a = 2 for update;时:

  • 结果targetlist会有6列(a,a,a,a,b,citd)
  • tid记录的是rangetable中的位置,和estate->es_rowmarks中的元素是一一对应的。
表it的ExecRowMark
{ rowmark = {
    relation = ..., relid = 16384,    // ot表
    rti = 1, prti = 1, rowmarkId = 1, 
    markType = ROW_MARK_EXCLUSIVE, 
    strength = LCS_FORUPDATE, waitPolicy = LockWaitBlock,
    ermActive = false, 
    curCtid = {ip_blkid = {bi_hi = 65535, bi_lo = 65535}, ip_posid = 0}, 
    ermExtra = 0x0}, 
  ctidAttNo = 6,    // ctid列的位置在第六列上
  toidAttNo = 0, 
  wholeAttNo = 0}

2.2 ExecLockRows

ExecLockRows核心是调用heap_lock_tuple函数完成具体的加锁操作,再调用heap_lock_tuple前,用上面拼好的lr_arowMarks链表中,拿到ExecAuxRowMark,进而ExecGetJunkAttribute拿到ctid列的值,因为后面锁是用ctid查行然后通过xmax和标记为来加锁的:

ExecLockRows
	foreach(lc, node->lr_arowMarks)
	{
		ExecAuxRowMark *aerm = (ExecAuxRowMark *) lfirst(lc);
		...
		datum = ExecGetJunkAttribute(slot,
									 aerm->ctidAttNo,
									 &isNull);
		...
		tid = *((ItemPointer) DatumGetPointer(datum));
		switch (erm->markType)
		{
			case ROW_MARK_EXCLUSIVE:
				lockmode = LockTupleExclusive;
				break;
			case ROW_MARK_NOKEYEXCLUSIVE:
				lockmode = LockTupleNoKeyExclusive;
				break;
			case ROW_MARK_SHARE:
				lockmode = LockTupleShare;
				break;
			case ROW_MARK_KEYSHARE:
				lockmode = LockTupleKeyShare;
				break;
			default:
				elog(ERROR, "unsupported rowmark type");
				lockmode = LockTupleNoKeyExclusive; /* keep compiler quiet */
				break;
		}

		lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
		if (!IsolationUsesXactSnapshot())
			lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;

		test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
								markSlot, estate->es_output_cid,
								lockmode, erm->waitPolicy,
								lockflags,
								&tmfd);

heap_lock_tuple函数中完成行锁的加锁动作,调用栈:

#0  heap_lock_tuple (relation=0x7fc1e4210608, tuple=0x2a22590, cid=0, mode=LockTupleExclusive, wait_policy=LockWaitBlock, follow_updates=true, buffer=0x7ffdcd9a796c,
    tmfd=0x7ffdcd9a7a50) at heapam.c:4315
#1  0x0000000000509737 in heapam_tuple_lock (relation=0x7fc1e4210608, tid=0x7ffdcd9a7a68, snapshot=0x28e5a20, slot=0x2a22540, cid=0, mode=LockTupleExclusive,
    wait_policy=LockWaitBlock, flags=3 '\003', tmfd=0x7ffdcd9a7a50) at heapam_handler.c:378
#2  0x0000000000790e43 in table_tuple_lock (rel=0x7fc1e4210608, tid=0x7ffdcd9a7a68, snapshot=0x28e5a20, slot=0x2a22540, cid=0, mode=LockTupleExclusive, wait_policy=LockWaitBlock,
    flags=3 '\003', tmfd=0x7ffdcd9a7a50) at ../../../src/include/access/tableam.h:1595
#3  0x000000000079131e in ExecLockRows (pstate=0x29c9420) at nodeLockRows.c:185
#4  0x000000000075f5c7 in ExecProcNodeFirst (node=0x29c9420) at execProcnode.c:464
#5  0x0000000000753380 in ExecProcNode (node=0x29c9420) at ../../../src/include/executor/executor.h:274
#6  0x0000000000755e7d in ExecutePlan (estate=0x29c9140, planstate=0x29c9420, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0,
    direction=ForwardScanDirection, dest=0x29d0968, execute_once=true) at execMain.c:1646

heap_lock_tuple函数流程:

heap_lock_tuple
	*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
	page = BufferGetPage(*buffer);
	tuple->t_data = (HeapTupleHeader) PageGetItem(page, lp);
	tuple->t_len = ItemIdGetLength(lp);
	tuple->t_tableOid = RelationGetRelid(relation);

	/* !重要 */
	result = HeapTupleSatisfiesUpdate(tuple, cid, *buffer);

加行锁的事务,会给元组的tuple的xmax更新一个自己的事务ID,导致后续要给这一行加锁的事务,执行HeapTupleSatisfiesUpdate时返回TM_BeingModified,以为这一行被别人改了,需要进一步判断:

HeapTupleSatisfiesUpdate【加锁事务】 HeapTupleSatisfiesUpdate【等锁事务】
if (tuple->t_infomask & HEAP_XMAX_INVALID)
return TM_Ok;
if (TransactionIdIsInProgress(HeapTupleHeaderGetRawXmax(tuple)))
return TM_BeingModified;

加锁事务执行

  • compute_new_xmax_infomask,给new_infomask添加HEAP_XMAX_LOCK_ONLY标记(不管哪种行锁都加),和HEAP_XMAX_EXCL_LOCK标记(标记排他锁)。
  • HeapTupleHeaderSetXmax(tuple->t_data, xid),给行加上xmax(当前事务的XID)。
    在这里插入图片描述

等锁事务执行:

heap_lock_tuple
	else if (result == TM_BeingModified ...)
		xwait = HeapTupleHeaderGetRawXmax(tuple->t_data); // 776 持锁事务的ID
		infomask = tuple->t_data->t_infomask;             // 111000000
		infomask2 = tuple->t_data->t_infomask2;           // 10000000000010
		if (!skip_tuple_lock && !heap_acquire_tuplock(relation, tid, mode, wait_policy, &have_tuple_lock))
		switch (wait_policy)
			case LockWaitBlock: 
				XactLockTableWait(xwait, relation, &tuple->t_self, XLTW_Lock);
				break;
			...

注意:

  • 等锁事务先用heap_acquire_tuplock拿了一个行锁,注意这里是等锁的事务拿到了,不是持锁的事务拿的。这个行锁是防止其他事务再去更改这一行。等锁事务拿到行锁后可以在pg_locks中查到(看第二节)。
  • 等锁的时候继续执行XactLockTableWait才真正发生等待,这里等的是xid锁,xid是持锁的xid,含义是等持锁的事务退了,等锁事务就能继续执行了。