HBase高效并发锁:IdLock极简内存设计

发布于:2025-08-31 ⋅ 阅读:(25) ⋅ 点赞:(0)

IdLock 

核心目标是:允许大量并发线程根据一个数字 ID(long 类型)来获取锁,同时保持极低的内存开销。

在 HBase 的很多场景中,需要对某个资源进行加锁,而这个资源可以用一个数字 ID 来标识。例如,在 HFileReaderImpl 的 readBlock 方法中,为了防止多个线程同时加载同一个数据块(Block),就需要对这个块进行加-解锁操作。这个数据块在文件中的偏移量(offset)就是一个 long 类型的 ID。

如果为每一个可能存在的 offset 都创建一个 ReentrantLock 对象,当 HFile 很大、Block 很多时,会造成巨大的内存浪费。IdLock 就是为了解决这个问题而设计的。

设计思想与核心数据结构

IdLock 的设计思想可以概括为 “按需创建,用后即焚” 。它并不会预先为所有可能的 ID 创建锁对象,而是在线程需要锁的时候才动态创建,并且一旦锁不再被任何线程持有或等待,就立即销毁,回收内存。

其核心数据结构非常简单:

// ... existing code ...
public class IdLock {

// ... existing code ...
  /** An entry returned to the client as a lock object */
  public static final class Entry {
    private final long id;
    private int numWaiters;
    private boolean locked = true;
    private Thread holder;

    private Entry(long id, Thread holder) {
      this.id = id;
      this.holder = holder;
    }
// ... existing code ...
  }

  private ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>();
// ... existing code ...
  • map: 一个 ConcurrentHashMap,这是整个机制的核心。它以 long 类型的 ID 为键,以一个内部类 Entry 对象为值。
  • Entry: 这个内部类扮演了“锁”的角色。它包含了锁的所有状态信息:
    • id: 该锁对应的数字 ID。
    • locked: 一个布尔值,表示这个锁当前是否被持有。
    • holder: 持有该锁的线程。
    • numWaiters: 正在等待获取这个锁的线程数量。

获取锁: getLockEntry(long id)

这是 IdLock 最复杂也最精妙的部分。我们来逐步分析一个线程获取锁的全过程。

// ... existing code ...
  public Entry getLockEntry(long id) throws IOException {
    Thread currentThread = Thread.currentThread();
    Entry entry = new Entry(id, currentThread);
    Entry existing;
    while ((existing = map.putIfAbsent(entry.id, entry)) != null) {
      synchronized (existing) {
        if (existing.locked) {
          ++existing.numWaiters; // Add ourselves to waiters.
          while (existing.locked) {
            try {
              existing.wait();
            } catch (InterruptedException e) {
              --existing.numWaiters; // Remove ourselves from waiters.
// ... existing code ...
              if (!existing.locked && existing.numWaiters == 0) {
                map.remove(existing.id);
              }
              throw new InterruptedIOException("Interrupted waiting to acquire sparse lock");
            }
          }

          --existing.numWaiters; // Remove ourselves from waiters.
          existing.locked = true;
          existing.holder = currentThread;
          return existing;
        }
        // If the entry is not locked, it might already be deleted from the
        // map, so we cannot return it. We need to get our entry into the map
        // or get someone else's locked entry.
      }
    }
    return entry;
  }
// ... existing code ...
  1. 乐观尝试:

    • 线程首先创建一个新的 Entry 对象 entry,并乐观地认为自己是第一个请求该 ID 锁的线程。
    • 它调用 map.putIfAbsent(entry.id, entry)。这是一个原子操作。
    • 成功情况 (最快路径): 如果 map 中不存在该 idputIfAbsent 会成功将 entry 放入 map 并返回 nullwhile 循环条件不满足,方法直接返回新创建的 entry。此时,该线程成功获取了锁,几乎没有竞争开销。
  2. 竞争与等待:

    • 失败情况: 如果 map 中已经存在该 id 对应的 Entry(由 existing 引用),putIfAbsent 会返回这个已存在的 Entry,而不会放入新的 entrywhile 循环条件满足,进入循环体。
    • 加锁 existing: 线程会对 existing 对象进行 synchronized 加锁。这确保了对同一个 Entry 状态的修改是线程安全的。
    • 检查锁状态:
      • 如果 existing.locked 为 true,说明锁已经被其他线程持有。
      • 当前线程将 existing.numWaiters 加一,表明自己加入等待队列。
      • 然后进入一个 while(existing.locked) 循环,并调用 existing.wait(),释放 synchronized 锁并进入等待状态,直到被唤醒。
    • 被唤醒后:
      • 当持有锁的线程释放锁并调用 notify() 后,等待的线程被唤醒。
      • 它会跳出 while(existing.locked) 循环。
      • 将 existing.numWaiters 减一。
      • 将 existing.locked 重新设为 true,并将 existing.holder 设为自己。
      • 最后返回 existing 对象,表示成功获取了锁。
  3. 处理“失效 Entry”的特殊情况:

    • 在 synchronized (existing) 块中,如果发现 existing.locked 为 false,这意味着什么?
    • 这说明在当前线程执行 putIfAbsent 和 synchronized(existing) 之间,持有该锁的线程已经释放了锁,并且因为没有等待者,它已经从 map 中删除了这个 Entry
    • 此时 existing 对象已经是一个“失效”的引用。我们不能直接返回它。
    • 代码会直接结束 synchronized 块,while 循环会继续,再次尝试 putIfAbsent,相当于重新开始获取锁的流程。

释放锁: releaseLockEntry(Entry entry)

释放锁的逻辑相对简单,但同样关键。

// ... existing code ...
  public void releaseLockEntry(Entry entry) {
    Thread currentThread = Thread.currentThread();
    synchronized (entry) {
      if (entry.holder != currentThread) {
        LOG.warn("{} is trying to release lock entry {}, but it is not the holder.", currentThread,
          entry);
      }
      entry.locked = false;
      if (entry.numWaiters > 0) {
        entry.notify();
      } else {
        map.remove(entry.id);
      }
    }
  }
// ... existing code ...
  1. 加锁 entry: 同样,对 entry 对象加 synchronized 锁,保证释放操作的原子性。
  2. 设置状态: 将 entry.locked 设为 false
  3. 唤醒或销毁:
    • 如果 entry.numWaiters > 0: 说明有其他线程正在等待这个锁。此时调用 entry.notify() 来唤醒一个等待的线程。注意这里是 notify() 而不是 notifyAll(),因为一次只有一个线程能获取锁。Entry 对象会继续保留在 map 中。
    • 如果 entry.numWaiters == 0: 说明没有线程在等待。这意味着这个 Entry 对象已经完成了它的历史使命。此时调用 map.remove(entry.id) 将其从 ConcurrentHashMap 中彻底删除,回收内存。这就是“用后即焚”思想的体现。

总结

IdLock 是一个非常典型的、为特定场景高度优化的并发工具。

  • 优点:

    • 内存效率极高: 只为当前正在被锁定或等待的 ID 维护锁对象,而不是为所有可能的 ID 都维护一个锁。这使得它可以用极小的内存代价管理海量 ID 的锁。
    • 性能好: 在无竞争或低竞争场景下,通过一次 putIfAbsent 原子操作即可完成加锁,性能非常高。
    • 避免死锁: 它的加锁/解锁模式简单清晰,不会产生复杂的锁依赖,从而避免了死锁问题。
  • 适用场景:

    • 需要对大量、稀疏的数字 ID 进行加锁的场景。
    • 锁的持有时间通常较短。
    • 典型的例子就是 HBase 中对 HFile Block Offset 的加锁,以防止缓存风暴。

通过 ConcurrentHashMap 的原子操作和 synchronized + wait/notify 机制的巧妙结合,IdLock 实现了一个轻量级、高性能、低内存占用的 ID 锁服务,是 HBase 中一个非常值得学习的并发编程范例。


网站公告

今日签到

点亮在社区的每一天
去签到