Golang中的channel

并行特性

通过goroutines实现并行执行程序。

通过channel实现goroutine之间的通信。

例如不使用并行执行任务:

1
2
3
4
5
6
7
func main() {
tasks := getTasks()

for _, task := range tasks {
do(task)
}
}

通过go关键字使用协程并发执行

1
2
3
4
5
6
7
8
func main() {
tasks := getTasks()

for _, task := range tasks {
task := task
go do(task)
}
}

通过channel实现任务队列分发,并且限定worker个数

1
2
3
4
5
6
7
8
9
10
11
12
func main() {
tasks := getTasks()

ch := make(chan Task, 3)
for i := 0; i < 3; i++ {
go worker(ch)
}

for _, task := range tasks {
ch <- task
}
}

channel特性

channel在使用过程中的特性:

  • 并发安全
  • 在goroutine之间存储、传递values
  • 提供FIFO(fist in first out)语义
  • 可能导致goroutine阻塞(block)和唤醒(unblock)

make chan

不带缓冲的channel

1
ch := make(chan Task)

带缓冲的channel

1
ch := make(chan Task, 1)

结构体,在runtime/chan.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements 所传递元素的指针,是一个环队列
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index 发送数据的角标
recvx uint // receive index 接收数据的角标
recvq waitq // list of recv waiters 接受者列表
sendq waitq // list of send waiters 发送者列表

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex 互斥锁
}

type waitq struct {
first *sudog
last *sudog
}

type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.

g *g // Go routine

next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack) // 即将发送的数据

// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.

acquiretime int64
releasetime int64
ticket uint32

// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool

// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool

parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

当一个数据往channel中发送,占据buf容量如下:

image-20220919204303358

此时 sedx 为1,recvx为0,当环队列满的时候,则无法继续发送到队列中。

创建channel,make chan 会分配到堆上,最终返回一个指针。

image-20220919204452147

返回的一个堆区的地址,因此channel可以安全的在两个函数之间传递。

receives & sends

ch <- task0 :

  1. 加锁,lock.Lock()
  2. 将task0,copy一份,放到buf的数组中
  3. 释放锁,lock.Unlock()

t := <- ch :

  1. 加锁,lock.Lock()
  2. 出队,将task0,copy到t
  3. 释放锁,lock.Unlock()

阻塞和唤醒

发送这阻塞

当队列满时,发送者再往队里中发送消息会出现阻塞。除非有接受者从另一端接受数据。

goroutines是go runtime创建和维护的。

当队列满,此时发送者的G1(goroutine)会进入gopark阶段,会唤醒go的调度器,将该G1的状态改成waiting状态,M和G1解绑,M执行G0,调度local queue中的G到M上执行。

在从G1调度到G0之前,channel的sendq字段会记录该G的信息,以及发送的数据,暂存到sudog的elem中。

此时,G2从队列中取出1个数据:

首先,会将sendq中的elem取出来,放到channel中,然后将G1的状态设置为runanble。(也就是G2将G1唤醒)

接受者阻塞

当队列为空,接受者G2从队列中获取数据,接受者此时阻塞。

此时,接受者G2队列信息记录在recvq的字段中,g字段保留G2的信息,elem字段为队列接受者的地址(t := <- ch,t的地址),将G2状态设置为gopard。

当G1往队列中发送1个数据:

通过runtime,实现goroutine之间栈内存的操作,G1直接将数据写入到t中。(这个过程就不需要加锁、操作环形队列、赋值等过程,减少了内存拷贝的过程)。

总结

  • 线程安全(内部使用互斥锁)

  • 存储值,FIFO(数据拷贝到一个环形buffer)

  • 会导致goroutine阻塞和唤醒(通过sodug队列,和runtime 的Schedule切换G状态)