etcdv3.6源码流程---compact

发布于:2024-05-08 ⋅ 阅读:(39) ⋅ 点赞:(0)

总括:

因为bbolt数据库的内容中:key=(revision,version),value=(elemkey,elemvalue),treeIndex中key=elemkey,value=[]generation,而generation包含一个key的所有版本,也就是说treeIndex中的value就是bbolt中的key。因为treeIndex中包含所有的elemkey,所以压缩的思路就是先遍历整个treeIndex来进行压缩:没有被删除的elemkey就用最后一次写入的值来覆盖,然后把该elemkey对应的(revision,version)作为key保存到一个map keep中,被删除的key则不会保存到该map keep中,然后遍历数据库中所有key,如果该key即(revision,version)不在map keep中,那么就从数据库中删除该key,如果在,则跳过,然后就完成了整个压缩操作。总结就是compact一共两步,第一步压缩treeIndex,第二步压缩数据库。

一点杂记:

1:压缩treeIndex和压缩数据库是两个分开的操作。
2:除了compact(defrag操作不知道)操作,所有其他的修改操作比如put/delete都是一个只增不减的过程。
3:put/delete操作也分两步,这两步也是分开的。第一步是往treeIndex中增加数据。第二步是把事务提交到txnbuffer。
4:compact对数据库的修改和put/delete一样,都是把操作丢到batchTxnBuffere就认为成功了
5:读写可以并发,即一边读数据库,一边把修改操作丢到batchTxnBuffere,但是在提交batchTxnBuffere之前会等待所有的读完成之后才会继续提交

debug流程:

1:启动参数添加--auto-compaction-retention=1
2:修改代码,把等待间隔缩小,否则有些流程可能不会走到或者要隔很久才会走到,因为他内部单位是小时

流程:

他是单独开了一个线程每隔一段时间就compact一下


etcdserver.NewServer #创建一个etcdserver
  if num := cfg.AutoCompactionRetention; num != 0:            #需要配置auto-compaction-retention才会开启compact,默认是不开启
    v3compactor.New #创建一个compact对象
    v3compactor.Periodic.Run                                   #启动compact线程,内部一个死循环,每隔一段时间就执行一次compaction
      for{ #定时循环
        sleep
        mvcc.readView.Rev                                      #获取compact开始时etcd系统当前最新的版本号即currentRev,保存为变量compactRev
                                                               #currentRev+1就是下一个待提交的版本号
                                                               #不知道为什么他还要专门创建一个readTx,这么麻烦,为什么不直接读这个字段就行了?
        etcdserver.EtcdServer.Compact
          etcdserver.EtcdServer.processInternalRaftRequestOnce #因为compact操作会改变集群状态,所以要走一遍raft流程,
                                                               #记录一条compact日志到wal日志,然后etcdserver apply这条日志
                                                               #因为改变数据库的操作有很多,比如delete、put、compact,
                                                               #所以etcdserver apply的时候会根据data字段中请求的类型来进行一个dispatch操作

          # compact的raft流程同put,因为走raft流程时,raft是不关心日志内容的,
          # 所以put、delete、compact等都用的同一套流程,同一套代码,
          # 只有在apply的时候需要根据日志类型来进行一个dispatch操作,所以这里就直接跳到apply了
          ......raft 流程,与put一模一样, 略......                                                                        ,
          return
      }
      another thread 1{                                              #就是etcdserver进行apply的流程,只不过dispatch时是compact
        etcdserver.EtcdServer.run
            for
              select:
                case ap <- applyc
                  ......略去了一大串apply调用,直接来到dispatch......
                  apply.uberApplier.dispatch                         #根据req类型来调用对应的方法
                    case r.Compaction:                               #如果这条日志是一个compact请求
                      apply.applyV3.Compaction
                        apply.applierV3backend.Compaction
                          mvcc.store.Compact
    	                    s.mu.Lock()                              #源码注释中说对于事务加读锁,对于非事务如compact/defrag则加写锁
                                                                     #还没完全搞懂  
                                                                   
                            mvcc.store.checkPrevCompactionCompleted  #检查上一次compact操作是否完成,etcd compact的时候会往数据库里写一些数据,
                                                                     #所以compact之前读一下这些数据,即读一下scheduledCompactRev和finishedCompactRev,
                                                                     #如果读到的这两个值相同说明上一次compact操作已经完成,可再次compact
                                                                     #scheduledCompactRev表示已经调度的最新的compact任务对应的revision
                                                                     #finishedCompactRev表示已完成的最新的compact任务对应的revision
                                                                     #因为他是fifo调度,因为先触发的copmact任务的revision必定小于后出发的revision                                                      
                                                                     #所以只需要判断这两个值是否相等就可以判断之前的compact任务是否完成了

                            mvcc.store.updateCompactRev              #往bbolt数据库写入本次compact对应的版本号,即写入scheduedCompactRev=本次compactRev
                            s.mu.Unlock                              #释放数据库写锁

                            mvcc.store.compact                       #执行压缩
                              schedule.NewJob                        #compact是作为job异步执行的,所以有可能上次还没结束就又来了一个新的compact请求
                                mvcc.store.scheduleCompaction        #1:先从treeIndex中删除,然后再遍历数据库中所有key,
                                                                     #删除所有不在压缩后的treeIndex中的(revision,version)
                                                                     #删除范围为revision属于1~compactRev的版本

                                  mvcc.treeIndex.Compact             #treeIndex包含所有elemKey,所以先遍历treeIndex的所有elemkey,
                                                                     #针对每个elemKey,删除他所有在compactRev之前的版本
                                                                     #来得到最终剩下的所有(revision,version)
                                                                     #如果key所有版本都没了,那就从treeIndex删除这个key

                                    mvcc.keyIndex.compact            #合并elemkey的generation,
                                                                     #如果key没被删除则保留最后一个(revision,version),如果被删除则删除
                                    btree.BTreeG[T].Delete           #如果该key被删除了,就从treeIndex中删除这个key

                                  for{                               #压缩完treeindex就开始压缩数据库,一次删除最多删除n个数据
                                                                     #!!!一次事务删除多少条数据也是可以配置的
                                                                     #他就是一个for循环,一批一批的删,当删完1批,释放锁,开始删下一批之前
                                                                     #这个时候是可以commit put/delete事务和rage的,个人猜测是避免一次性删除太多
                                                                     #导致put/delete长时间阻塞(个人猜测,不确定)

                                    backend.batchTx.LockOutsideApply #compact数据库之前禁止继续提交事务(主要是put/delete)
                                                                     #compact操作和其他put/delete一样,都会竞争这个batchTxnBuffer
                                                                     #此时treeIndex是没有锁的,所以put/delete/range是可以正常读取和修改treeIndex
                                                                  
                                      backend.batchTx.lock           #apply的时候会修改batchTxBuffer,会先申请batchTx锁
                                                                     #compact的时候会锁住buffer,从而禁止compact期间提交apply事务

                                    backend.batchTx.UnsafeRange      #读出所有key即(revision,version),因为此时已经获取了事务锁,
                                                                     #不用担心其他人修改,所以unsafe读
                                    if key not in keep               #如果该key不在合并后的treeIndex中,那么就从数据库中删除该key
                                      batchTxBuffered.UnsafeDelete   
                                   
                                    if cur<batchNum :                #如果本次处理的小于batch大小,说明所有的都删除了,可以结束本次compact
                                      mvcc.UnsafeSetFinishedCompact  #往数据库写一条数据即finishedCompactRev=compactRev,标记本次compact已经完成
                                      backend.batchTx.Unlock         #解锁
                                      return                         #结束本次compact。
                                                                     #!!此处没有forceCommite,也就是最后这一批对应的事务没有强制提交就返回了
                                                                     #!!因为和put/delete一样,compact把数据删除操作丢到txnbuffer就不管了
                                                                     #这个txnbuffer同一时刻只能有一个事务修改
                                                                     #由另一个线程定时commited。总的来说:txnbuffer就是一个缓存,任何事务修改操作,
                                                                     #只要把操作丢到这个txnbuffer后,该事务就可以认为该操作成功了,就可以直接返回了
                                                                     #备注:batchTxnBuffered和batchTx以及bbolt事务之间的关系我还云里雾里,还不懂
                                                                     #只知道batchTxnBuffered里有一个batchTx,然后所有操作都是转发给batchTx
                                                                     #笔记:写事务在开始之前会调用s.mu.RLock加读锁,然后把操作丢到txnbuffere的时候
                                                                     #对txnbuffer加锁,然后返回,然后释放s.mu.RUnlock,
                                                                     #这样从加读锁到释放读锁就标记着一个事务的完成,如果加了读锁,但是还没有释放读锁
                                                                     #说明数据库此时还有事务没有完成,在开始一次新的compact之前会对数据库加写锁
                                                                     #直到所有已有的事务完成才能成功加写锁,所以他就是用这个s.mu来判断是否有事务还没有完成
                                                                     #这个s.mu只有在triggersnapshot/compact中才会加写锁,其他地方都是加读锁(defrag不知道)
                                                                    

                                    else:                            #如果达到了CompactionBatchLimit,则立即提交本次事务,
                                                                     #因为配置文件中配置了允许单次事务compact的最大条数
                                      backend.backend.ForceCommit    
                                        backend.batchTxBuffered.Commit
                                          backend.batchTx.lock
                                            backend.batchTxBuffered.commit
                                              backend.readTx.Lock     #因为我们前面创建读事务的时候会给readTx加读锁,所有这里获取写锁会阻塞
                                                                      #直到所有读事务都完成,释放读锁后,这里才可以成功加写锁
                                                                      #前面已经调用LockOutsideApply禁止了其他写事务提交,所以这里不用担心并发写
                                                                      #读事务会先访问treeIndex然后才去数据库读,treeIndex内部会加锁,所以是并发安全的
                                                                      #compact是先压缩treeIndex,然后再去压缩数据库,如果压缩完treeIndex之后
                                                                      #在compact提交写事务前,来了读事务,那么读事务会先获取treeIndex,然后再去读数据库
                                                                      #compact会因获取不到readTx的写锁而阻塞在这里,
                                                                      #所以在compact提交前和提交后都可以安全的读,但是compact中不可以
                                          backend.batchTx.unlock

                                  }

      }