快速了解GO之Channel 通道

发布于:2025-05-27 ⋅ 阅读:(28) ⋅ 点赞:(0)

更多个人笔记见:
github个人笔记仓库
gitee 个人笔记仓库
个人学习,学习过程中还会不断补充~ (后续会更新在github上)

Channel通道

基本理解

概念:

  • 类似缓存区
  • 用于协程之间通信
    基本操作:
  • msgchannel := make(chan, int )
  • 写入:msgchannel <- i 写出: j := <- msgchannel

不用关心通道是否关闭,没有被引用的时候会被垃圾回收机制自动处理

常见示例

遍历channel
package main

import "fmt"

func main() {

    queue := make(chan string, 2)
    queue <- "one"
    queue <- "two"
    close(queue)

    for elem := range queue {
        fmt.Println(elem)
    }
}
通道传递
package main
		  import "fmt"
		  func ping(pings chan<- string, msg string) {
		      pings <- msg
		  }
		  func pong(pings <-chan string, pongs chan<- string) {
		      msg := <-pings
		      pongs <- msg
		  }
		  //the direction is set
		  func main() {
		      pings := make(chan string, 1)
		      pongs := make(chan string, 1)
		      ping(pings, "passed message")
		      pong(pings, pongs)
		      fmt.Println(<-pongs)
		      //output: passed message
		  }
channel循环输入输出
package main

import (
	"fmt"
)

func main() {
	jobs := make(chan int, 5)
	done := make(chan bool)

	go func() {
		for {
			j, more := <-jobs
			if more {
				fmt.Println("received job", j)
			} else {
				fmt.Println("received all jobs")
				done <- true
				return
			}
		}
	}()

	for j := 1; j <=3; j++ {
		jobs <- j
		fmt.Println("sent job", j)
		//time.Sleep(time.Millisecond) // 添加短暂延迟
	}
	close(jobs)
	fmt.Println("sent all jobs")
	<-done
	_, ok := <-jobs
	fmt.Println("received more jobs:", ok)
}

可以发现当 j 的循环范围增加的时候,会出现一些问问题

  • 当改为 7 时,发现会有先 received 后 sent 的现象,这是因为:主 goroutine 在发送 job 7 后被阻塞,等待 worker 接受了之后才打印出sent!! 可以取消注释添加打印延迟使得更加真实
通道死锁

无缓冲通道的读取和写入应该位于不同的协程中,不然死锁

func main() {
    var c = make(chan int)
	c <- 1
	<-c
}
//有死锁

//改进方案:
//增加了缓冲
func main() {
    c := make(chan int, 1) // 带1个缓冲的通道
    c <- 1
    <-c
    fmt.Println("Hello!")
}

//增加异步处理
func main() {
    c := make(chan int) //没有缓冲需要协程引入
    go func() {
        c <- 1
    }()
    <-c
}

通道关闭

利用close关闭
主通道关闭的时候也会收到通知

func main(){
	var c  = make(chan int)
	go func() {
		data,ok:= <-c
		fmt.Println("goroutine one: ",data,ok)
	}()
	go func() {
		data,ok:= <-c
		fmt.Println("goroutine two: ",data,ok)
	}()
	close(c)
	time.Sleep(1*time.Second)

}
// goroutine two:  0 false
// goroutine one:  0 false
通道作为一等公民分配
  • goroutine的顺序不固定但是工人分配到的工作是固定的
  • 通道可以作为类型放到数组中
package main

import (
	"fmt"
	"time"
)

func worker(id int, c chan int) {
	for n := range c { // // 这个for循环会一直运行,等待channel中有数据
		fmt.Printf("Worker %d received %d\n", id, n)
	}
    //只有当channel被关闭时,循环才会退出
}
func CreateWorker(id int) chan int {
	c := make(chan int)
	go worker(id, c)
	return c
}

func chanDemo() {
	var channels [10]chan int // create 10 channels
	for i := 0; i < 10; i++ {
		channels[i] = CreateWorker(i) // create 10 workers
        //此时 channel 中i 对应的位置还是一个空的channel c
	}

	for i := 0; i < 10; i++ {
		channels[i] <- 'a' + i
	}

	for i := 0; i < 10; i++ {
		channels[i] <- 'A' + i
	}
    // // 发送完所有数据后关闭channel (关闭会更加规范,虽然 main 函数结束也会释放)
    for i := 0; i < 10; i++ {
        close(channels[i])  // 显式关闭每个channel
    }
    //close(channels) //channels 是一个数组([10]chan int),而不是一个 channel 所以不用 close

	time.Sleep(time.Millisecond)
}

func main() {
	chanDemo()
	//'a' 97 开始往后 i 写入  'A'65 开始
}

通道访问理解

类似爬虫的逻辑,一个网站访问结束后开启新的协程检查

package main

import (
	"fmt"
	"net/http"
)

func main() {
	links := []string{
		"http://www.baidu.com",
		"http://www.jd.com/",
		"https://www.taobao.com/",
		"https://www.163.com/",
	}
	var c = make(chan string)
	for _, link := range links {
		go checkLink(link, c) // 并发执行
	}
	<-c //如果只有一个只会返回最先结束的子协程
	<-c
	<-c
	<-c
	// <-c //会卡死

}

func checkLink(link string, c chan string) {
	_, err := http.Get(link)
	if err != nil {
		fmt.Println(link, "might be down!")
		return
	}
	fmt.Println(link, "is up!")
	c <- "is up" // 通知主线程
}

select 结合使用

select的随机性
package main

import (
	"fmt"
)

func main() {
	c := make(chan int, 1)
	c <- 1
	select {
	//体现了
	case <-c:
		fmt.Println("random 01")
	case <-c:
		fmt.Println("random 02")
	}

}

//01 and 02

  • 可以使用default来避免阻塞 :
    case <-time.After(800 * time.Millisecond)
  • 当channel为nil时,它将永远不会被选中

循环 select 输出:

func main() {
	c := make(chan int,1)
	tick := time.Tick(time.Second) //创建了一个定时器通道,每隔一秒向通道发送一个当前时间
	for {
		select{
		case <-c:
			fmt.Println("random 01")
		case <-tick:
			fmt.Println("tick 01")
		case <-time.After(800*time.Millisecond): //设置1s就会只有tick
		//这个就是交替的
			fmt.Println("timeout 01")
		}
	}	
}

其他功能函数加入

TImer

在一定时间后将值发送到channel中 后面记得带上.C

package main
  
import (
    "fmt"
    "time"
)

func main() {

    timer1 := time.NewTimer(2 * time.Second)

    <-timer1.C // 阻塞等待,直到2秒后定时器触发
    fmt.Println("Timer 1 fired")

    timer2 := time.NewTimer(time.Second)
    go func() {
        <-timer2.C // 在goroutine中等待定时器
        fmt.Println("Timer 2 fired")
    }()
    stop2 := timer2.Stop() // 立即尝试停止定时器
    if stop2 {
        fmt.Println("Timer 2 stopped")// 停止成功则打印
    }

    time.Sleep(2 * time.Second)
}

Stop()函数阻止计时器触发,如果成功停止计时器则返回true

Ticker

帮助你定期重复做某事

package main
	  import (
	      "fmt"
	      "time"
	  )
	  func main() {
	      ticker := time.NewTicker(500 * time.Millisecond)
	      done := make(chan bool)
	      go func() {
	          for {
	              select {
	              case <-done:
	                  return
	              case t := <-ticker.C:
	              //will send repeatedly
	                  fmt.Println("Tick at", t)
	              }
	          }
	      }()
	      time.Sleep(1600 * time.Millisecond)
	      ticker.Stop()
	      done <- true
	      fmt.Println("Ticker stopped")
	  }
原子钟 Atomic Counter

可以看: https://gobyexample.com/atomic-counters

进阶示例

工人池
package main

import (
	"fmt"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int) {
	for j := range jobs {
		fmt.Println("worker", id, "started  job", j)
		time.Sleep(time.Second)
		fmt.Println("worker", id, "finished job", j)
		results <- j * 2
	}
}
func main() {
	const numJobs = 5
	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)
	for w := 1; w <= 3; w++ {
		go worker(w, jobs, results)
	}
	// to fill in the job
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)
	// to ensure the certain amount of the job is done
	for a := 1; a <= numJobs; a++ {
		<-results
	}
}

使用相同的通道来精确任务

速率限流
package main

import (
	"fmt"
	"time"
)

func main() {
	requests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		requests <- i
	}
	close(requests)
	// like a array
	limiter := time.Tick(200 * time.Millisecond)
	//Ticker,every 200ms receive a value (for channel)
	for req := range requests {
		<-limiter
		fmt.Println("request", req, time.Now())
	}
	// print the formal rate

	burstyLimiter := make(chan time.Time, 3)
	// a bursty handler
	for i := 0; i < 3; i++ {
		burstyLimiter <- time.Now()
	}
	go func() {
		for t := range time.Tick(200 * time.Millisecond) {
			burstyLimiter <- t
		}
	}()
	//to fill in the limiter(3) after the initual output
	burstyRequests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		burstyRequests <- i
	}
	close(burstyRequests)
	for req := range burstyRequests {
		<-burstyLimiter
		// has three value at first, so handle at the same time
		fmt.Println("request", req, time.Now())
	}
}


网站公告

今日签到

点亮在社区的每一天
去签到