使用 Channel 等待任务结束
首先回顾上一节 channel 这一概念介绍时所写的代码:
package main
import (
"fmt"
"time"
)
func worker(id int, c chan int) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
}
}
func createWorker(id int) chan<- int {
c := make(chan int)
go worker(id, c)
return c
}
func chanDemo() {
var channels [10]chan<- int
for i := 0; i < 10; i++ {
channels[i] = createWorker(i)
}
for i := 0; i < 10; i++ {
channels[i] <- 'a' + i
}
for i := 0; i < 10; i++ {
channels[i] <- 'A' + i
}
time.Sleep(time.Millisecond)
}
func main() {
chanDemo()
}
👆 上述代码的流程如下:
首先在 main 函数中找到入口,即 chanDemo。在 chanDemo 中我们使用 var 新建了一个存储 10 个接收消息的 chan int 类型的数组,并定义了 createWorker 方法来对每一个 chan<- int
类型对象进行构造。(值得注意的是,createWorker 方法是一个返回值是chan<- int
的函数)
在 createWorker 中,使用 go worker(id, c)
来启动协程,c 既作为返回值返回,也会作为 worker 函数的参数在启动的协程中被使用。
此时,程序有多个流程在并行执行(并发),分别是 10 个 worker 和一个 chanDemo。我们在 chanDemo 中向 chan<- int
类型的通道发送消息,当 worker 当中的 c 有消息接收到的时候,会将消息打印出来。
现在我们希望对上述代码进行进一步的优化,在 worker 函数当中,当 n 打印完毕时,我们希望通知外部,此处的输出已经完成了。过去的做法可能会通过共享内存来进行通信,但是在 Golang 当中我们可以使用 channel 来实现上述需求,即:直接通过通信来共享内存。
我们对 worker 函数进行改进,添加一个名为 done 的 chan bool 类型,用来告知外部当前是否但因完毕:
func worker(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
done <- true
}
}
当然,在 chanDemo 函数中,需要有一个变量来接受 done 这个 chan bool 的值。
为此,我们新建一个结构,命名为 worker,并将上面同名的函数修改为 goWorker。worker 的定义如下:
type worker struct {
in chan int
done chan bool
}
相应地对 createWorker 方法进行修改:
func createWorker(id int) worker {
w := worker{ // 使用花括号显式地对 worker 进行构造
in: make(chan int),
done: make(chan bool),
}
go doWorker(id, w.in, w.done)
return w
}
函数 doWorker:
func doWorker(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
done <- true // 当打印执行结束时, 将 true 输入到 chan bool 的 done 当中
}
}
最后的 chanDemo 函数定义如下:
func chanDemo() {
var workers [10]worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i
<-workers[i].done // 不用使用变量来对值进行接受
// 直接显式地将值写出来, 即可在函数结束之前完成 chan int 的值的打印
}
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i
<-workers[i].done
}
}
完整的代码如下:
package main
import (
"fmt"
)
func doWorker(id int, c chan int, done chan bool) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
done <- true
}
}
type worker struct {
in chan int
done chan bool
}
func createWorker(id int) worker {
w := worker{
in: make(chan int),
done: make(chan bool),
}
go doWorker(id, w.in, w.done)
return w
}
func chanDemo() {
var workers [10]worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}
for i := 0; i < 10; i++ {
workers[i].in <- 'a' + i
<-workers[i].done
}
for i := 0; i < 10; i++ {
workers[i].in <- 'A' + i
<-workers[i].done
}
}
func main() {
chanDemo()
}
得到的输出如下:
一个问题在于,上述输出是顺序执行的,失去了并行的意义。
我们想要做的是,一次性将想要打印的 20 个信息发送给 10 个通道,发送所有信息之后再等待所有通道的打印结束。
一个可能的改进如下:
func chanDemo() {
var workers [10]worker
for i := 0; i < 10; i++ {
workers[i] = createWorker(i)
}
for i, worker := range workers {
worker.in <- 'a' + i // 向 channel 发送数据
}
for _, worker := range workers {
<-worker.done // 首先接受数据, 再发送数据
}
for i, worker := range workers {
worker.in <- 'A' + i
}
for _, worker := range workers {
<-worker.done
}
}
此时得到的结果是:
使用 WaitGroup
除了显式地新建一个名为 done 的 chan bool 之外,还可以使用一个名为 WaitGroup 对象来完成等操作,它定义在头文件 sync 当中。
WaitGroup 有三个主要的方法,分别是 Add、Done 和 Wait。Add 负责告知 WaitGroup 当前还有多少个任务要完成(对于我们上面的例子,新建 10 个通道,对每个通道传入两个值,则有 20 个任务);Done 在 worker 内部打印结束后执行,告知外部任务已经完成;Wait 在外部执行,表示等待协程运行完毕之后再执行下一条语句,在我们的示例中,没有下一条语句,则函数返回。
完整的示例如下:
package main
import (
"fmt"
"sync"
)
func doWorker(id int, c chan int, wg *sync.WaitGroup) {
for n := range c {
fmt.Printf("Worker %d received %c\n",
id, n)
wg.Done()
}
}
type worker struct {
in chan int
wg *sync.WaitGroup
}
func createWorker(id int, wg *sync.WaitGroup) worker {
w := worker{
in: make(chan int),
wg: wg, // 传递的是指针, 因为 WaitGroup 只有一个, 大家共用
}
go doWorker(id, w.in, wg)
return w
}
func chanDemo() {
var wg sync.WaitGroup
var workers [10]worker
wg.Add(20) // 有 20 个任务
for i := 0; i < 10; i++ {
workers[i] = createWorker(i, &wg)
}
for i, worker := range workers {
worker.in <- 'a' + i
// Wg.Add(1) // 这种做法也是可以的
}
for i, worker := range workers {
worker.in <- 'A' + i
}
wg.Wait()
}
func main() {
chanDemo()
}
输出如下: