Go Concurrency: Channels
Channels
Do not communicate by sharing memory; instead, share memory by communicating.
A channel is a type-safe message queue that acts as a conduit between two goroutines, through which arbitrary resources are exchanged synchronously. The ability of a chan to control how goroutines interact is what creates Go's synchronization mechanism.
A chan created without capacity is called an unbuffered channel. Conversely, a chan created with capacity is called a buffered channel.
To understand the synchronization behavior of goroutines interacting through a chan, you need to know the channel's type and state. The scenarios differ depending on whether an unbuffered or a buffered channel is used.
Unbuffered Channels
ch := make(chan struct{})
- An unbuffered chan has no capacity, so both goroutines must be ready at the same time before any exchange can take place.
- When a goroutine attempts to send a resource to an unbuffered channel and no goroutine is waiting to receive it, the channel blocks the sending goroutine and makes it wait.
- When a goroutine attempts to receive from an unbuffered channel and no goroutine is waiting to send a resource, the channel blocks the receiving goroutine and makes it wait.
The essence of an unbuffered channel is guaranteed synchronization.
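A minimal sketch of this behavior, assuming a goroutine that sends the message "foo" on an unbuffered channel (the names here are illustrative):

```go
package main

import "fmt"

func main() {
	ch := make(chan string)

	go func() {
		// The send blocks until main is ready to receive.
		ch <- "foo"
	}()

	fmt.Println(<-ch) // the receive releases the blocked sender; prints "foo"
}
```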
The first goroutine blocks on sending the message foo because no receiver is ready yet. The spec explains this behavior well:
If the capacity is zero or absent, the channel is unbuffered and communication succeeds only when both a sender and receiver are ready.
– https://go.dev/ref/spec#Channel_types
If the channel is unbuffered, the sender blocks until the receiver has received the value.
– https://go.dev/doc/effective_go#channels
To summarize:
- The Receive happens before the Send completes
- Benefit: 100% guarantee that the message is received
- Cost: unknown latency
Buffered Channels
A buffered channel has capacity, so its behavior can be a bit different.
When a goroutine attempts to send a resource to a buffered channel that is full, the channel blocks the goroutine and makes it wait until buffer space becomes available.
If there is space in the channel, the send completes immediately and the goroutine can continue.
When a goroutine attempts to receive from a buffered channel that is empty, the channel blocks the goroutine and makes it wait for a resource to be sent.
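A minimal sketch of this behavior, assuming a channel with capacity 2 (the values are illustrative):

```go
package main

import "fmt"

func main() {
	ch := make(chan int, 2)

	ch <- 1 // buffer has space: the send completes immediately
	ch <- 2 // still space: no blocking
	// a third send would block here until a receiver frees a slot

	fmt.Println(<-ch) // 1
	fmt.Println(<-ch) // 2
}
```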
Latencies due to under-sized buffer
The buffer size chosen when creating a chan can dramatically affect performance.
Here a fan-out pattern that makes heavy use of a chan is used to examine the impact of different buffer sizes.
In the benchmark, a producer injects one million integer elements into the channel, while 5 workers read them and accumulate them into a result variable named total.
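A rough sketch of such a benchmark scenario, assuming the producer/worker structure described above (the function names and the use of an atomic counter are assumptions, not the original benchmark code):

```go
package main

import (
	"sync"
	"sync/atomic"
)

// run sends n integers through a channel with the given buffer size
// and has 5 workers accumulate them into total.
func run(n, bufSize int) int64 {
	ch := make(chan int, bufSize)
	var total int64
	var wg sync.WaitGroup

	// 5 workers read from the channel and accumulate into total.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range ch {
				atomic.AddInt64(&total, int64(v))
			}
		}()
	}

	// The producer injects n integers; a small buffer makes it block more often.
	for i := 0; i < n; i++ {
		ch <- i
	}
	close(ch)
	wg.Wait()
	return total
}

func main() {
	_ = run(1_000_000, 1) // compare buffer sizes such as 1, 10, 100, ...
}
```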
To summarize:
- Send happens before Receive
- Benefit: lower latency (when there is space, the send does not need to block, reducing context switches)
- Cost: no guarantee of delivery; the larger the buffer, the weaker the delivery guarantee. With buffer = 1, you get a delivery guarantee that is delayed by one message.
Go Concurrency Patterns
Several ways Go uses a chan:
Go Concurrency Patterns: Timing out, moving on
Timing out
```go
timeout := make(chan bool, 1)
go func() {
	time.Sleep(1 * time.Second)
	timeout <- true
}()
```
A chan with a buffer of 1 implements timeout control across goroutines.
```go
select {
case <-ch:
	// a read from ch has occurred
case <-timeout:
	// the read from ch has timed out
}
```
Another goroutine implements the timeout control by listening on the channels.
Moving on
```go
func Query(conns []Conn, query string) Result {
	ch := make(chan Result)
	for _, conn := range conns {
		go func(c Conn) {
			select {
			case ch <- c.DoQuery(query):
				// close(ch) // must not close here either: panic: send on closed channel
			default: // must not be omitted, otherwise goroutines leak
			}
		}(conn)
	}
	return <-ch
}
```
Go Concurrency Patterns: Pipelines and cancellation
Pipeline
```go
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}
```
Receives values from upstream via inbound channels.
```go
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}
```
Receives values from upstream via inbound channels.
```go
func main() {
	// Set up the pipeline.
	c := gen(2, 3)
	out := sq(c)
	// Consume the output.
	fmt.Println(<-out) // 4
	fmt.Println(<-out) // 9
}
```
Performs some function on that data, usually producing new values.
```go
func main() {
	// Set up the pipeline and consume the output.
	for n := range sq(sq(gen(2, 3))) {
		fmt.Println(n) // 16 then 81
	}
}
```
Sends values downstream via outbound channels.
Fan-out, Fan-in
Multiple functions reading from the same channel until that chan is closed is called fan-out. It provides a way to distribute work among a group of workers to parallelize CPU use and I/O. Put simply: multiple goroutines (master-workers) process multiple tasks at the same time.
A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that is closed when all the inputs are closed. This is called fan-in. Put simply: the tasks from multiple chans are merged into a single chan for centralized processing.
```go
func main() {
	in := gen(2, 3)
	// Distribute the sq work across two goroutines that both read from in.
	c1 := sq(in)
	c2 := sq(in)
	// Consume the merged output from c1 and c2.
	for n := range merge(c1, c2) {
		fmt.Println(n) // 4 then 9, or 9 then 4
	}
}
```
```go
// merge the data from multiple chans into a single chan
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c is closed, then calls wg.Done.
	output := func(c <-chan int) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c) // c is passed as an argument, not captured in a closure
	}
	// Start a goroutine to close out once all the output goroutines are
	// done. This must start after the wg.Add call.
	go func() {
		wg.Wait() // wait until every input chan has been drained, then close out
		close(out)
	}()
	return out
}
```
Cancellation
Close happens before Receive (similar to Buffered)
No data needs to be passed, or nil can be passed
Very well suited for cancellation and timeout control
```go
func main() {
	in := gen(2, 3)
	// Distribute the sq work across two goroutines that both read from in.
	c1 := sq(in)
	c2 := sq(in)
	// Consume the first value from output.
	done := make(chan struct{}, 2)
	out := merge(done, c1, c2)
	fmt.Println(<-out) // 4 or 9
	// Tell the remaining senders we're leaving.
	done <- struct{}{}
	done <- struct{}{}
}
```
A termination notice is sent to the other goroutines through a chan.
```go
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c is closed or it receives a value
	// from done, then output calls wg.Done.
	output := func(c <-chan int) {
		for n := range c {
			select {
			case out <- n:
			case <-done: // cancellation implemented via select
			}
		}
		wg.Done()
	}
	// ... the rest is unchanged ...
```
Fast shutdown can also be achieved by closing the chan.
```go
func main() {
	// Set up a done channel that's shared by the whole pipeline,
	// and close that channel when this pipeline exits, as a signal
	// for all the goroutines we started to exit.
	done := make(chan struct{})
	defer close(done)
	in := gen(done, 2, 3)
	// Distribute the sq work across two goroutines that both read from in.
	c1 := sq(done, in)
	c2 := sq(done, in)
	// Consume the first value from output.
	out := merge(done, c1, c2)
	fmt.Println(<-out) // 4 or 9
	// done will be closed by the deferred call.
}
```
In merge, done is checked to receive the cancellation signal.
```go
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c or done is closed, then calls
	// wg.Done.
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}
	// ... the rest is unchanged ...
```
```go
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // add an internal close so the output channel is closed on all return paths
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}
```
Context
Go Concurrency Patterns: Timing out, moving on
Go Concurrency Patterns: Pipelines and cancellation
Advanced Go Concurrency Patterns
go-kratos/kratos – errgroup,pipeline
Design Principles
If any given Send on a channel CAN cause the sending goroutine to block:
- Not allowed to use a Buffered channel larger than 1. Buffers larger than 1 must have a definite reason and a measured size behind them.
- Must know what happens when the sending goroutine blocks.
If any given Send on a channel WON'T cause the sending goroutine to block:
- You have the exact number of buffers for each send. (Fan Out pattern)
- You have the buffer measured for max capacity. (Drop pattern; a sketch follows this list.)
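A minimal sketch of the Drop pattern under the assumption that the buffer capacity has been measured in advance (the names and numbers here are illustrative): when the buffer is full, the send is skipped rather than blocking.

```go
package main

import "fmt"

func main() {
	const maxPending = 100 // capacity measured for the expected peak load
	work := make(chan int, maxPending)

	// Producer: drop work instead of blocking when the buffer is full.
	for i := 0; i < 1000; i++ {
		select {
		case work <- i:
			// enqueued
		default:
			fmt.Println("dropped", i) // buffer full: drop and move on
		}
	}
	close(work)

	// Consume whatever work was accepted.
	for w := range work {
		_ = w
	}
}
```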
Less is more with buffers.
- Don't think about performance when thinking about buffers; performance is determined by the consumer.
- Buffers can help to reduce blocking latency between signaling.
- Reducing blocking latency towards zero does not necessarily mean better throughput.
- If a buffer of one is giving you good enough throughput then keep it. (Whether to buffer, and how large, is a question of performance and throughput.)
- Question buffers that are larger than one and measure for size.
- Find the smallest buffer possible that provides good enough throughput.
References
The Behavior Of Channels
Go: Buffered and Unbuffered Channels: https://medium.com/a-journey-with-go/go-buffered-and-unbuffered-channels-29a107c00268
Go: Ordering in Select Statements: https://medium.com/a-journey-with-go/go-ordering-in-select-statements-fd0ff80fd8d6
The Nature Of Channels In Go
My Channel Select Bug
Advanced Go Concurrency Patterns
Concurrency is not parallelism
Go videos from Google I/O 2012
Go Concurrency Patterns: Timing out, moving on
Rethinking Classical Concurrency Patterns.pdf