Go Concurrency: Channels

Channels

Do not communicate by sharing memory; instead, share memory by communicating.

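A channel is a type-safe message queue that acts as a conduit between two goroutines, through which arbitrary resources can be exchanged synchronously. Because a chan controls how goroutines interact, it is the foundation of Go's synchronization mechanism.

A chan created without a capacity is called an unbuffered channel. Conversely, a chan created with a capacity is called a buffered channel.

To understand how goroutines that interact through a chan are synchronized, you need to know the type and the state of the channel. The behavior differs depending on whether the channel is unbuffered or buffered.

Unbuffered channels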

ch := make(chan struct{})

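  • An unbuffered chan has no capacity, so both goroutines must be ready at the same moment before any exchange can take place.
  • When a goroutine tries to send a resource on an unbuffered channel and no goroutine is waiting to receive it, the channel blocks the sending goroutine and makes it wait.
  • When a goroutine tries to receive from an unbuffered channel and no goroutine is waiting to send, the channel blocks the receiving goroutine and makes it wait.

The essence of an unbuffered channel is guaranteed synchronization.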

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	c := make(chan string) // unbuffered channel

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		c <- "foo" // blocks until the receiver below is ready
	}()

	go func() {
		defer wg.Done()

		time.Sleep(time.Second * 1) // the receiver shows up one second late
		fmt.Println("Message: " + <-c)
	}()

	wg.Wait()
}

The first goroutine blocks on the send of the message foo, because no receiver is ready yet. The Go documentation explains this behavior well:

If the capacity is zero or absent, the channel is unbuffered and communication succeeds only when both a sender and receiver are ready.

https://go.dev/ref/spec#Channel_types

If the channel is unbuffered, the sender blocks until the receiver has received the value.

https://go.dev/doc/effective_go#channels

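To summarize:

  • Receive happens before Send: the receive happens before the corresponding send completes
  • Benefit: delivery is 100% guaranteed (when the send returns, the value has been received)
  • Cost: unknown latency (the sender may wait an arbitrarily long time for a receiver)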

Buffered channels

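A buffered channel has capacity, so its behavior is somewhat different.

When a goroutine tries to send a resource on a buffered channel that is full, the channel blocks the goroutine and makes it wait until buffer space becomes available.

If there is room in the channel, the send completes immediately and the goroutine continues.

When a goroutine tries to receive from a buffered channel that is empty, the channel blocks the goroutine and makes it wait until a resource is sent.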

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	c := make(chan string, 2) // buffered channel with capacity 2

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		// Both sends complete immediately: the buffer has room for two values.
		c <- "foo"
		c <- "bar"
	}()

	go func() {
		defer wg.Done()

		time.Sleep(time.Second * 1)
		fmt.Println("Message: " + <-c)
		fmt.Println("Message: " + <-c)
	}()

	wg.Wait()
}
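
The rules above can also be checked without relying on timing. This is a minimal sketch, assuming no receiver is running. `sendsWithoutBlocking` and `receiveWouldBlock` are hypothetical helpers that use `select` with a `default` case only to detect an operation that would otherwise block.

```go
package main

import "fmt"

// sendsWithoutBlocking counts how many of n sends would complete right away on
// a channel of the given capacity while no receiver is running. The select
// with a default case reports a send that would block instead of blocking.
func sendsWithoutBlocking(capacity, n int) int {
	ch := make(chan int, capacity)
	sent := 0
	for i := 0; i < n; i++ {
		select {
		case ch <- i: // there is room in the buffer, so the send completes at once
			sent++
		default: // the buffer is full (or capacity is 0): a plain send would block here
		}
	}
	return sent
}

// receiveWouldBlock reports whether a receive on ch would block right now.
// If a value is available, it is consumed.
func receiveWouldBlock(ch chan int) bool {
	select {
	case <-ch:
		return false
	default:
		return true
	}
}

func main() {
	fmt.Println(sendsWithoutBlocking(0, 3))           // 0: unbuffered and nobody is receiving
	fmt.Println(sendsWithoutBlocking(2, 3))           // 2: only two values fit in the buffer
	fmt.Println(receiveWouldBlock(make(chan int, 2))) // true: the buffer is empty
}
```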

Latencies due to under-sized buffer

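The buffer size chosen when a chan is created can greatly affect performance.

To see the impact of different buffer sizes, the benchmark uses the fan-out pattern, which uses channels intensively.

In the benchmark, one producer injects a million integers into the channel, while 5 workers read them and add them to a result variable named total.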
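
A rough sketch of that setup follows. It is a hypothetical reconstruction, not the original benchmark code. It assumes each worker sums its values locally and adds them to total with one atomic add. `fanOutSum` and the buffer sizes tried are illustrative. The wall-clock timings printed by `main` are only indicative; the `testing` package's benchmarks are the proper tool for real measurements.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// fanOutSum sends 0..n-1 from one producer on a channel with buffer bufSize,
// while `workers` goroutines receive the values and add them into total.
func fanOutSum(bufSize, n, workers int) int64 {
	ch := make(chan int, bufSize)
	var total int64
	var wg sync.WaitGroup

	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			var local int64
			for v := range ch { // receive until the producer closes ch
				local += int64(v)
			}
			atomic.AddInt64(&total, local) // one atomic add per worker
		}()
	}

	for i := 0; i < n; i++ { // the single producer
		ch <- i
	}
	close(ch) // lets the workers' range loops end
	wg.Wait()
	return atomic.LoadInt64(&total)
}

func main() {
	for _, size := range []int{0, 1, 10, 100, 1000} {
		start := time.Now()
		total := fanOutSum(size, 1_000_000, 5)
		fmt.Printf("buffer=%-5d total=%d took %v\n", size, total, time.Since(start))
	}
}
```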

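Summary:

  • Send happens before Receive: the send happens before the corresponding receive completes
  • Benefit: lower latency (while the buffer has room, the sender does not wait for a receiver, which means less blocking and fewer context switches)
  • Cost: delivery is not guaranteed; the larger the buffer, the weaker the guarantee that a value has been received. A buffer of 1 gives you a guarantee delayed by one message.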

Go Concurrency Patterns

Several ways to use channels in Go:

Go Concurrency Patterns: Timing out, moving on

  • Timing out

    timeout := make(chan bool, 1)
    go func() {
        time.Sleep(1 * time.Second)
        timeout <- true
    }()

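    The channel has a buffer of 1, so the goroutine above can send the timeout signal and exit even if nobody ever receives it. This carries the timeout across goroutines without leaking the sender.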

    select {
    case <-ch:
        // a read from ch has occurred
    case <-timeout:
        // the read from ch has timed out
    }

    The goroutine waiting on ch selects on both channels and proceeds with whichever is ready first.

  • Moving on: query several replicas at once and use the first result that arrives

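    func Query(conns []Conn, query string) Result {
        // A buffer of 1 keeps the first result even if Query has not reached
        // `return <-ch` yet; with an unbuffered channel every send could fall
        // through to default, and Query would then block forever.
        ch := make(chan Result, 1)
        for _, conn := range conns {
            go func(c Conn) {
                select {
                case ch <- c.DoQuery(query):
                    // close(ch) can't go here either: panic: send on closed channel
                default: // must not be left out, or the slower goroutines block forever (leak)
                }
            }(conn)
        }
        return <-ch
    }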

Go Concurrency Patterns: Pipelines and cancellation

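  • Pipeline: a series of stages connected by channels, where each stage is a group of goroutines running the same function. In each stage, the goroutines receive values from upstream via inbound channels, perform some function on that data (usually producing new values), and send values downstream via outbound channels.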

    func gen(nums ...int) <-chan int {
        out := make(chan int)
        go func() {
            for _, n := range nums {
                out <- n
            }
            close(out)
        }()
        return out
    }

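    gen is the first stage: it converts a list of integers into a channel that emits them, and closes the channel once all values have been sent.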

    func sq(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            for n := range in {
                out <- n * n
            }
            close(out)
        }()
        return out
    }

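    sq receives values from upstream via its inbound channel, squares each one, and sends the results downstream via its outbound channel. It closes the outbound channel once the inbound channel is drained.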

    func main() {
        // Set up the pipeline.
        c := gen(2, 3)
        out := sq(c)

        // Consume the output.
        fmt.Println(<-out) // 4
        fmt.Println(<-out) // 9
    }

    main sets up the pipeline and acts as the final stage: it receives values from the second stage and prints each one.

    func main() {
        // Set up the pipeline and consume the output.
        for n := range sq(sq(gen(2, 3))) {
            fmt.Println(n) // 16 then 81
        }
    }

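    Since sq has the same type of channel for its input and its output, stages can be composed any number of times; here the input is squared twice.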

  • Fan-out, Fan-in

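    Multiple functions can read from the same channel until that channel is closed; this is called fan-out. It provides a way to distribute work among a group of workers to parallelize CPU use and I/O. Put simply, several goroutines process tasks at the same time (master-workers).

    A function can read from multiple inputs and proceed until all are closed by multiplexing the input channels onto a single channel that is closed when all the inputs are closed. This is called fan-in. Put simply, the values from several channels are merged into one channel and processed in one place.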

    func main() {
        in := gen(2, 3)

        // Distribute the sq work across two goroutines that both read from in.
        c1 := sq(in)
        c2 := sq(in)

        // Consume the merged output from c1 and c2.
        for n := range merge(c1, c2) {
            fmt.Println(n) // 4 then 9, or 9 then 4
        }
    }

    // merge combines the values from several channels into one channel.
    func merge(cs ...<-chan int) <-chan int {
        var wg sync.WaitGroup
        out := make(chan int)

        // Start an output goroutine for each input channel in cs. output
        // copies values from c to out until c is closed, then calls wg.Done.
        output := func(c <-chan int) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }
        wg.Add(len(cs))
        for _, c := range cs {
            go output(c) // c is passed as an argument, not captured from the loop
        }

        // Start a goroutine to close out once all the output goroutines are
        // done. This must start after the wg.Add call.
        go func() {
            wg.Wait() // wait until every output goroutine has finished, then close out
            close(out)
        }()
        return out
    }
  • Cancellation: explicit cancellation

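    • Close happens before Receive (similar to a buffered channel): closing a channel happens before a receive that returns because the channel is closed

    • No data needs to be passed: the element type is usually struct{}, and the signal is either an empty value or simply a close

    • Well suited to cancellation and timeout control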

      func main() {
          in := gen(2, 3)

          // Distribute the sq work across two goroutines that both read from in.
          c1 := sq(in)
          c2 := sq(in)

          // Consume the first value from output.
          done := make(chan struct{}, 2)
          out := merge(done, c1, c2)
          fmt.Println(<-out) // 4 or 9

          // Tell the remaining senders we're leaving.
          done <- struct{}{}
          done <- struct{}{}
      }

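      main tells the remaining senders to stop by sending one value on done per potentially blocked sender; the buffer of 2 means these sends never block main. The drawback is that the receiver has to know how many senders might be blocked.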

      func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
          var wg sync.WaitGroup
          out := make(chan int)

          // Start an output goroutine for each input channel in cs. output
          // copies values from c to out until c is closed or it receives a value
          // from done, then output calls wg.Done.
          output := func(c <-chan int) {
              for n := range c {
                  select {
                  case out <- n:
                  case <-done: // cancellation is implemented with select
                  }
              }
              wg.Done()
          }
          // ... the rest is unchanged ...

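      Instead of counting senders, the done channel can simply be closed. A receive from a closed channel always proceeds immediately, so closing done acts as a broadcast to every goroutine that selects on it.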

      func main() {
          // Set up a done channel that's shared by the whole pipeline,
          // and close that channel when this pipeline exits, as a signal
          // for all the goroutines we started to exit.
          done := make(chan struct{})
          defer close(done)

          in := gen(done, 2, 3)

          // Distribute the sq work across two goroutines that both read from in.
          c1 := sq(done, in)
          c2 := sq(done, in)

          // Consume the first value from output.
          out := merge(done, c1, c2)
          fmt.Println(<-out) // 4 or 9

          // done will be closed by the deferred call.
      }

      In merge, each output goroutine selects on done and returns as soon as done is closed.

      func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
          var wg sync.WaitGroup
          out := make(chan int)

          // Start an output goroutine for each input channel in cs. output
          // copies values from c to out until c or done is closed, then calls
          // wg.Done.
          output := func(c <-chan int) {
              defer wg.Done()
              for n := range c {
                  select {
                  case out <- n:
                  case <-done:
                      return
                  }
              }
          }
          // ... the rest is unchanged ...

      func sq(done <-chan struct{}, in <-chan int) <-chan int {
          out := make(chan int)
          go func() {
              defer close(out) // closes the output channel on every return path
              for n := range in {
                  select {
                  case out <- n * n:
                  case <-done:
                      return
                  }
              }
          }()
          return out
      }
  • Context

    Go Concurrency Patterns: Timing out, moving on

    Go Concurrency Patterns: Pipelines and cancellation

    Advanced Go Concurrency Patterns

    go-kratos/kratos – errgroup, pipeline
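
The close-based cancellation above can be assembled into one runnable program. The text only shows how `gen(done, ...)` is called, so its body here is an assumption, written in the same style as `sq`. `firstSquare` is a hypothetical helper. The standard `context` package carries the same signal through `ctx.Done()`, which returns a channel that is closed when the context is cancelled, so it can stand in for `done`.

```go
package main

import (
	"fmt"
	"sync"
)

// gen emits nums until they run out or done is closed (assumed body).
func gen(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// sq squares values until in is closed or done is closed.
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// merge fans the input channels in to one channel and closes out after
// every output goroutine has returned.
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// firstSquare takes the first value out of the pipeline, then cancels the
// rest of the work by closing done.
func firstSquare(nums ...int) int {
	done := make(chan struct{})
	in := gen(done, nums...)
	out := merge(done, sq(done, in), sq(done, in)) // fan-out to two sq stages, fan-in via merge

	v := <-out
	close(done) // a closed channel is always ready to receive, so every stage sees it
	for range out {
		// Drain until merge closes out, i.e. until all of merge's goroutines have returned.
	}
	return v
}

func main() {
	fmt.Println(firstSquare(2, 3)) // 4 or 9
}
```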

Design philosophy

  • If any given Send on a channel CAN cause the sending goroutine to block:

    • Not allowed to use a buffered channel larger than 1.

      • Buffers larger than 1 must have reason/measurements

    • Must know what happens when the sending goroutine blocks

  • If any given Send on a channel WON’T cause the sending goroutine to block:

    • You have the exact number of buffers for each send.

      • Fan Out pattern

    • You have the buffer measured for max capacity.

      • Drop pattern

  • Less is more with buffers.

    • Don’t think about performance when thinking about buffers. Performance is determined by the consumers.

    • Buffers can help to reduce blocking latency between signaling

      • Reducing blocking latency towards zero does not necessarily mean better throughput

      • If a buffer of one is giving you good enough throughput, then keep it. (Whether to use a buffer at all, and how large, comes down to performance and throughput.)

      • Question buffers that are larger than one and measure for size.

      • Find the smallest buffer possible that provides good enough throughput

References

The Behavior Of Channels

Go: Buffered and Unbuffered Channels: https://medium.com/a-journey-with-go/go-buffered-and-unbuffered-channels-29a107c00268

Go: Ordering in Select Statements: https://medium.com/a-journey-with-go/go-ordering-in-select-statements-fd0ff80fd8d6

The Nature Of Channels In Go

My Channel Select Bug

Advanced Go Concurrency Patterns

Concurrency is not parallelism

Go videos from Google I/O 2012

Go Concurrency Patterns: Timing out, moving on

Rethinking Classical Concurrency Patterns.pdf