并发编程-设计模式

发布于:2025-09-07 ⋅ 阅读:(21) ⋅ 点赞:(0)

同步模式之保护性暂停

定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果

要点

有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject

如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)

JDK 中,join 的实现、Future 的实现,采用的就是此模式

因为要等待另一方的结果,因此归类到同步模式

实现GuardedObject

GuardedObject 类在 Java 并发编程中用于实现同步模式之保护性暂停(Guarded Suspension)。其主要作用是在两个线程之间传递结果,确保线程安全。具体来说,GuardedObject 类通过关联同一个锁对象,实现了一个线程等待另一个线程的执行结果的功能。

  1. 基本结构

    • GuardedObject 类包含一个私有对象 response,用于存储结果。

    • 使用一个锁对象 lock 来控制对 response 的访问,确保线程安全。

  2. 获取结果的方法 get()

    • 该方法通过 synchronized 块来获取锁。

    • responsenull 时,调用 lock.wait() 使当前线程进入等待状态,直到其他线程调用 notifyAll() 唤醒它。

    • response 不为 null 时,返回 response

    • 用来获取response 的值

  3. 产生结果的方法 complete(Object response)

    • 该方法同样通过 synchronized 块来获取锁。

    • 设置 response 的值,并调用 lock.notifyAll() 唤醒所有等待的线程。

package org.example.designPattern.GuardedSuspension;

/**
 * @author 2405993739
 * @description: 同步模式之保护性暂停
 * @date 2025/6/17 17:54
 */
public class GuardedObject {


    private Object response;
    private Object lock = new Object();

    public Object get() throws InterruptedException {

        synchronized (lock) {
            while (response == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        return response;
    }

    public void complete(Object response) {
        synchronized (lock) {
            this.response = response;
            lock.notifyAll();
        }
    }

}

应用

doSomethingInManyTime用来模拟耗时任务

package org.example.designPattern.GuardedSuspension;

/**
 * @author 2405993739
 * @description: 工具类
 * @date 2025/6/17 18:16
 */
public class Tool {

    // 模拟耗时操作
    public static String doSomethingInManyTime(){
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "cjx是帅哥";
    }
}

测试:

package org.example.designPattern.GuardedSuspension;

import lombok.extern.slf4j.Slf4j;

/**
 * @author 2405993739
 * @description: 测试类
 * @date 2025/6/17 18:18
 */
@Slf4j(topic = "test")
public class test {

    public static void main(String[] args) throws InterruptedException {
        log.info("开始测试");

        GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
            log.info("开始执行耗时任务");
            String response = Tool.doSomethingInManyTime();
            guardedObject.complete(response);
            log.info("结束执行耗时任务");
        }, "t1").start();

        log.info("等待耗时任务结果");
        String response = (String) guardedObject.get();
        log.info("结果是:{}", response);
    }
}

结果:

拓展-超时等待

实现
看仔细,是怎么实现的。

package org.example.designPattern.GuardedSuspension;


import lombok.extern.slf4j.Slf4j;

/**
 * @author 2405993739
 * @description: 同步模式之保护性暂停
 * @date 2025/6/17 17:54
 */
@Slf4j
public class GuardedObject {

    private Object lock = new Object();
    private Object response;

    public Object get(int timeout){

        synchronized (lock) {
            // 1) 开始时间
            long beginTime = System.currentTimeMillis();
            // 2) 经过的时间
            long timePassed = 0;

            while (response == null) {

                // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
                long waitTime = timeout - timePassed;
                log.debug("waitTime: {}", waitTime);

                if (waitTime <= 0) {
                    log.debug("break...");
                    break;
                }
                try {
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

                // 3) 如果提前被唤醒,这时已经经历的时间假设为 400
                timePassed = System.currentTimeMillis() - beginTime;
            }

        }

        return response;
    }

    public void complete(Object response) {
        synchronized (lock) {
            this.response = response;
            lock.notifyAll();
        }
    }


}

测试

测试了三种情况

package org.example.designPattern.GuardedSuspension;

import lombok.extern.slf4j.Slf4j;

/**
 * @author 2405993739
 * @description: 测试类
 * @date 2025/6/17 18:18
 */
@Slf4j(topic = "test")
public class test {

    public static void main(String[] args) throws InterruptedException {

        testSuccessfulResponse();
        testTimeout();
        testMultipleWaiters();


    }
    // 场景1:在超时时间内收到响应
    private static void testSuccessfulResponse() {
        System.out.println("=== Test 1: Successful Response ===");
        GuardedObject guarded = new GuardedObject();

        // 启动等待线程,超时设置为3000ms
        Thread waiter = new Thread(() -> {
            Object resp = guarded.get(3000);
            log.info("[Waiter] Received: " + resp);
        });
        waiter.start();

        // 1秒后完成响应
        new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ignored) {}
            guarded.complete("Hello World");
            log.info("[Producer] Sent: Hello World");
        }).start();

        join(waiter);
    }

    // 场景2:未在超时内收到响应
    private static void testTimeout() {
        System.out.println("=== Test 2: Timeout ===");
        GuardedObject guarded = new GuardedObject();

        Thread waiter = new Thread(() -> {
            Object resp = guarded.get(1000);
            log.info("[Waiter] Received: " + resp);
        });
        waiter.start();

        // 不发送响应,直接等待超时
        join(waiter);
    }

    // 场景3:多个等待线程
    private static void testMultipleWaiters() {
        System.out.println("=== Test 3: Multiple Waiters ===");
        GuardedObject guarded = new GuardedObject();

        for (int i = 1; i <= 3; i++) {
            final int id = i;
            new Thread(() -> {
                Object resp = guarded.get(5000);
                log.info("[Waiter {}] Received: {}", id, resp);
            }).start();
        }

        // 2 秒后发送响应,使所有等待者被唤醒
        new Thread(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException ignored) {}
            guarded.complete("Broadcast Message");
            log.info("[Producer] Sent: Broadcast Message");
        }).start();

        // 睡 6 秒,确保所有线程结束
        try {
            Thread.sleep(6000);
        } catch (InterruptedException ignored) {
            ignored.printStackTrace();
        }
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException ignored) {
            ignored.printStackTrace();
        }
    }

 
}

结果:

join源码分析-JDK17

join其实也是用保护性暂停,不过由原来的,get方法等待资源,变为等待线程暂停。我们来看看

看他实现

    public final synchronized void join(final long millis)
    throws InterruptedException {
        if (millis > 0) {
            if (isAlive()) {
                final long startTime = System.nanoTime();
                long delay = millis;
                do {
                    wait(delay);
                } while (isAlive() && (delay = millis -
                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);
            }
        } else if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            throw new IllegalArgumentException("timeout value is negative");
        }

public final synchronized void join(final long millis) 方法的主要目的是让调用该方法的线程等待指定的时间,直到被调用线程(即 this 线程)执行完毕。该方法的实现思路可以分为以下几个步骤:

  1. 参数检查

    • 如果 millis 小于 0,则抛出 IllegalArgumentException 异常,因为等待时间不能为负数。

    • 如果 millis 等于 0,则表示无限期等待,直到被调用线程结束。

    • 如果 millis 大于 0,则表示等待指定的时间。

  2. 无限期等待

    • millis 等于 0 时,使用一个 while 循环不断检查被调用线程是否仍然存活通过 isAlive() 方法)。

    • 如果被调用线程仍然存活,则调用 wait(0) 方法让当前线程进入等待状态,直到被调用线程结束并调用 notifyAll() 方法唤醒当前线程。

  3. 有限期等待

    • millis 大于 0 时,首先记录当前时间 startTime

    • 使用一个 do-while 循环不断检查被调用线程是否仍然存活。

    • 如果被调用线程仍然存活,则计算剩余的等待时间 delay,并调用 wait(delay) 方法让当前线程进入等待状态。

    • 在每次循环中,重新计算剩余的等待时间 delay,直到被调用线程结束或等待时间耗尽。


网站公告

今日签到

点亮在社区的每一天
去签到