Go语言管道Channel通信教程
目录
Channel基础概念
什么是Channel?
Channel是Go语言中用于goroutine之间通信的管道。它体现了Go的并发哲学:“不要通过共享内存来通信,而要通过通信来共享内存”。
Channel的特性
- 类型安全:每个channel只能传输特定类型的数据
- 同步机制:提供goroutine之间的同步
- 方向性:可以限制channel的读写方向
- 缓冲控制:支持无缓冲和有缓冲两种模式
Channel类型与创建
无缓冲Channel
package main
import (
"fmt"
"time"
)
func main() {
// 创建无缓冲channel
ch := make(chan string)
// 启动发送者goroutine
go func() {
time.Sleep(1 * time.Second)
ch <- "Hello, Channel!"
fmt.Println("Message sent")
}()
// 主goroutine接收消息
fmt.Println("Waiting for message...")
message := <-ch
fmt.Println("Received:", message)
}
有缓冲Channel
package main
import (
"fmt"
"time"
)
func main() {
// 创建缓冲大小为3的channel
ch := make(chan int, 3)
// 发送数据(不会阻塞,因为有缓冲)
ch <- 1
ch <- 2
ch <- 3
fmt.Printf("Channel length: %d, capacity: %d\n", len(ch), cap(ch))
// 接收数据
for i := 0; i < 3; i++ {
value := <-ch
fmt.Printf("Received: %d\n", value)
}
}
方向性Channel
package main
import "fmt"
// 只能发送的channel
func sender(ch chan<- string) {
ch <- "Hello from sender"
close(ch)
}
// 只能接收的channel
func receiver(ch <-chan string) {
for message := range ch {
fmt.Println("Received:", message)
}
}
func main() {
ch := make(chan string)
go sender(ch)
receiver(ch)
}
Channel操作详解
发送和接收
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int, 2)
// 发送操作
ch <- 42
ch <- 100
// 接收操作
value1 := <-ch
value2 := <-ch
fmt.Printf("Received: %d, %d\n", value1, value2)
// 带ok的接收操作
ch <- 200
close(ch)
value3, ok := <-ch
fmt.Printf("Received: %d, ok: %t\n", value3, ok)
value4, ok := <-ch
fmt.Printf("Received: %d, ok: %t\n", value4, ok) // ok为false,channel已关闭
}
关闭Channel
package main
import "fmt"
func producer(ch chan<- int) {
for i := 1; i <= 5; i++ {
ch <- i
fmt.Printf("Sent: %d\n", i)
}
close(ch) // 关闭channel表示不再发送数据
}
func consumer(ch <-chan int) {
// 使用range遍历channel,直到channel关闭
for value := range ch {
fmt.Printf("Received: %d\n", value)
}
fmt.Println("Channel closed, consumer finished")
}
func main() {
ch := make(chan int, 2)
go producer(ch)
consumer(ch)
}
Select语句
基本Select用法
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "Message from ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "Message from ch2"
}()
// select语句等待多个channel操作
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received from ch1:", msg1)
case msg2 := <-ch2:
fmt.Println("Received from ch2:", msg2)
}
}
}
带超时的Select
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go func() {
time.Sleep(3 * time.Second)
ch <- "Delayed message"
}()
select {
case msg := <-ch:
fmt.Println("Received:", msg)
case <-time.After(2 * time.Second):
fmt.Println("Timeout: no message received within 2 seconds")
}
}
非阻塞Select
package main
import "fmt"
func main() {
ch := make(chan int, 1)
// 非阻塞发送
select {
case ch <- 42:
fmt.Println("Sent 42")
default:
fmt.Println("Channel is full, cannot send")
}
// 非阻塞接收
select {
case value := <-ch:
fmt.Printf("Received: %d\n", value)
default:
fmt.Println("No value available")
}
// 再次尝试非阻塞接收
select {
case value := <-ch:
fmt.Printf("Received: %d\n", value)
default:
fmt.Println("No value available")
}
}
Channel通信模式
生产者-消费者模式
package main
import (
"fmt"
"sync"
"time"
)
type Task struct {
ID int
Data string
}
func producer(tasks chan<- Task, wg *sync.WaitGroup) {
defer wg.Done()
defer close(tasks)
for i := 1; i <= 10; i++ {
task := Task{
ID: i,
Data: fmt.Sprintf("Task-%d", i),
}
tasks <- task
fmt.Printf("Produced: %s\n", task.Data)
time.Sleep(100 * time.Millisecond)
}
}
func consumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
fmt.Printf("Consumer %d processing: %s\n", id, task.Data)
time.Sleep(200 * time.Millisecond) // 模拟处理时间
fmt.Printf("Consumer %d finished: %s\n", id, task.Data)
}
}
func main() {
tasks := make(chan Task, 5) // 缓冲channel
var wg sync.WaitGroup
// 启动生产者
wg.Add(1)
go producer(tasks, &wg)
// 启动多个消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(i, tasks, &wg)
}
wg.Wait()
fmt.Println("All tasks completed")
}
管道模式
package main
import "fmt"
// 第一阶段:生成数字
func generate(nums chan<- int) {
for i := 1; i <= 10; i++ {
nums <- i
}
close(nums)
}
// 第二阶段:计算平方
func square(nums <-chan int, squares chan<- int) {
for num := range nums {
squares <- num * num
}
close(squares)
}
// 第三阶段:过滤偶数
func filter(squares <-chan int, evens chan<- int) {
for square := range squares {
if square%2 == 0 {
evens <- square
}
}
close(evens)
}
func main() {
nums := make(chan int)
squares := make(chan int)
evens := make(chan int)
// 启动管道的各个阶段
go generate(nums)
go square(nums, squares)
go filter(squares, evens)
// 输出最终结果
fmt.Println("Even squares:")
for even := range evens {
fmt.Println(even)
}
}
扇入模式
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, output chan<- string) {
for i := 1; i <= 3; i++ {
message := fmt.Sprintf("Worker %d - Message %d", id, i)
output <- message
time.Sleep(time.Second)
}
close(output)
}
func fanIn(inputs ...<-chan string) <-chan string {
output := make(chan string)
var wg sync.WaitGroup
// 为每个输入channel启动一个goroutine
for _, input := range inputs {
wg.Add(1)
go func(ch <-chan string) {
defer wg.Done()
for message := range ch {
output <- message
}
}(input)
}
// 等待所有输入完成后关闭输出channel
go func() {
wg.Wait()
close(output)
}()
return output
}
func main() {
// 创建多个worker的输出channel
ch1 := make(chan string)
ch2 := make(chan string)
ch3 := make(chan string)
// 启动workers
go worker(1, ch1)
go worker(2, ch2)
go worker(3, ch3)
// 扇入所有worker的输出
merged := fanIn(ch1, ch2, ch3)
// 接收合并后的消息
for message := range merged {
fmt.Println("Received:", message)
}
}
高级Channel技巧
Channel的Channel
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan chan string) {
for job := range jobs {
result := fmt.Sprintf("Worker %d processed job", id)
job <- result
close(job)
}
}
func main() {
jobs := make(chan chan string, 3)
// 启动workers
for i := 1; i <= 2; i++ {
go worker(i, jobs)
}
// 发送任务
for i := 1; i <= 5; i++ {
resultCh := make(chan string, 1)
jobs <- resultCh
// 等待结果
result := <-resultCh
fmt.Printf("Job %d result: %s\n", i, result)
}
close(jobs)
}
信号量模式
package main
import (
"fmt"
"sync"
"time"
)
type Semaphore chan struct{}
func NewSemaphore(capacity int) Semaphore {
return make(Semaphore, capacity)
}
func (s Semaphore) Acquire() {
s <- struct{}{}
}
func (s Semaphore) Release() {
<-s
}
func worker(id int, sem Semaphore, wg *sync.WaitGroup) {
defer wg.Done()
sem.Acquire() // 获取信号量
defer sem.Release() // 释放信号量
fmt.Printf("Worker %d started\n", id)
time.Sleep(2 * time.Second) // 模拟工作
fmt.Printf("Worker %d finished\n", id)
}
func main() {
const maxConcurrent = 3
sem := NewSemaphore(maxConcurrent)
var wg sync.WaitGroup
// 启动10个worker,但最多只有3个同时运行
for i := 1; i <= 10; i++ {
wg.Add(1)
go worker(i, sem, &wg)
}
wg.Wait()
fmt.Println("All workers completed")
}
实战案例
并发Web爬虫
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
type CrawlResult struct {
URL string
StatusCode int
Error error
Duration time.Duration
}
type Crawler struct {
maxConcurrent int
semaphore chan struct{}
}
func NewCrawler(maxConcurrent int) *Crawler {
return &Crawler{
maxConcurrent: maxConcurrent,
semaphore: make(chan struct{}, maxConcurrent),
}
}
func (c *Crawler) crawlURL(url string, results chan<- CrawlResult, wg *sync.WaitGroup) {
defer wg.Done()
// 获取信号量
c.semaphore <- struct{}{}
defer func() { <-c.semaphore }()
start := time.Now()
resp, err := http.Get(url)
duration := time.Since(start)
result := CrawlResult{
URL: url,
Duration: duration,
Error: err,
}
if err == nil {
result.StatusCode = resp.StatusCode
resp.Body.Close()
}
results <- result
}
func (c *Crawler) Crawl(urls []string) <-chan CrawlResult {
results := make(chan CrawlResult, len(urls))
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go c.crawlURL(url, results, &wg)
}
go func() {
wg.Wait()
close(results)
}()
return results
}
func main() {
urls := []string{
"https://www.google.com",
"https://www.github.com",
"https://www.stackoverflow.com",
"https://www.golang.org",
"https://www.reddit.com",
}
crawler := NewCrawler(3) // 最多3个并发请求
results := crawler.Crawl(urls)
fmt.Println("Crawling results:")
for result := range results {
if result.Error != nil {
fmt.Printf("❌ %s: %v\n", result.URL, result.Error)
} else {
fmt.Printf("✅ %s: %d (%v)\n", result.URL, result.StatusCode, result.Duration)
}
}
}
实时数据处理管道
package main
import (
"fmt"
"math/rand"
"time"
)
type DataPoint struct {
ID int
Value float64
Timestamp time.Time
}
type ProcessedData struct {
DataPoint
Processed bool
Result float64
}
// 数据生成器
func dataGenerator(output chan<- DataPoint) {
defer close(output)
for i := 1; i <= 20; i++ {
data := DataPoint{
ID: i,
Value: rand.Float64() * 100,
Timestamp: time.Now(),
}
output <- data
time.Sleep(100 * time.Millisecond)
}
}
// 数据处理器
func dataProcessor(input <-chan DataPoint, output chan<- ProcessedData) {
defer close(output)
for data := range input {
// 模拟数据处理
time.Sleep(50 * time.Millisecond)
processed := ProcessedData{
DataPoint: data,
Processed: true,
Result: data.Value * 2, // 简单的处理逻辑
}
output <- processed
}
}
// 数据过滤器
func dataFilter(input <-chan ProcessedData, output chan<- ProcessedData) {
defer close(output)
for data := range input {
// 只传递结果大于100的数据
if data.Result > 100 {
output <- data
}
}
}
func main() {
// 创建管道
rawData := make(chan DataPoint, 5)
processedData := make(chan ProcessedData, 5)
filteredData := make(chan ProcessedData, 5)
// 启动管道各阶段
go dataGenerator(rawData)
go dataProcessor(rawData, processedData)
go dataFilter(processedData, filteredData)
// 输出最终结果
fmt.Println("Filtered results (Result > 100):")
for data := range filteredData {
fmt.Printf("ID: %d, Original: %.2f, Result: %.2f, Time: %s\n",
data.ID, data.Value, data.Result, data.Timestamp.Format("15:04:05"))
}
}
总结
Channel是Go语言并发编程的核心工具,提供了优雅的goroutine间通信方式:
关键概念
- 无缓冲vs有缓冲:控制同步行为
- 方向性:限制channel的使用方式
- Select语句:处理多个channel操作
- 关闭channel:信号传递机制
常用模式
- 生产者-消费者:解耦数据生产和消费
- 管道:数据流式处理
- 扇入扇出:并发处理和结果聚合
- 信号量:控制并发数量
最佳实践
- 发送者负责关闭channel
- 使用range遍历channel
- 利用select实现超时和非阻塞操作
- 合理设置缓冲大小
- 避免channel泄漏
掌握Channel的使用是成为Go并发编程专家的必经之路。记住:通过通信来共享内存,而不是通过共享内存来通信。