互斥锁与消息队列的架构哲学

发布于:2025-06-05 ⋅ 阅读:(14) ⋅ 点赞:(0)

一、资源争用的现实镜像

当多个ATM机共用一个现金库时,出纳员们需要:

  1. 检查库门状态(锁状态检测)

  2. 挂上"使用中"标牌(acquire)

  3. 完成现金交接(临界区操作)

  4. 取下标牌(release)

import threading

cash_vault = 1000000
vault_lock = threading.Lock()

def withdraw(amount):
    global cash_vault
    with vault_lock:  # 自动管理锁周期
        if cash_vault >= amount:
            cash_vault -= amount
            return True
        return False

二、锁机制的进化图谱

2.1 互斥锁的局限性

传统Lock在复杂场景暴露出问题:

  • 嵌套调用导致死锁

  • 无法区分读写操作

  • 长时间阻塞影响系统响应

2.2 读写锁(RWLock)解决方案

from threading import RLock

class Account:
    def __init__(self):
        self._balance = 0
        self._lock = RLock()
    
    def transfer(self, amount):
        with self._lock:  # 可重入锁
            self._balance += amount
    
    def audit(self):
        with self._lock:  # 读操作同样保护
            return self._balance

2.3 条件变量实现精准唤醒

class BoundedBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = []
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)
    
    def put(self, item):
        with self.not_full:
            while len(self.queue) >= self.capacity:
                self.not_full.wait()
            self.queue.append(item)
            self.not_empty.notify()
    
    def get(self):
        with self.not_empty:
            while not self.queue:
                self.not_empty.wait()
            item = self.queue.pop(0)
            self.not_full.notify()
            return item

三、消息队列的异步革命

3.1 生产者-消费者模式重构

对比传统锁方案与队列方案:

维度锁方案队列方案耦合度高(直接竞争)低(缓冲区解耦)吞吐量依赖锁粒度依赖队列深度错误隔离容易连锁崩溃失败消息可重试

3.2 Python队列实现

import queue
import random

task_queue = queue.Queue(maxsize=5)

def producer():
    while True:
        item = random.randint(1,100)
        task_queue.put(item)  # 自动阻塞直到有空位
        print(f"生产: {item}")

def consumer():
    while True:
        item = task_queue.get()  # 自动阻塞直到有数据
        print(f"消费: {item}")
        task_queue.task_done()

# 启动线程
threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()

四、分布式环境下的进阶方案

4.1 Redis实现分布式锁

import redis
from contextlib import contextmanager

redis_cli = redis.Redis()

@contextmanager
def dist_lock(lock_name, timeout=10):
    identifier = str(uuid.uuid4())
    # 获取锁
    if redis_cli.setnx(lock_name, identifier):
        redis_cli.expire(lock_name, timeout)
        try:
            yield
        finally:
            # Lua脚本保证原子性
            script = """
            if redis.call('get',KEYS[1]) == ARGV[1] then
                return redis.call('del',KEYS[1])
            else
                return 0
            end"""
            redis_cli.eval(script, 1, lock_name, identifier)
    else:
        raise Exception("获取锁失败")

4.2 Kafka式消息队列

from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
consumer = KafkaConsumer('my_topic',
                         group_id='my_group',
                         bootstrap_servers='localhost:9092')

# 生产消息
producer.send('my_topic', b'raw_bytes')  

# 消费消息
for msg in consumer:
    print(f"收到: {msg.value}")

五、性能调优实战

5.1 锁竞争热点检测

import threading
import time

class ProfiledLock:
    def __init__(self):
        self._lock = threading.Lock()
        self.wait_stats = []
    
    def acquire(self):
        start = time.monotonic()
        self._lock.acquire()
        wait_time = time.monotonic() - start
        self.wait_stats.append(wait_time)
        return wait_time
    
    def release(self):
        self._lock.release()
    
    def stats(self):
        return {
            'max': max(self.wait_stats),
            'avg': sum(self.wait_stats)/len(self.wait_stats)
        }

5.2 队列水位监控

class MonitoredQueue(queue.Queue):
    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self.put_history = []
        self.get_history = []
    
    def put(self, item, block=True, timeout=None):
        super().put(item, block, timeout)
        self.put_history.append((time.time(), self.qsize()))
    
    def get(self, block=True, timeout=None):
        item = super().get(block, timeout)
        self.get_history.append((time.time(), self.qsize()))
        return item
    
    def plot_usage(self):
        import matplotlib.pyplot as plt
        plt.plot([t[0] for t in self.put_history], 
                [t[1] for t in self.put_history], label='puts')
        plt.plot([t[0] for t in self.get_history],
                [t[1] for t in self.get_history], label='gets')
        plt.legend()
        plt.show()

网站公告

今日签到

点亮在社区的每一天
去签到