【Golang】Go语言编程思想(六):Channel,第二节,使用Channel等待Goroutine结束

发布于:2024-12-18 ⋅ 阅读:(43) ⋅ 点赞:(0)

使用 Channel 等待任务结束

首先回顾上一节 channel 这一概念介绍时所写的代码:

package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
	}
}

func createWorker(id int) chan<- int {
	c := make(chan int)
	go worker(id, c)
	return c
}

func chanDemo() {
	var channels [10]chan<- int
	for i := 0; i < 10; i++ {
		channels[i] = createWorker(i)
	}

	for i := 0; i < 10; i++ {
		channels[i] <- 'a' + i
	}

	for i := 0; i < 10; i++ {
		channels[i] <- 'A' + i
	}

	time.Sleep(time.Millisecond)
}

func main() {
	chanDemo()
}

👆 上述代码的流程如下:

首先在 main 函数中找到入口,即 chanDemo。在 chanDemo 中我们使用 var 新建了一个存储 10 个接收消息的 chan int 类型的数组,并定义了 createWorker 方法来对每一个 chan<- int类型对象进行构造。(值得注意的是,createWorker 方法是一个返回值是chan<- int的函数)

在 createWorker 中,使用 go worker(id, c) 来启动协程,c 既作为返回值返回,也会作为 worker 函数的参数在启动的协程中被使用。

此时,程序有多个流程在并行执行(并发),分别是 10 个 worker 和一个 chanDemo。我们在 chanDemo 中向 chan<- int 类型的通道发送消息,当 worker 当中的 c 有消息接收到的时候,会将消息打印出来。

现在我们希望对上述代码进行进一步的优化,在 worker 函数当中,当 n 打印完毕时,我们希望通知外部,此处的输出已经完成了。过去的做法可能会通过共享内存来进行通信,但是在 Golang 当中我们可以使用 channel 来实现上述需求,即:直接通过通信来共享内存。

我们对 worker 函数进行改进,添加一个名为 done 的 chan bool 类型,用来告知外部当前是否但因完毕:

func worker(id int, c chan int, done chan bool) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
		done <- true
	}
}

当然,在 chanDemo 函数中,需要有一个变量来接受 done 这个 chan bool 的值。

为此,我们新建一个结构,命名为 worker,并将上面同名的函数修改为 goWorker。worker 的定义如下:

type worker struct {
	in   chan int
	done chan bool
}

相应地对 createWorker 方法进行修改:

func createWorker(id int) worker {
	w := worker{				// 使用花括号显式地对 worker 进行构造
		in:   make(chan int),
		done: make(chan bool),
	}
	go doWorker(id, w.in, w.done)
	return w
}

函数 doWorker:

func doWorker(id int, c chan int, done chan bool) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
		done <- true	// 当打印执行结束时, 将 true 输入到 chan bool 的 done 当中
	}
}

最后的 chanDemo 函数定义如下:

func chanDemo() {
	var workers [10]worker
	for i := 0; i < 10; i++ {
		workers[i] = createWorker(i)
	}

	for i := 0; i < 10; i++ {
		workers[i].in <- 'a' + i
		<-workers[i].done				// 不用使用变量来对值进行接受
		// 直接显式地将值写出来, 即可在函数结束之前完成 chan int 的值的打印
	}

	for i := 0; i < 10; i++ {
		workers[i].in <- 'A' + i
		<-workers[i].done
	}

}

完整的代码如下:

package main

import (
	"fmt"
)

func doWorker(id int, c chan int, done chan bool) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
		done <- true
	}
}

type worker struct {
	in   chan int
	done chan bool
}

func createWorker(id int) worker {
	w := worker{
		in:   make(chan int),
		done: make(chan bool),
	}
	go doWorker(id, w.in, w.done)
	return w
}

func chanDemo() {
	var workers [10]worker
	for i := 0; i < 10; i++ {
		workers[i] = createWorker(i)
	}

	for i := 0; i < 10; i++ {
		workers[i].in <- 'a' + i
		<-workers[i].done
	}

	for i := 0; i < 10; i++ {
		workers[i].in <- 'A' + i
		<-workers[i].done
	}

}

func main() {
	chanDemo()
}

得到的输出如下:
在这里插入图片描述
一个问题在于,上述输出是顺序执行的,失去了并行的意义。

我们想要做的是,一次性将想要打印的 20 个信息发送给 10 个通道,发送所有信息之后再等待所有通道的打印结束。

一个可能的改进如下:

func chanDemo() {
	var workers [10]worker
	for i := 0; i < 10; i++ {
		workers[i] = createWorker(i)
	}

	for i, worker := range workers {
		worker.in <- 'a' + i			// 向 channel 发送数据
	}

	for _, worker := range workers {
		<-worker.done					// 首先接受数据, 再发送数据
	}

	for i, worker := range workers {
		worker.in <- 'A' + i
	}

	for _, worker := range workers {
		<-worker.done
	}

}

此时得到的结果是:
在这里插入图片描述

使用 WaitGroup

除了显式地新建一个名为 done 的 chan bool 之外,还可以使用一个名为 WaitGroup 对象来完成等操作,它定义在头文件 sync 当中。

WaitGroup 有三个主要的方法,分别是 Add、Done 和 Wait。Add 负责告知 WaitGroup 当前还有多少个任务要完成(对于我们上面的例子,新建 10 个通道,对每个通道传入两个值,则有 20 个任务);Done 在 worker 内部打印结束后执行,告知外部任务已经完成;Wait 在外部执行,表示等待协程运行完毕之后再执行下一条语句,在我们的示例中,没有下一条语句,则函数返回。

完整的示例如下:

package main

import (
	"fmt"
	"sync"
)

func doWorker(id int, c chan int, wg *sync.WaitGroup) {
	for n := range c {
		fmt.Printf("Worker %d received %c\n",
			id, n)
		wg.Done()
	}
}

type worker struct {
	in chan int
	wg *sync.WaitGroup
}

func createWorker(id int, wg *sync.WaitGroup) worker {
	w := worker{
		in: make(chan int),
		wg: wg, // 传递的是指针, 因为 WaitGroup 只有一个, 大家共用
	}
	go doWorker(id, w.in, wg)
	return w
}

func chanDemo() {
	var wg sync.WaitGroup
	var workers [10]worker

	wg.Add(20) // 有 20 个任务
	for i := 0; i < 10; i++ {
		workers[i] = createWorker(i, &wg)
	}

	for i, worker := range workers {
		worker.in <- 'a' + i
		// Wg.Add(1)	// 这种做法也是可以的
	}
	for i, worker := range workers {
		worker.in <- 'A' + i
	}

	wg.Wait()

}

func main() {
	chanDemo()
}

输出如下:
在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到