一、资源争用的现实镜像
当多个ATM机共用一个现金库时,出纳员们需要:
检查库门状态(锁状态检测)
挂上"使用中"标牌(acquire)
完成现金交接(临界区操作)
取下标牌(release)
import threading
cash_vault = 1000000
vault_lock = threading.Lock()
def withdraw(amount):
global cash_vault
with vault_lock: # 自动管理锁周期
if cash_vault >= amount:
cash_vault -= amount
return True
return False
二、锁机制的进化图谱
2.1 互斥锁的局限性
传统Lock在复杂场景暴露出问题:
嵌套调用导致死锁
无法区分读写操作
长时间阻塞影响系统响应
2.2 读写锁(RWLock)解决方案
from threading import RLock
class Account:
def __init__(self):
self._balance = 0
self._lock = RLock()
def transfer(self, amount):
with self._lock: # 可重入锁
self._balance += amount
def audit(self):
with self._lock: # 读操作同样保护
return self._balance
2.3 条件变量实现精准唤醒
class BoundedBuffer:
def __init__(self, capacity):
self.capacity = capacity
self.queue = []
self.lock = threading.Lock()
self.not_empty = threading.Condition(self.lock)
self.not_full = threading.Condition(self.lock)
def put(self, item):
with self.not_full:
while len(self.queue) >= self.capacity:
self.not_full.wait()
self.queue.append(item)
self.not_empty.notify()
def get(self):
with self.not_empty:
while not self.queue:
self.not_empty.wait()
item = self.queue.pop(0)
self.not_full.notify()
return item
三、消息队列的异步革命
3.1 生产者-消费者模式重构
对比传统锁方案与队列方案:
维度锁方案队列方案耦合度高(直接竞争)低(缓冲区解耦)吞吐量依赖锁粒度依赖队列深度错误隔离容易连锁崩溃失败消息可重试
3.2 Python队列实现
import queue
import random
task_queue = queue.Queue(maxsize=5)
def producer():
while True:
item = random.randint(1,100)
task_queue.put(item) # 自动阻塞直到有空位
print(f"生产: {item}")
def consumer():
while True:
item = task_queue.get() # 自动阻塞直到有数据
print(f"消费: {item}")
task_queue.task_done()
# 启动线程
threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()
四、分布式环境下的进阶方案
4.1 Redis实现分布式锁
import redis
from contextlib import contextmanager
redis_cli = redis.Redis()
@contextmanager
def dist_lock(lock_name, timeout=10):
identifier = str(uuid.uuid4())
# 获取锁
if redis_cli.setnx(lock_name, identifier):
redis_cli.expire(lock_name, timeout)
try:
yield
finally:
# Lua脚本保证原子性
script = """
if redis.call('get',KEYS[1]) == ARGV[1] then
return redis.call('del',KEYS[1])
else
return 0
end"""
redis_cli.eval(script, 1, lock_name, identifier)
else:
raise Exception("获取锁失败")
4.2 Kafka式消息队列
from kafka import KafkaProducer, KafkaConsumer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
consumer = KafkaConsumer('my_topic',
group_id='my_group',
bootstrap_servers='localhost:9092')
# 生产消息
producer.send('my_topic', b'raw_bytes')
# 消费消息
for msg in consumer:
print(f"收到: {msg.value}")
五、性能调优实战
5.1 锁竞争热点检测
import threading
import time
class ProfiledLock:
def __init__(self):
self._lock = threading.Lock()
self.wait_stats = []
def acquire(self):
start = time.monotonic()
self._lock.acquire()
wait_time = time.monotonic() - start
self.wait_stats.append(wait_time)
return wait_time
def release(self):
self._lock.release()
def stats(self):
return {
'max': max(self.wait_stats),
'avg': sum(self.wait_stats)/len(self.wait_stats)
}
5.2 队列水位监控
class MonitoredQueue(queue.Queue):
def __init__(self, maxsize=0):
super().__init__(maxsize)
self.put_history = []
self.get_history = []
def put(self, item, block=True, timeout=None):
super().put(item, block, timeout)
self.put_history.append((time.time(), self.qsize()))
def get(self, block=True, timeout=None):
item = super().get(block, timeout)
self.get_history.append((time.time(), self.qsize()))
return item
def plot_usage(self):
import matplotlib.pyplot as plt
plt.plot([t[0] for t in self.put_history],
[t[1] for t in self.put_history], label='puts')
plt.plot([t[0] for t in self.get_history],
[t[1] for t in self.get_history], label='gets')
plt.legend()
plt.show()