
🌟 大家好,我是摘星! 🌟
今天为大家带来的是并发设计模式实战系列,第十九章监视器(Monitor),废话不多说直接开始~
目录
一、核心原理深度拆解
1. 监视器三要素模型
┌──────────────────────┐
│      Monitor Object   │
│  ┌─────────────────┐  │
│  │  Shared Data     │  │
│  └────────┬─────────┘  │
│           │            │
│  ┌────────▼─────────┐  │
│  │  Sync Methods    │  │
│  │ (Entry Queue)    │  │
│  └────────┬─────────┘  │
│           │            │
│  ┌────────▼─────────┐  │
│  │  Wait Conditions │  │
│  │ (Condition Queue)│  │
│  └─────────────────┘  │
└──────────────────────┘- 共享数据:被保护的临界资源(如计数器、连接池)
- 同步方法:互斥访问入口(Java的synchronized方法/块)
- 条件变量:线程协作机制(Object.wait()/notify())
2. 线程调度机制
- Entry Set:竞争锁的线程队列(JVM管理)
- Wait Set:调用wait()的线程等待区
- 优先级控制:非公平锁(默认)vs 公平锁(按入队顺序)
二、生活化类比:银行柜台服务系统
| 监视器组件 | 银行类比 | 运行机制 | 
| 共享数据 | 柜台现金 | 所有柜员共享同一保险箱 | 
| 同步方法 | 柜台窗口 | 每次仅允许一个柜员操作现金 | 
| 条件变量 | 客户等待区 | 现金不足时柜员进入等待状态 | 
| Entry Set | 排队叫号机 | 客户按顺序获取服务资格 | 
| Wait Set | VIP休息室 | 特殊需求客户暂时离开主队列 | 
- 异常处理:柜员突发离职(线程中断)→ 系统自动唤醒下个柜员
三、Java代码实现(生产级Demo)
1. 完整可运行代码
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class MonitorPatternDemo {
    // 共享资源:有限容量队列
    private final LinkedList<String> messageQueue = new LinkedList<>();
    private final int MAX_CAPACITY = 10;
    // 显式锁(比synchronized更灵活)
    private final ReentrantLock lock = new ReentrantLock(true); // 公平锁
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    // 监视器方法:生产消息
    public void produce(String message) throws InterruptedException {
        lock.lock();
        try {
            while (messageQueue.size() == MAX_CAPACITY) {
                System.out.println("[Producer] 队列已满,等待消费...");
                notFull.await(); // 释放锁并进入等待
            }
            messageQueue.addLast(message);
            System.out.println("[Producer] 添加消息: " + message + " | 队列大小: " + messageQueue.size());
            notEmpty.signal(); // 唤醒等待的消费者
        } finally {
            lock.unlock();
        }
    }
    // 监视器方法:消费消息
    public String consume() throws InterruptedException {
        lock.lock();
        try {
            while (messageQueue.isEmpty()) {
                System.out.println("[Consumer] 队列为空,等待生产...");
                notEmpty.await(); // 释放锁并进入等待
            }
            String message = messageQueue.removeFirst();
            System.out.println("[Consumer] 处理消息: " + message + " | 剩余: " + messageQueue.size());
            notFull.signal(); // 唤醒等待的生产者
            return message;
        } finally {
            lock.unlock();
        }
    }
    // 监控线程
    public void startMonitorThread() {
        new Thread(() -> {
            while (true) {
                try {
                    lock.lock();
                    try {
                        System.out.println("[Monitor] === 当前状态 ===");
                        System.out.println("队列大小: " + messageQueue.size());
                        System.out.println("等待生产者: " + lock.getWaitQueueLength(notFull));
                        System.out.println("等待消费者: " + lock.getWaitQueueLength(notEmpty));
                        System.out.println("=== === === === ===");
                    } finally {
                        lock.unlock();
                    }
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Monitor-Thread").start();
    }
    public static void main(String[] args) {
        MonitorPatternDemo monitor = new MonitorPatternDemo();
        monitor.startMonitorThread();
        // 模拟生产者
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        monitor.produce("Msg-" + System.currentTimeMillis());
                        TimeUnit.MILLISECONDS.sleep(500);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer-" + i).start();
        }
        // 模拟消费者
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        monitor.consume();
                        TimeUnit.SECONDS.sleep(1);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer-" + i).start();
        }
    }
}2. 关键配置说明
// 锁类型选择
new ReentrantLock(true); // 公平锁(按入队顺序获取锁)
new ReentrantLock();     // 非公平锁(默认,吞吐量更高)
// 条件变量分离
private final Condition notFull = lock.newCondition();  // 队列未满条件
private final Condition notEmpty = lock.newCondition(); // 队列非空条件
// 监控接口
lock.getWaitQueueLength(condition); // 获取等待特定条件的线程数四、横向对比表格
1. 线程同步机制对比
| 机制 | 互斥能力 | 条件等待 | 可中断 | 公平性 | 适用场景 | 
| synchronized | 有 | 单一条件 | 不可 | 非公平 | 简单同步场景 | 
| ReentrantLock | 有 | 多条件 | 可中断 | 可配置 | 复杂同步逻辑 | 
| Semaphore | 无 | 无 | 可中断 | 可配置 | 资源池控制 | 
| ReadWriteLock | 有 | 多条件 | 可中断 | 可配置 | 读多写少场景 | 
2. 条件变量实现对比
| 实现方式 | 通知精度 | 批量唤醒 | 超时支持 | 使用复杂度 | 
| Object.wait() | 全部 | 是 | 有 | 低 | 
| Condition.await() | 指定条件 | 否 | 有 | 中 | 
| BlockingQueue | 内置 | 自动 | 有 | 低 | 
五、高级优化技巧
1. 锁分段优化
// 降低锁粒度(如ConcurrentHashMap的分段锁思想)
private final ReentrantLock[] segmentLocks = new ReentrantLock[16];
{
    for (int i = 0; i < segmentLocks.length; i++) {
        segmentLocks[i] = new ReentrantLock();
    }
}
public void put(String key, String value) {
    int segment = Math.abs(key.hashCode() % segmentLocks.length);
    segmentLocks[segment].lock();
    try {
        // 操作对应分段的共享数据
    } finally {
        segmentLocks[segment].unlock();
    }
}2. 条件变量优化
// 使用带超时的等待(避免死锁)
if (!notEmpty.await(5, TimeUnit.SECONDS)) {
    throw new TimeoutException("等待消息超时");
}
// 使用signalAll()谨慎(可能引起"惊群效应")
notEmpty.signal(); // 优先使用精准通知3. 监控指标扩展
// 添加JMX监控(示例)
public class MonitorMetrics implements MonitorMetricsMBean {
    private final ReentrantLock lock;
    public int getWaitThreadCount() {
        return lock.getQueueLength(); // 获取等待锁的线程数
    }
    public int getActiveThreadCount() {
        return lock.getHoldCount(); // 获取锁重入次数
    }
}
// 注册MBean
ManagementFactory.getPlatformMBeanServer().registerMBean(
    new MonitorMetrics(lock), 
    new ObjectName("com.example:type=MonitorMetrics")
);4. 自适应锁优化
// 根据竞争情况动态切换锁类型
public class AdaptiveLock {
    private volatile boolean highContention = false;
    private final ReentrantLock fairLock = new ReentrantLock(true);
    private final ReentrantLock unfairLock = new ReentrantLock();
    public void lock() {
        if (highContention) {
            fairLock.lock(); // 高竞争时用公平锁
        } else {
            unfairLock.lock(); // 默认非公平锁
        }
    }
    // 监控线程竞争情况
    public void monitor() {
        new Thread(() -> {
            while (true) {
                int waiters = fairLock.getQueueLength() + unfairLock.getQueueLength();
                highContention = waiters > 5; // 阈值可配置
                try { Thread.sleep(1000); } catch (InterruptedException e) { break; }
            }
        }).start();
    }
}5. 无锁化改造方案
// 对读多写少场景使用原子变量
private final AtomicReference<Map<String, String>> cache = 
    new AtomicReference<>(new ConcurrentHashMap<>());
public void updateCache(String key, String value) {
    while (true) {
        Map<String, String> oldMap = cache.get();
        Map<String, String> newMap = new ConcurrentHashMap<>(oldMap);
        newMap.put(key, value);
        if (cache.compareAndSet(oldMap, newMap)) break;
    }
}六、异常处理与健壮性设计
1. 死锁检测与恢复
// 使用ThreadMXBean检测死锁
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
long[] threadIds = bean.findDeadlockedThreads();
if (threadIds != null) {
    ThreadInfo[] infos = bean.getThreadInfo(threadIds);
    for (ThreadInfo info : infos) {
        System.err.println("死锁线程: " + info.getThreadName());
        // 强制中断受害线程(生产环境需谨慎)
        Thread thread = findThreadById(info.getThreadId());
        if (thread != null) thread.interrupt();
    }
}2. 线程泄漏防护
// 封装安全的线程池
public class SafeExecutor extends ThreadPoolExecutor {
    private final ConcurrentMap<Worker, Boolean> workers = new ConcurrentHashMap<>();
    protected void beforeExecute(Thread t, Runnable r) {
        workers.put((Worker) t, true);
    }
    protected void afterExecute(Runnable r, Throwable t) {
        workers.remove(Thread.currentThread());
    }
    public List<Thread> getStuckThreads(long timeoutMs) {
        return workers.keySet().stream()
            .filter(w -> w.getActiveTime() > timeoutMs)
            .collect(Collectors.toList());
    }
}七、分布式环境扩展
1. 跨JVM的Monitor实现
// 基于Redis的分布式锁
public class DistributedMonitor {
    private final Jedis jedis;
    private final String lockKey;
    public boolean tryLock(long timeoutMs) {
        String result = jedis.set(lockKey, "locked", 
            "NX", "PX", timeoutMs);
        return "OK".equals(result);
    }
    public void unlock() {
        jedis.del(lockKey);
    }
    // 使用Redisson的看门狗机制实现续期
    public void startWatchdog() {
        new Thread(() -> {
            while (locked) {
                jedis.expire(lockKey, 30);
                try { Thread.sleep(10000); } 
                catch (InterruptedException e) { break; }
            }
        }).start();
    }
}2. 多节点协同方案
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Node 1    │    │   Node 2    │    │   Node 3    │
│ ┌─────────┐ │    │ ┌─────────┐ │    │ ┌─────────┐ │
│ │ Monitor │───ZK─▶│ Monitor │───ZK─▶│ Monitor │ │
│ └─────────┘ │    │ └─────────┘ │    │ └─────────┘ │
└─────────────┘    └─────────────┘    └─────────────┘- ZooKeeper协调:通过临时节点实现Leader选举
- 状态同步:使用Watcher机制通知条件变更
八、现代Java特性整合
1. 虚拟线程适配
// JDK21+ 虚拟线程优化
ExecutorService vThreadPool = Executors.newVirtualThreadPerTaskExecutor();
public void virtualThreadMonitor() {
    try (var executor = vThreadPool) {
        executor.submit(() -> {
            synchronized(this) { // 兼容传统synchronized
                while (conditionNotMet()) {
                    wait(); // 虚拟线程挂起时不占用OS线程
                }
                // 处理共享数据
            }
        });
    }
}2. Project Loom纤程支持
// 使用Fiber替代线程(实验性)
new Fiber<Void>(() -> {
    LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
    synchronized(lock) { // 百万级纤程共享Monitor
        // 业务逻辑
    }
}).start();九、监控指标体系建设
1. Prometheus监控集成
// 暴露锁竞争指标
Gauge contentionGauge = Gauge.build()
    .name("monitor_lock_contention")
    .help("Current lock waiters count")
    .register();
public void recordMetrics() {
    new ScheduledThreadPoolExecutor(1)
        .scheduleAtFixedRate(() -> {
            contentionGauge.set(lock.getQueueLength());
        }, 0, 5, TimeUnit.SECONDS);
}2. 关键监控看板
| 指标名称 | 计算方式 | 健康阈值 | 
| 锁等待时间 | 历史平均等待时间 | < 50ms | 
| 条件变量等待数 | notEmpty.getWaitQueueLength | < CPU核心数×2 | 
| 死锁检测次数 | ThreadMXBean统计 | = 0 | 
| 线程活跃度 | 活跃线程数/最大线程数 | 60%~80% | 
十、经典场景最佳实践
1. 数据库连接池实现
public class ConnectionPool {
    private final LinkedList<Connection> pool = new LinkedList<>();
    private final int maxSize;
    private final Object monitor = new Object();
    public Connection borrow() throws InterruptedException {
        synchronized (monitor) {
            while (pool.isEmpty()) {
                monitor.wait();
            }
            return pool.removeFirst();
        }
    }
    public void release(Connection conn) {
        synchronized (monitor) {
            pool.addLast(conn);
            monitor.notify();
        }
    }
}2. 生产者-消费者增强版
// 支持优先级和批量处理
public class EnhancedBlockingQueue {
    private final PriorityBlockingQueue<Item> queue;
    private final Semaphore available;
    public void putBatch(List<Item> items) {
        queue.addAll(items);
        available.release(items.size());
    }
    public Item take() throws InterruptedException {
        available.acquire();
        return queue.poll();
    }
}