goroutine
f() // 等待 f() 返回
go f() // 新建一个调用 f() 的 goroutine,不用等待
在 Go 语言里,goroutine 是并发执行的活动单元。与顺序执行程序不同,在有多个 goroutine 的并发程序中,不同函数可同时执行。程序启动时,首个调用main函数的 goroutine 为主 goroutine,新的 goroutine 通过go语句创建,go语句在函数或方法调用前加上go关键字,且go语句本身执行立即完成,不等待函数执行结束。
package main
import (
"fmt"
"time"
)
func main() {
go sinner(100 * time.Microsecond)
const n = 45
fibN := fib(n)
fmt.Printf("\rFibonacci(%d) = %d\n", n, fibN)
}
func sinner(delay time.Duration) {
for {
for _, r := range `-\|/` {
fmt.Printf("\r%c", r)
time.Sleep(delay)
}
}
}
func fib(x int) int {
if x < 2 {
return x
}
return fib(x - 1) + fib(x - 2)
}
示例:主 goroutine 计算第 45 个斐波那契数(使用低效递归算法,耗时较长),同时启动另一个 goroutine 运行spinner函数,spinner函数不断打印旋转字符提示程序在运行。当fib(45)计算完成,main函数输出结果后返回,此时所有 goroutine 强制终结,程序退出。除程序正常返回或退出外,无法从外部直接停止一个 goroutine,但可通过通信让其自行停止。程序中两个并发活动(指示器显示和斐波那契数计算)相互独立运行 。
示例:并发时钟服务器
顺序时钟服务器
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
handleConn(conn)
}
}
func handleConn(conn net.Conn) {
defer conn.Close()
for {
_, err := io.WriteString(conn, time.Now().Format("15:04:05\n"))
if err != nil {
return
}
time.Sleep(1 * time.Second)
}
}
- 实现原理:使用
net.Listen创建net.Listener在localhost:8000监听 TCP 连接,Accept方法阻塞等待连接,接收到连接后由handleConn函数处理。handleConn函数在循环中,每秒通过time.Now获取当前时间,利用net.Conn满足io.Writer接口的特性,将时间格式化后写入连接发送给客户端。若写入失败(如客户端断开连接),则关闭连接,继续等待下一个连接请求。
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
mustCopy(os.Stdout, conn)
}
func mustCopy(dst io.Writer, src io.Reader) {
if _, err := io.Copy(dst, src); err != nil {
log.Fatal(err)
}
}
- 客户端连接:客户端可使用
telnet,nc(netcat)工具或自定义的基于net.Dial实现的 Go 版netcat程序连接服务器。顺序服务器一次只能处理一个客户端请求,第二个客户端需等第一个结束才能正常工作。
并发时钟服务器
func main() {
listener, err := net.Listen("tcp", "localhost:8000")
if err != nil {
log.Fatal(err)
}
for {
conn, err := listener.Accept()
if err != nil {
log.Print(err)
continue
}
go handleConn(conn)
}
}
在顺序时钟服务器基础上,只需在调用handleConn函数处添加go关键字,使其在新的 goroutine 内执行,就能让服务器并发处理多个客户端连接,多个客户端可同时接收到时间信息。
示例:并发回声服务器
简单回声服务器实现
func handleConn(c net.Conn) {
io.Copy(c, c) // 注意: 忽略错误
c.Close()
}
普通回声服务器可通过handleConn函数,利用io.Copy将读取到的内容写回客户端,之后关闭连接,但此版本忽略错误处理。
模拟真实回声的回声服务器
func echo(c net.Conn, shout string, delay time.Duration) {
fmt.Fprintln(c, "\t", strings.ToUpper(shout))
time.Sleep(delay)
fmt.Fprintln(c, "\t", shout)
time.Sleep(delay)
fmt.Fprintln(c, "\t", strings.ToLower(shout))
}
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
echo(c, input.Text(), 1*time.Second)
}
// 注意: 忽略input.Err()中可能的错误
c.Close()
}
echo函数:定义echo函数,接收net.Conn、字符串和延迟时间参数,先将字符串转为大写输出,延迟后输出原字符串,再延迟后转为小写输出,模拟真实回声效果。handleConn函数:在handleConn函数中,使用bufio.NewScanner读取客户端输入,循环调用echo函数处理输入内容,同样忽略了输入错误处理,处理完后关闭连接。
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err!= nil {
log.Fatal(err)
}
defer conn.Close()
go mustCopy(os.Stdout, conn)
mustCopy(conn, os.Stdin)
}
netcat程序:客户端程序使用net.Dial连接服务器,通过两个mustCopy调用,一个将标准输入发送到服务器,另一个将服务器回复输出到标准输出,主 goroutine 处理输入,另一个 goroutine 处理输出,输入结束时程序停止。
实现并发回声效果
func handleConn(c net.Conn) {
input := bufio.NewScanner(c)
for input.Scan() {
go echo(c, input.Text(), 1*time.Second)
}
// 注意: 忽略input.Err()中可能的错误
c.Close()
}
为使回声真正并发,在调用echo函数时添加go关键字,让每个回声处理在单独的 goroutine 中执行,实现多个回声在时间上相互重合的并发效果。在添加go关键字实现并发时,要考虑net.Conn并发调用的安全性。
通道
通道是 Go 程序中 goroutine 之间的通信机制,可看作是特定类型元素的导管,如chan int表示存放int类型元素的通道。它是 goroutine 间发送特定值的通信桥梁,是并发编程中实现同步和数据传递的重要工具。
ch := make(chan int)
- 创建方式:使用内置的
make函数创建通道,如ch := make(chan int)。通道和map类似,是引用类型,复制或传递时传递的是引用,指向同一份数据结构,其零值为nil。同种类型通道可用==比较,若为同一通道引用则比较值为true,也可与nil比较 。
ch <- x // 发送语句
x = <-ch // 接收语句
<-ch // 丢弃
- 发送与接收:通道有发送(
send)和接收(receive)两个主要操作,使用<-操作符。发送语句如ch <- x;接收语句如x = <-ch,也可丢弃接收结果,如<-ch。
close(ch)
- 关闭操作:可使用
close函数关闭通道,关闭后发送操作会导致宕机,接收操作会获取已发送的值,通道为空时接收立即完成并获取元素类型零值 。
类型
ch = make(chan int) // 无缓冲通道
ch = make(chan int, 0) // 无缓冲通道
ch = make(chan inr, 3) // 容量为3的缓冲通道
使用简单make调用创建的是无缓冲通道,make还可接受第二个可选参数表示通道容量。容量为 0 时是无缓冲通道,如ch = make(chan int)或ch = make(chan int, 0) ;指定容量(如ch = make(chan int, 3) )则为缓冲通道 ,下面会再进行介绍缓冲通道 。
无缓冲通道
无缓冲通道上的发送操作会阻塞,直到有另一个 goroutine 在对应通道上执行接收操作,此时值传送完成,两个 goroutine 可继续执行;反之,接收操作先执行也会阻塞,直到有 goroutine 发送值。这种通信机制使发送和接收的 goroutine 同步化,所以无缓冲通道也叫同步通道 。
func main() {
conn, err := net.Dial("tcp", "localhost:8000")
if err!= nil {
log.Fatal(err)
}
done := make(chan struct{})
go func() {
io.Copy(os.Stdout, conn) // 注意: 忽略错误
log.Println("done")
done <- struct{}{} // 指示主 goroutine
}()
mustCopy(conn, os.Stdin)
conn.Close()
<-done // 等待后台 goroutine 完成
}
示例:客户端程序基础上,通过创建无缓冲通道done来同步主 goroutine 和后台 goroutine。主 goroutine 等待从done通道接收值,后台 goroutine 在完成任务(如io.Copy操作)后,向done通道发送一个值,主 goroutine 接收到值后才继续执行后续操作(如关闭连接 )。当用户关闭标准输入流,mustCopy返回,后台 goroutine 记录消息并向done通道发送值,主 goroutine 接收到值后关闭连接,保证程序按预期顺序执行 。
消息与事件的概念
- 通过通道发送消息时,不仅消息的值重要,通信本身及发生时间也很关键,这种用于同步、不携带额外信息的消息称为事件。可使用
struct{}元素类型的通道来强调同步功能,bool或int类型通道也可以 。
管道
管道是利用通道连接 goroutine 的一种方式,使一个 goroutine 的输出成为另一个的输入。通过通道在多个包含无限循环的 goroutine 间进行全生命周期通信 。
func main() {
naturals := make(chan int)
squares := make(chan int)
// counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()
// squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()
// printer (在主 goroutine 中)
for x := range squares {
fmt.Println(x)
}
}

示例:程序由
counter、squarer、printer三个 goroutine 和两个通道组成。counter产生自然数序列并通过naturals通道发送给squarer,squarer计算平方后通过squares通道发送给printer,printer输出结果 。通道关闭处理:当发送方知道无更多数据发送时,可关闭通道告知接收方停止等待 。如
counter在发送一定数量元素(如 100 个 )后关闭naturals通道,squarer接收到通道关闭信号后处理并关闭squares通道 。接收操作有变体,可返回接收值和表示是否成功的布尔值 ,利用此特性可判断通道是否关闭并处理 。也可使用range循环语法,在通道接收完所有值后自动结束循环 ,简化通道关闭和数据处理逻辑 。关闭通道的必要性:不是必须的,仅在需通知接收方数据发送完毕时进行 。通道回收由垃圾回收器根据可访问性决定,与文件关闭操作不同 。关闭已关闭通道会导致宕机,关闭通道还可作为广播机制 。
缓冲通道
ch = make(chan string, 3)
- 创建:缓冲通道通过
make函数创建,可设置容量参数,如ch = make(chan string, 3)创建一个能容纳三个字符串的缓冲通道 。 - 发送与接收操作:发送操作向通道队列尾部插入元素,接收操作从头部移除元素 。通道满时发送操作阻塞,通道空时接收操作阻塞,部分填满时发送和接收操作不阻塞 。可通过
cap函数获取通道容量,len函数获取当前元素个数 。
func mirroredQuery() string {
responses := make(chan string, 3)
go func() { responses <- request("asia.gopl.io") }()
go func() { responses <- request("europe.gopl.io") }()
go func() { responses <- request("americas.gopl.io") }()
return <-responses // return the quickest response
}
示例:它并发向三个镜像地址发送请求,将响应通过缓冲通道发送,只接收最早返回的响应 。缓冲通道在并发场景中解耦发送和接收 goroutine 的作用 。同时使用无缓冲通道可能导致 goroutine 泄漏(长时间阻塞无法结束 ),要合理选择无缓冲或缓冲通道以及设置缓冲通道容量。
缓冲通道与性能
以蛋糕店厨师工作场景类比,无缓冲通道类似厨师需等待下一个接收,同步性强;缓冲通道可容纳一定量任务,容量为 1 时可消除速率差异,容量更大可应对更大速率波动 。还提到可通过创建额外 goroutine 辅助处理任务,以优化程序性能 。
并行循环
- 初步并行:直接在循环中添加
go关键字启动 goroutine 进行并行处理,但此版本存在问题,外层 goroutine 可能在内部 goroutine 完成任务前就返回,导致任务未真正完成 。 - 使用通道同步:创建无缓冲通道,内层 goroutine 完成任务时向通道发送信号,外层 goroutine 通过接收通道信号计数,等待所有任务完成 。此过程需注意循环变量在闭包中的使用问题,避免 goroutine 读取到错误的变量值 。
- 处理错误返回:为使外层 goroutine 能获取内层 goroutine 执行函数的错误,创建用于接收错误的通道,内层 goroutine 将错误发送到通道,外层 goroutine 接收并处理错误 。但简单处理方式可能导致 goroutine 泄漏(如遇到错误时未正确结束 goroutine ),可通过使用有足够容量的缓冲通道或其他方案解决 。
- 返回处理结果:进一步改进,创建缓冲通道,内层 goroutine 将生成的内容及错误信息发送到通道,外层 goroutine 接收并处理 。
使用sync.WaitGroup同步
func makeThumbnails6(filenames <-chan string) int64 {
sizes := make(chan int64)
var wg sync.WaitGroup // 工作goroutine的个数
for f := range filenames {
wg.Add(1)
go func(f string) {
defer wg.Done()
thumb, err := thumbnail.ImageFile(f)
if err != nil {
log.Println(err)
return
}
info, _ := os.Stat(thumb) // 可以忽略错误
sizes <- info.Size()
}(f)
}
// closer
go func() {
wg.Wait()
close(sizes)
}()
var total int64
for size := range sizes {
total += size
}
return total
}
为更好地控制和等待所有 goroutine 结束,引入sync.WaitGroup 。在启动每个工作 goroutine 前使用Add方法增加计数,工作 goroutine 结束时调用Done方法减少计数,主 goroutine 通过Wait方法阻塞等待计数为 0,即所有工作 goroutine 完成 。同时,结合通道传递处理结果(如文件大小 ),实现更健壮的并行处理 。
使用 select 多路复用
select {
case <-ch1:
//...
case x := <-ch2:
//...use x...
case ch3 <- y:
//...
default:
//...
}
select语句用于在多个通道操作中进行选择,实现多路复用。它类似switch语句,有一系列情况和可选的默认分支,每个情况指定一次通道上的发送或接收操作及关联代码块 。select会一直等待,直到有一个通道操作可执行,然后执行对应语句,其他未满足条件的操作不会执行;若没有对应情况且无默认分支,select将永远等待 。
示例
func main() {
fmt.Println("Commencing countdown.")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
<-tick
}
launch()
}
- 火箭发射倒计时:以火箭发射倒计时为例,最初的倒计时程序通过
time.Tick函数按固定时间间隔发送事件进行倒计时 。为实现可取消发射,启动一个 goroutine 从标准输入读取字符,若成功则向abort通道发送值 。使用select语句等待time.Tick通道的计时事件或abort通道的取消事件 。还可结合time.After函数设置超时,若在指定时间(如 10s )内未收到取消事件则开始发射 。
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x) // "0" "2" "4" "6" "8"
case ch <- i:
}
}
- 通道状态操作选择:对于缓冲区大小为 1 的通道
ch,通过select语句根据通道状态(空或满 )及循环变量i的奇偶性,决定是从通道接收还是向通道发送数据 。当多个情况同时满足时,select随机选择一个执行 。
func main() {
//...创建中止通道...
fmt.Println("Commencing countdown. Press return to abort.")
tick := time.Tick(1 * time.Second)
for countdown := 10; countdown > 0; countdown-- {
fmt.Println(countdown)
select {
case <-tick:
// 什么操作也不执行
case <-abort:
fmt.Println("Launch aborted!")
return
}
}
launch()
}
- 带输出的倒计时:改进火箭发射倒计时程序,在每次迭代中使用
select语句使程序等待 1s 以检测中止事件,同时输出倒计时数值 。
注意事项
ticker := time.NewTicker(1 * time.Second)
<-ticker.C // 从ticker的通道接收
ticker.Stop() // 造成ticker的goroutine终止
select {
case <-abort:
fmt.Printf("Launch aborted!\n")
return
default:
// 不执行任何操作
}
time.Tick函数使用可能导致的 goroutine 泄漏问题,因为即使不再接收其通道事件,对应的 goroutine 仍在运行 。建议使用time.NewTicker创建定时器,使用完后通过Stop方法终止相关 goroutine 。select语句可实现非阻塞通信,通过添加默认分支,在没有通道操作就绪时立即执行默认代码块 ,重复此操作可实现对通道轮询 。nil通道上的操作永远不会被选中 ,可利用这一特性开启或禁用特定情况 。
取消
在一些场景下,需要让 goroutine 停止当前任务,如 Web 服务器处理客户端请求时客户端断开连接 。但直接终止一个 goroutine 会使共享变量状态不确定,且难以准确知道有多少 goroutine 在工作,简单通过通道发送固定数量事件来取消多个 goroutine 存在计数不准确、操作卡住等问题 。
基于通道关闭的广播式取消机制
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
利用通道关闭特性实现广播式取消 。创建取消通道done ,不向其发送值,而是通过关闭它来表明取消操作 。定义cancelled函数,使用select语句检测done通道是否关闭 ,若关闭则返回true ,否则返回false 。同时,启动一个 goroutine 读取标准输入,一旦检测到输入(如用户按回车键 ),就关闭done通道广播取消事件 。
// 当检测到输入时取消遍历
go func() {
os.Stdin.Read(make([]byte, 1)) // 读一个字节
close(done)
}()
for {
select {
case <-done:
// 耗尽fileSizes以允许已有的goroutine结束
for range fileSizes {
// 不执行任何操作
}
return
case size, ok := <-fileSizes:
//...
}
}
- 主 goroutine:在主 goroutine 的
select语句中添加从done通道接收的情况 。当接收到取消信号时,先耗尽fileSizes通道中的值(防止卡住 ),然后返回 。
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
//...
}
}
walkDir函数:walkDir函数开始时轮询取消状态,若检测到取消(cancelled为true),则立即返回 ,不再执行后续操作 。在walkDir循环中也可进行取消状态轮询 ,避免取消后创建新的 goroutine 。
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}: // 获取令牌
case <-done:
return nil // 取消
}
defer func() { <-sema }() // 释放令牌
//...read directory...
}
dirents函数:在dirents函数中,使用select语句同时处理获取信号量令牌和检测取消通道done,若检测到取消则直接返回nil,实现更快速的取消响应 。通过这些措施,当取消事件发生时,后台 goroutine 能迅速停止,main函数返回,程序退出。
alkDir函数**:walkDir函数开始时轮询取消状态,若检测到取消(cancelled为true ),则立即返回 ,不再执行后续操作 。在walkDir`循环中也可进行取消状态轮询 ,避免取消后创建新的 goroutine 。
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}: // 获取令牌
case <-done:
return nil // 取消
}
defer func() { <-sema }() // 释放令牌
//...read directory...
}
dirents函数:在dirents函数中,使用select语句同时处理获取信号量令牌和检测取消通道done,若检测到取消则直接返回nil,实现更快速的取消响应 。通过这些措施,当取消事件发生时,后台 goroutine 能迅速停止,main函数返回,程序退出。
参考资料:《Go程序设计语言》