设计模式——保护性暂停

发布于:2024-04-30 ⋅ 阅读:(39) ⋅ 点赞:(0)

同步模式之保护性暂停

定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  • 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 因为要等待另一方的结果,因此归类到同步模式
    在这里插入图片描述

实现

class GuardedObject {
	private Object response;
 	private final Object lock = new Object();
 	public Object get() {
	 	synchronized (lock) {
		 // 条件不满足则等待
		while (response == null) {
	 		try {
	 			lock.wait();
			 } catch (InterruptedException e) {
	 			e.printStackTrace();
	 		}
	 	}
	 	return response;
 	}
  }
 public void complete(Object response) {
	 	synchronized (lock) {
		 // 条件满足,通知等待线程
		this.response = response;
	 	lock.notifyAll();
	 	}
    }
 }

应用

一个线程等待另一个线程的执行结果

public static void main(String[] args) {
 GuardedObject guardedObject = new GuardedObject();
 new Thread(() -> {
 try {
 // 子线程执行下载
List<String> response = download();
 log.debug("download complete...");
 guardedObject.complete(response);
 } catch (IOException e) {
 e.printStackTrace();
 }
    }).start();
 }
 log.debug("waiting...");
 // 主线程阻塞等待
Object response = guardedObject.get();
 log.debug("get response: [{}] lines", ((List<String>) response).size());

执行结果

08:42:18.568 [main] c.TestGuardedObject - waiting...
 08:42:23.312 [Thread-0] c.TestGuardedObject - download complete...
 08:42:23.312 [main] c.TestGuardedObject - get response: [3] lines

带超时版 GuardedObject

如果要控制超时时间呢

class GuardedObjectV2 {
 private Object response;
 private final Object lock = new Object();
 public Object get(long millis) {
 synchronized (lock) {
 // 1) 记录最初时间
long begin = System.currentTimeMillis();
 // 2) 已经经历的时间
long timePassed = 0;
 while (response == null) {
 // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
 log.debug("waitTime: {}", waitTime);
 if (waitTime <= 0) {
 log.debug("break...");
 break;
 }
 try {
 lock.wait(waitTime);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 // 3) 如果提前被唤醒,这时已经经历的时间假设为 400
 timePassed = System.currentTimeMillis() - begin;
 log.debug("timePassed: {}, object is null {}", 
timePassed, response == null);
 }
 return response;
 }
    }
 public void complete(Object response) {
 synchronized (lock) {
 // 条件满足,通知等待线程
this.response = response;
 log.debug("notify...");
 lock.notifyAll();
 }
    }
 }

测试,没有超时

public static void main(String[] args) {
 GuardedObjectV2 v2 = new GuardedObjectV2();
 new Thread(() -> {
 sleep(1);
 v2.complete(null);
 sleep(1);
 v2.complete(Arrays.asList("a", "b", "c"));
    }).start();
 Object response = v2.get(2500);
 if (response != null) {
 log.debug("get response: [{}] lines", ((List<String>) response).size());
  } 
    }
 }
else {
 log.debug("can't get response");

输出

08:49:39.917 [main] c.GuardedObjectV2 - waitTime: 2500
 08:49:40.917 [Thread-0] c.GuardedObjectV2 - notify...
 08:49:40.917 [main] c.GuardedObjectV2 - timePassed: 1003, object is null true
 08:49:40.917 [main] c.GuardedObjectV2 - waitTime: 1497
 08:49:41.918 [Thread-0] c.GuardedObjectV2 - notify...
 08:49:41.918 [main] c.GuardedObjectV2 - timePassed: 2004, object is null false
 08:49:41.918 [main] c.TestGuardedObjectV2 - get response: [3] lines

测试,超时

// 等待时间不足
List<String> lines = v2.get(1500);

输出

08:47:54.963 [main] c.GuardedObjectV2 - waitTime: 1500
 08:47:55.963 [Thread-0] c.GuardedObjectV2 - notify...
 08:47:55.963 [main] c.GuardedObjectV2 - timePassed: 1002, object is null true
 08:47:55.963 [main] c.GuardedObjectV2 - waitTime: 498
 08:47:56.461 [main] c.GuardedObjectV2 - timePassed: 1500, object is null true
 08:47:56.461 [main] c.GuardedObjectV2 - waitTime: 0
 08:47:56.461 [main] c.GuardedObjectV2 - break...
 08:47:56.461 [main] c.TestGuardedObjectV2 - can't get response
 08:47:56.963 [Thread-0] c.GuardedObjectV2 - notify...

扩展——原理之join

join的底层就是使用保护性暂停的设计模式

扩展——多任务版 GuardedObject

图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。

在这里插入图片描述
中间用来解耦的类要维护一个集合,同时为了把多个guardedObject区分开,要加上id。

新增 id 用来标识 Guarded Object

 class GuardedObject {
 // 标识 Guarded Object
 private int id;
 public GuardedObject(int id) {
 this.id = id;
    }
 public int getId() {
 return id;
    }
 // 结果
private Object response;
 // 获取结果
// timeout 表示要等待多久 2000
 public Object get(long timeout) {
 synchronized (this) {
 // 开始时间 15:00:00
 long begin = System.currentTimeMillis();
 // 经历的时间
long passedTime = 0;
 while (response == null) {
 // 这一轮循环应该等待的时间
long waitTime = timeout - passedTime;
 // 经历的时间超过了最大等待时间时,退出循环
if (timeout - passedTime <= 0) {
 break;
                }
 try {
 this.wait(waitTime); // 虚假唤醒 15:00:01
                } 
catch (InterruptedException e) {
 e.printStackTrace();
  }
 // 求得经历时间
passedTime = System.currentTimeMillis() - begin; // 15:00:02  1s
            }
 return response;
        }
    }
 // 产生结果
public void complete(Object response) {
 synchronized (this) {
 // 给结果成员变量赋值
this.response = response;
 this.notifyAll();
        }
    }
 }

中间解耦类

class Mailboxes {
 private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
 private static int id = 1;
 // 产生唯一 id
 private static synchronized int generateId() {
 return id++;
    }
 public static GuardedObject getGuardedObject(int id) {
 return boxes.remove(id);
    }
 }
 public static GuardedObject createGuardedObject() {
 GuardedObject go = new GuardedObject(generateId());
 boxes.put(go.getId(), go);
 return go;
    }
 public static Set<Integer> getIds() {
 return boxes.keySet();
    }

业务相关类

class People extends Thread{
 @Override
 public void run() {
 // 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
 log.debug("开始收信 id:{}", guardedObject.getId());
 Object mail = guardedObject.get(5000);
 log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
    }
 }
 class Postman extends Thread {
 private int id;
 private String mail;
 public Postman(int id, String mail) {
 this.id = id;
 this.mail = mail;
    }
 }
 @Override
 public void run() {
 GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
 log.debug("送信 id:{}, 内容:{}", id, mail);
 guardedObject.complete(mail);
    }

测试

public static void main(String[] args) throws InterruptedException {
 for (int i = 0; i < 3; i++) {
 new People().start();
    }
 Sleeper.sleep(1);
 for (Integer id : Mailboxes.getIds()) {
 new Postman(id, "内容" + id).start();
    }
 }

某次运行结果

10:35:05.689 c.People [Thread-1] - 开始收信 id:3
 10:35:05.689 c.People [Thread-2] - 开始收信 id:1
 10:35:05.689 c.People [Thread-0] - 开始收信 id:2
 10:35:06.688 c.Postman [Thread-4] - 送信 id:2, 内容:内容2
 10:35:06.688 c.Postman [Thread-5] - 送信 id:1, 内容:内容1
 10:35:06.688 c.People [Thread-0] - 收到信 id:2, 内容:内容2
 10:35:06.688 c.People [Thread-2] - 收到信 id:1, 内容:内容1
 10:35:06.688 c.Postman [Thread-3] - 送信 id:3, 内容:内容3
 10:35:06.689 c.People [Thread-1] - 收到信 id:3, 内容:内容3

网站公告

今日签到

点亮在社区的每一天
去签到