channel原理
inner workings
concurrency features
并发能力
goroutines
to execute tasks independently, potentially in parallel.
可以独立执行任务,也可以并行执行任务
channels
for communication, synchronization between goroutines.
在两个 goroutine 之间提供信息共享和同步
例如:处理每一个任务
1 | func main() { |
优化,使用 channel
缓存任务
1 | func main() { |
使用多个 goroutine
,并发处理任务
1 |
|
通过 channel
发送任务,并发处理任务
1 | func main() { |
channels are inherently interesting
goroutine-safe
线程安全
store and pass values between coroutines
在两个 goroutine 之间存储和传递 value
provide FIFO semantics
提供 先进先出的语义
can cause goroutines to block and unblock
可能导致 goroutine 阻塞和唤醒。
making channels
the hchan struct
创建 channels
,也就是 hchan
结构体。
make chan
创建 chan
有两种方式,无缓冲和带缓冲
1 | # buffered channel |
1 | # unbuffered channel |
带缓冲的 channel
具备上述四个特点。
go/src/runtime/chan.go
1 | type hchan struct { |
空 chan
,sendx = 0
,revex = 0
一个入队,sendx = 1
,revex = 0
再入两个,满队,sendx = 0
,revex = 0
出队一个,sendx = 0
,revex = 1
allocates memory
内存分配
allocates an hchan struct on the heap.
初始化 channel
的时候(也就是 hchan
),内存分配在堆上。同时返回一个 hchan
的指针。
这也就是为什么可以在两个函数之间传递 channels
,而不需要传递 channels
的指针。
sends and receives
goroutine scheduling
发送和接受,也就是 goroutine
之间调度
sends
task queue
例如在任务队列中,G1 负责分发任务,G2 负责接收任务。
G1:
1 | func main() { |
G2:
1 | func worker() { |
实际情况:
在 G1 侧:ch <- task0
将 task0
发送到 ch
acquire:拿锁
enqueue:将
task0
的结构体拷贝一份,放到cha
中(如果task0
是指针,则拷贝的是8个字节,如果是结构体,则拷贝整个结构体。)释放锁
在 G2 侧:t := <- ch
从 ch
中获取任务
获取锁
出队,将
task0
的拷贝从队列中取出来,重新赋值给t
释放锁
no shared memory ( except hchan )
可以看到,整个过程没有共享内存(除了 hchan
之外),而是使用 copies
拷贝对象。
Do not communicate by sharing memory; instead, share memory by communicating.
(no shared memory) copies
如果超过队列最大限度
发送两个任务到
ch
ch <- task1
ch <- task2
再发送一个任务到
ch
ch <- task3
当
ch
满了,再发送一个任务到ch
ch <- task4
此时 chanel is full
,队列已满。
G1 执行阻塞,在队列接受后恢复。(通过 GMP
调度模型,当 G1 在阻塞之后,P 会切换到 g0
到 M 上执行,G1 的状态变为 gopark
,从而避免其他 goroutine
无法被调度执行。)
当 task
被取走,G1 从阻塞状态恢复到正常状态。
the runtime scheduler
运行时调度
goroutine
是 用户空间的线程,也就是说,创建和管理都是通过 Go runtime(g0
),并不是 OS,相比 OS 的线程,Go runtime 的线程更加轻量。
runtime 调度会将 goroutines 调度到 OS 的线程上。
Go 的 M:N
调度模型可以用三个结构体来表示
M:OS thread,操作系统线程
G:goroutine
P:context for scheduling,调度器:
- 负责调度
- 为了运行 G,M 一定会持有一个 P
pausing goroutines
阻塞 goroutines
ch <- task4
往满 channel
中发送时,这个 G 进入阻塞状态,G 进入 gopark
,G1 改为 waiting
取消 G1 和 M 之间的关联关系
从 P 中调度一个可运行的 G到 M 上执行,让 M 不处于阻塞状态。
这种处理方式,G1 根据需要被阻塞,但操作系统线程没有被阻塞。
当有一个消息从 channel
中被取出,ch
没有满时,G1 不再被阻塞,从阻塞状态中恢复回来。
resuming goroutines
hchan
结构体中存储等待的 senders
和 receivers
go/src/runtime/chan.go
1 | type hchan struct { |
go/src/runtime/runtime2.go
1 | // sudog represents a g in a wait list, such as for sending/receiving |
G1 阻塞时,会创建一个 sudog
,并且将自己存储进去
并且将这个 sudog
放到 sendq
中。receiver
通过 sendq
来唤醒 G1。而且是在执行调度之前就将 G 唤醒(也就是先唤醒 G ,然后 G 等待被 M 调度)
G2 开始执行
t := <- ch
将 task1
从 ch
中取出来
取走之后,从 sendq
中获取链表中的 first sodug
将 sudog
中的 elem
中的 task4
放到 ch
中
将 G1 的状态从 gopark
改为 goready
并且将 G1 调度到本地队列,依据亲缘性原则,将 G1 放到 gonext
中
sends and receives when the receiver comes first.
发送和接受,是接收首先发生。
receives
当接收者阻塞
t := <- ch
从一个空 channel
中接收数据
此时,会将 G2 暂停,同样,将 G2 状态改为 gopark
,将 G2
放到 sudog
中,加入到 ch
的 recvq
中。
并且将 t 的指针存储到 elem 中(这是很有意思的点)
在 G1 中,当 task
被放到 ch
中时:
本应该将 task
放到 buf
中,然后修改 G2 状态到 goready
。
但是这里有一个 smarter
的方法:
在 elem
中存储了 t
的指针,G1 将 task
直接存储到 t
上。
而 t
其实是一个局部变量,也就是 t
是存储在栈上的。也就是说,G1 直接操作了 G2 的栈。
正常情况下,如果一个线程操作另外一个线程的栈,会直接报错,或者引起数据不一致。但是在 Go 中,可以通过保证 happens before
来确保数据一致性。
这种情况,只会发生在 runtime
中。
在恢复时,G2不需要获取通道锁并操作缓冲区。同时,也只会有很少的内存拷贝。
understand channels
goroutine-safe
hchan
中有互斥锁Store values, pass in FIFO
拷贝到
hchan
的buffer
,也从buffer
拷贝出来can cause goroutines to pause and resume
利用
hchan
的sudog
队列调用运行时调度器,
gopark
,goready
unbuffered channels
无缓冲的 channels
的工作方式总是与 direct send
方式一致:
- 接收者先行:发送者写入到接收者的
stack
上 - 发送者先行:接收者直接从
sudog
上直接获取数据
select
一般情况下:
- 给所有的 channel 加锁
- 将一个
sudog
放到所有channel
的sendq/recvq
队列中 - 解锁,处于
select
下的 G 置于阻塞状态 - 使用 CAS 操作,因此只会有一个
channel
可继续执行 - resuming mirrors the pause sequence.(恢复镜像暂停序列)
stepping back:
design considerations,设计思想。
Go channel 的设计思想主要是 简单(simplicity) 和 高效(performance)。
Simplicity
使用锁的队列优先与无锁队列:
“The performance improvement does not materialize from the air, it comes with code complexity increase.” — dvyokov
“性能的提升并非凭空出现,而是伴随着代码复杂度的增加。” — dvyokov
使用锁,相比代码和使用更加简单。
performance
调用 runtime
调度器:
- OS 线程不会被阻塞(G 阻塞时,M上会执行其他 G)
跨 goroutine
栈的读和写
- goroutine 唤醒的路径是无锁的(唤醒后,不再从
buffer
中加锁获取数据) - 减少潜在的内存拷贝(唤醒后,直接通过
sudog
拿到数据,无需拷贝到buffer
;或者唤醒前,将数据放到sudog
,也无需拷贝到buffer
)
这两点,通过在 GC、压缩栈上,优化了内存管理。
这是在简单性和高效性上做的精确的取舍。
unbuffered channels
无缓冲的 channels
1 | ch := make(chan int) |
无缓冲 channel
的工作方式:
如果是接收者先行:(接收者从 ch
中接收数据,<- ch
)
- 将
sudog
放到recvq
队列中,然后阻塞 - 随后,接收者将 直接发送 数据到
sudog
,然后恢复接收者
如果是发送者先行:(发送者往 ch
中发送数据,ch <- 1
)
- 将它自己作为
sudog
放到sendq
队列中,然后阻塞 - 接收者从
sudog
中 直接接收 数据,并且恢复发送者
selects
使用方式和例子
1 | var taskCh = make(chan Task, 3) |
reference
GopherCon 2017: Kavya Joshi - Understanding Channels