Golang并发编程

并发和并行

并发(concurrency):指的是同一时刻只能有一条指令执行,但是多个进程指令被快速的轮换执行,使得在宏观上具有多个进程同时执行的效果。同一时间点,任务并不会同时执行。

并行(parallel):同一时刻,有多条指令在多个处理器上同时执行。同一时间点,任务会同时执行。

可见,并行建立在多核的场景。Golang实现多核并行 非常容易。

1
2
3
4
5
6
7
8
9
10
11
12
cpunum := runtime.NumCPU()                                    // 获取CPU核数
fmt.Printf("CPU核数:%d", cpunum)
runtime.GOMAXPROCS(cpunum) // 设置最大并行个数
for i := 0; i < cpunum; i++ {
go func(i int) { // 通过 go 关键字开启并行执行
sum := 0
for j := 0; j < 10000; j++ {
sum += j
}
fmt.Printf("CPU核:%d,执行结果%d\n", i, sum)
}(i)
}

程序和进程

程序:是指安装在操作系统上的一堆二进制指令组成的可执行文件。

进程:当程序执行,加载在内存中,占用系统资源,产生一个进程。

进程状态

  • 初始态
  • 就绪态:等待CPU分配
  • 运行态:占用CPU
  • 挂起态:等待CPU意外的其他资源释放CPU
  • 终止态:结束运行

通过进程并发可能出现的问题:

  • 系统开销大,占用资源多,
  • 在unix、linux系统下,会产生孤儿进程和僵尸进程(正常情况下,子进程是通过父进程fork创建的,子进程再创建新进程,父进程无法预测子进程到什么时候结束,进程完成工作终止之后,父进程需要调用系统调用取得子进程的终止状态)(父进程结束,则子进程成为孤儿进程。)(进程终止,父进程尚未回收,子进程残留资源在内核中,变成僵尸进程。)

创建进程

创建进程的方法可以通过syscall包的exec函数(linux下可使用syscall

1
fmt.Println(syscall.Exec("/bin/ls", []string{"l"}, os.Environ()))

还可以通过os/exec包执行外部命令,以及和外部命令交互。

1
2
3
4
5
6
7
cmd := exec.Command("code")    // 例如启动vscode
err := cmd.Run() // 运行
out, err := cmd.CombinedOutput() // 获取输出或者错误
stdoutStderr, err := cmd.Output() // 获取输出
if err != nil {
fmt.Println(err)
}

CombinedOutput 的源码,实际是将标准输出和错误输出都输出出来

1
2
3
4
5
6
7
8
9
10
11
12
13
func (c *Cmd) CombinedOutput() ([]byte, error) {
if c.Stdout != nil {
return nil, errors.New("exec: Stdout already set")
}
if c.Stderr != nil {
return nil, errors.New("exec: Stderr already set")
}
var b bytes.Buffer
c.Stdout = &b
c.Stderr = &b
err := c.Run()
return b.Bytes(), err
}

获取标准输出和错误输出

1
2
3
4
5
6
7
8
9
10
11
12
    cmd := exec.Command("ping", "abc", "-c 4")		// 执行命令
var stdout, stderr bytes.Buffer // 创建io.Writer对象
cmd.Stdout = &stdout // 绑定输出到io.Writer对象,输出到本程序的输出
cmd.Stderr = &stderr // 绑定错误输出到对象
cmd.Run()
fmt.Println("stdout: ", stdout.String())
fmt.Println("stderr: ", stderr.String())

// 或者
cmd := exec.Command("echo", "abc")
cmd.Stdout = os.Stdout
cmd.Run()

获取命令行输入

1
2
cmd := exec.Command("echo", "a", "b")
fmt.Println(cmd.Args)

获取进程信息

1
2
3
4
5
6
7
fmt.Println(cmd.Env)                    // 获取进程的环境变量
cmd.Env = []string{"def"} // 设置环境变量
fmt.Println(cmd.Process.Pid) // 进程Pid
fmt.Println(cmd.Path) // 程序路径
fmt.Println(cmd.ProcessState.String()) // 进程状态
fmt.Println(cmd.ProcessState.Pid()) // 进程Pid
cmd.Process.Kill() // 杀死进程

进程执行

1
2
3
4
5
6
7
8
cmd.Run()                        // 其实是Start和Wait方法实现的。

func (c *Cmd) Run() error {
if err := c.Start(); err != nil { // 命令运行,不等待执行结果
return err
}
return c.Wait() // 等待命令完成
}

通过管道通信

进程和进程之间通信,方式有很多,可以通过管道。

通过管道输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
cmd := exec.Command("echo", "-n", `{"Name": "Bob", "Age": 32}`)
stdout, err := cmd.StdoutPipe() // io.ReadCloser对象
if err != nil {
log.Fatal(err)
}
if err := cmd.Start(); err != nil { // 开始运行,过程保证管道可读,如果是cmd.Run(),则执行完成,管道也会关闭
log.Fatal(err)
}
var person struct {
Name string
Age int
}
if err := json.NewDecoder(stdout).Decode(&person); err != nil {
log.Fatal(err)
}
if err := cmd.Wait(); err != nil { // 等待命令执行完成,并且将标准输出和错误输出输出到指定位置
log.Fatal(err)
}
fmt.Printf("%s is %d years old\n", person.Name, person.Age)

通过管道输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
cmd := exec.Command("cat")
stdin, err := cmd.StdinPipe() // 创建进程输入管道
if err != nil {
log.Fatal(err)
}
go func() {
defer stdin.Close() // 管道关闭,命令执行
io.WriteString(stdin, "values written to stdin are passed to cmd's standard input") // 使用协程写入
}()
out, err := cmd.CombinedOutput() // 获取标准输出和错误输出的组合
if err != nil {
log.Fatal(err)
}
fmt.Printf("%s\n", out)

子进程(fork)和孤儿进程

通过一个进程启动另外一个进程

1
2
3
4
5
// 子进程代码:
for {
fmt.Println("I am living")
time.Sleep(10 * time.Second)
}

进程代码

1
2
3
4
5
6
7
8
9
10
11
cmd := exec.Command("./living")
err := cmd.Start()
if err != nil {
log.Fatal(err)
}
fmt.Println(cmd.Process.Pid)
cmd.Stdout = os.Stdout
if err != nil {
log.Fatal(err)
}
cmd.Process.Kill() //

os.Kill函数注释

Kill causes the Process to exit immediately. Kill does not wait until the Process has actually exited. This only kills the Process itself, not any other processes it may have started.

主进程执行之后,执行不需要等待结果输出,执行完,退出。通过打印子进程的Pid,可以查看到子进程信息

1
2
# ps aux | grep 6094
6094 0.0 0.0 409202496 3520 s004 S 9:45PM 0:00.01 ./living

此时主进程退出,子进程依然存在,子进程为孤儿进程

结束孤儿进程的方法

1
2
3
4
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true, // 将该进程设为一个新的进程组
}
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) // -cmd.Process.Pid 传入进程组号,

创建子进程的另外一种方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
process, err := os.StartProcess("/bin/ls", []string{"l"}, &os.ProcAttr{Files: []*os.File{os.Stdin, os.Stdout, os.Stderr}})
if err != nil {
log.Fatal(err)
}
state, err := process.Wait() // 如果没有wait,则可能出现父进程还在执行,子进程结束完成,但是子进程的进程描述符还在系统中,此时子进程变成僵尸进程。
if err != nil {
log.Println(err)
os.Exit(3)
}
fmt.Println(state.Pid()) // 子进程pid
fmt.Println(state.SystemTime()) // 子进程系统时间
fmt.Println(state.UserTime()) // 子进程用户时间
fmt.Println(state.Success()) // 子进程是否成功执行
fmt.Println(state.ExitCode()) // 子进程结束码
fmt.Println(state.Exited()) // 子进程是否退出

创建子进程的第三种方法

1
pid, err := syscall.ForkExec("/bin/ls", []string{"l"}, nil)

通过第三方包实现的方案,可以参考Docker的reexec

参考资料:

docker reexec源码分析

docker reexec源码

进程共享内存

进程之间共享内存,需要进程在同一台机器上。

Linux下需要通过结合cgo,通过syscall系统调用。参考代码:http://www.codebaoku.com/it-go/it-go-61414.html

windows下可以通过syscall直接创建共享内存,参考代码

进程锁

启动进程的时候,需要保证一个节点上同时只有一个进程运行,一般通过文件存放进程id实现进程锁。(Linux下,通过syscall.Flock函数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func LockProcess() {
lockFile := "/var/run/xxx.pid"
lock, err := os.Create(lockFile) // 创建进程锁文件
if err != nil {
log.Fatal("create lock file error:", err)
}
defer os.Remove(lockFile) // 解锁,文件删除
defer lock.Close() // 文件关闭

err = syscall.Flock(int(lock.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) // syscall调用文件锁,作用在文件描述符上
if err != nil {
log.Fatal("xxx is running,/var/run/xxx.pid exists")
os.Exit(1)
}

// todo logic
}

可以保证该程序多次执行,只会有一个进程。另外一个进程执行时,会出现报错

1
xxx is running,/var/run/xxx.pid exists

这个方案也可以用于文件锁,方法是获取文件描述符,syscall.Flock(int(lock.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)实现文件锁。加锁是创建文件锁,解锁时删除文件锁。

守护进程

守护进程是指当一个进程结束,守护进程负责将该进程启动起来。

1
2
3
4
5
6
7
8
9
10
11
12
RESTART:
cmd := exec.Command("./process", "")
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
if err != nil {
log.Fatal(err)
}
if cmd.Process == nil || cmd.ProcessState.Exited() { // 判断子进程是否退出
goto RESTART
}

后台进程和守护进程的开源实现方式:xdaemon

多进程模型pagent

进程间通信的方法

Linux下进程间通信的方法有两类,一类是传输数据,二类是共享内存:

  • 管道:pipe,负责传送字节流,
  • 消息队列:message queue,传输结构化对象
  • socket:网络通信
  • 共享内存区
  • 信号

pipe

主进程通过管道接收子进程的输出:

1
2
3
4
5
6
7
cmd := exec.Command("echo", "-n", "hello")
stdoutPipe, _ := cmd.StdoutPipe()
cmd.Start()

bytes := make([]byte, 100)
stdoutPipe.Read(bytes)
fmt.Println(string(bytes))

匿名pipe

1
2
3
4
5
6
7
8
9
cmd := exec.Command("echo", "-n", "hello")

buf := bytes.Buffer{}

cmd.Stdout = &buf

cmd.Start()
cmd.Wait()
fmt.Println(buf.String())

通过os.pipe管道通信(以文件为主)

1
2
3
4
5
6
7
8
9
10
11
rpipe, wpipe, err := os.Pipe()
if err != nil {
log.Fatal(err)
}
n, _ := wpipe.WriteString("abcdef")
fmt.Println(n)

out := make([]byte, 10)
o, _ := rpipe.Read(out)
fmt.Println(o)
fmt.Println(string(out[:o]))

通过io.pipe通信(在内存中)

1
2
3
4
5
6
7
8
9
10
11
reader, writer := io.Pipe()
go func() {
data := []byte("abcdef")
n, _ := writer.Write(data) // 如果对端没有读取,会阻塞
fmt.Println(n)
}()

read := make([]byte, 10)
r, _ := reader.Read(read)
fmt.Println(r)
fmt.Println(string(read[:r]))

信号

1
2
3
4
5
6
7
8
9
10
11
12
13
sigs := make(chan os.Signal, 1)
done := make(chan struct{}, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) // 捕获interrupt的信号(ctrl+c)和termina的信号,将信号发送到sigs中

go func() {
fmt.Println("running") // 主要逻辑正在执行
sig := <-sigs // 接受到信号
fmt.Println(sig)
done <- struct{}{} // 主要逻辑结束完成
}()

<-done // 阻塞,等待主要逻辑执行
fmt.Println("exiting")

在进程优雅退出的场景,可以很好的使用到信号通信。

父进程通知子进程结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    var cmdline = `
ping baidu.com
`
cmd := exec.Command("bash", "-c", cmdline)
cmd.Stdout = os.Stdout
err := cmd.Start()
if err != nil {
log.Fatal(err)
}
go func() {
fmt.Println("chlid process running") // 模拟子进程运行
time.Sleep(2 * time.Second)
cmd.Process.Signal(os.Interrupt) // 将信号发送给子进程
}()
err = cmd.Wait()
if err != nil {
log.Fatal(err)
}

扩展阅读:信号种类

socket

例如网络编程,就是通过socket通信

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"log"
"net"
)

func main() {
listener, err := net.Listen("tcp", "127.0.0.1:8081") // 开启监听
if err != nil {
log.Fatal(err)
}
for {
fmt.Println("listening...")
accept, err := listener.Accept() // 阻塞,等待接收信息
if err != nil {
log.Fatal(err)
}
read := make([]byte, 1024)
_, err = accept.Read(read)
if err != nil {
log.Fatal(err)
}
fmt.Println("get: ", string(read))
rep := append([]byte("reploy"), read...)
_, err = accept.Write(rep)
if err != nil {
log.Fatal(err)
}
fmt.Println("send: ", string(rep))
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"fmt"
"log"
"net"
)

func main() {
dial, err := net.Dial("tcp", "127.0.0.1:8081") // 建立通信
if err != nil {
log.Fatal(err)
}
w := []byte("hello")
_, err = dial.Write(w)
if err != nil {
log.Fatal(err)
}
fmt.Println("send: ", string(w))
r := make([]byte, 1024)
_, err = dial.Read(r)
if err != nil {
log.Fatal(err)
}
fmt.Println("get: ", string(r))
}

扩展阅读:网络编程

线程并发

进程:有独立的地址空间,拥有PCB(进程信息被放在一个叫做进程控制块的数据结构中,也就是PCB(process control block))。是最小分配资源单位,可看成是只有一个线程的进程。

线程:轻量级的进程,本质还是进程,是最小的执行单位。有独立的PCB,但是没有独立的地址空间(共享)。

进程的概念与PCB

goroutine

goroutineGo语言中的轻量级线程,由Go运行时管理,Go会智能的将goroutine中的任务合理的分配给每个CPU。

1
2
3
4
5
6
7
8
// 获取CPU逻辑核个数
fmt.Println(runtime.NumCPU())

// 设置可同时执行的最大CPU数,最大并行个数
runtime.GOMAXPROCS(10)

fmt.Println(runtime.GOOS) // 返回go env中的goos
fmt.Println(runtime.GOROOT()) // 返回go env中的goroot

通过协程处理函数

1
2
3
go func() {
// todo
}()

由于主goroutine退出,其他的goroutine也会退出。可通过sync.Watigroup将主goroutine等待其他goroutine

1
2
3
4
5
6
7
wg := new(sync.WaitGroup)
wg.Add(1)
go func(wg *sync.WaitGroup) {
// todo
wg.Done()
}(wg)
wg.Wait()

线程同步

作用于注册线程数量,等待线程执行完成

1
2
3
4
wg := sync.WaitGroup{}
wg.Add(n) // 注册n个线程,线程数+n
wg.Done() // 结束一个线程,线程数-1
wg.Wait() // 等待线程数归0

runtime

运行时,处理golang运行过程中垃圾回收、协程调度等

出让当前协程

1
2
3
4
5
6
7
8
9
go func() {
for {
fmt.Println("子协程")
}
}()
for {
runtime.Gosched() // 让当前协程让出CPU以让其他线程运行,它不会挂起当前线程,因此当前线程会在未来继续执行
fmt.Println("主协程")
}

终止调用它的goroutine。没有其他goroutine受到影响。Goexit在终止goroutine之前运行所有延迟调用。

1
2
3
4
5
6
7
8
9
10
11
go func() {
for {
time.Sleep(10 * time.Millisecond)
fmt.Println("子协程")
}
}()
for {
runtime.Goexit() // 结束当前协程,后续代码不再执行。(defer内的会执行)
time.Sleep(10 * time.Millisecond)
fmt.Println("主协程")
}

channel

Go语言的并发模型是CSP,提倡通过通信共享内存,而不是通过共享内存实现通信。channel就是实现协程之间相互通信的载体。channel是一种特殊的类型,通道,遵循先入先出,保证收发顺序,而且是并发安全。

无缓冲

1
2
3
ch := make(chan int)

ch <- 1 // 阻塞,无缓冲channel接受数据时,对端没有接受者,此时会出现阻塞

有缓冲

1
2
3
ch1 := make(chan int, 1)

len(ch1), cap(ch1) // len:返回channel内部数据个数,cap返回channel容量

关闭channel

1
close(ch)
  • 对一个关闭的通道再次发送值会导致panic
  • 对一个关闭的通道接收,会一直获取值,直到通道为空
  • 对一个关闭且没有值的通道执行接收操作,会得到响应类型的零值
  • 关闭一个已经关闭的通道会panic
  • nil channel关闭出现panicpanic: close of nil channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
ch1 := make(chan int)
fmt.Printf("len: %d,cap: %d\n", len(ch1), cap(ch1))
go func() {
for {
i := <-ch1
time.Sleep(100 * time.Millisecond)
fmt.Println("get from channel: ", i)
}
}()
for i := 0; i < 5; i++ {
ch1 <- i
}
close(ch1)
time.Sleep(5 * time.Second)

输出

1
2
3
4
5
6
7
8
9
len: 0,cap: 0
get from channel: 0
get from channel: 1
get from channel: 2
get from channel: 3
get from channel: 4
get from channel: 0
get from channel: 0
get from channel: 0

for-range读取channel

1
2
3
4
5
6
7
8
9
10
11
ch := make(chan int)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
close(ch)
}()

for i := range ch { // 当channel关闭,range结束
fmt.Println("get from channel: ", i)
}

单向通道

1
2
3
chout := make(<-chan int)            // 只能接受
chin := make(chan<- int) // 只能发送
chin <- <-chout // 从chout中接收数据,发送到chin

作为函数参数,函数声明形参只能接受单向通道,则实参需要是满足方向的channel

1
2
3
4
5
6
func outchannel(ch <-chan int) {}

ch := make(chan int)
outchannel(chout)
outchannel(ch) // 双向通道,满足读取
outchannel(chin) // 无法读取出来,cannot use chin (variable of type chan<- int) as type <-chan int in argument to outchannel

读取时判断是否是零值

1
2
3
4
i, ok := <-ch1  
if ok { // 读取channel中的数据,ok为true,零值为false
fmt.Println("get from channel: ", i)
}

channel本质是一个指针,作为函数参数传递时,拷贝指针传递。

channel源码,在go/src/runtime/chan.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
type hchan struct {
qcount uint // total data in the queue 元素数量
dataqsiz uint // size of the circular queue 底层循环数组长度
buf unsafe.Pointer // points to an array of dataqsiz elements // 指向底层循环数组的指针,值针对有缓冲的channel
elemsize uint16 // chan中元素大小
closed uint32 // 是否关闭的标志
elemtype *_type // element type 元素类型
sendx uint // send index 已发送元素在循环数组中的索引
recvx uint // receive index 已接收元素在循环数组中的索引
recvq waitq // list of recv waiters 等待接受的goroutine队列
sendq waitq // list of send waiters 等待发送的goroutine队列

// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex // 锁,保护hchan中所有字段
}

type waitq struct {
first *sudog
last *sudog
}

type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.

g *g

next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)

// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.

acquiretime int64
releasetime int64
ticket uint32

// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool

// success indicates whether communication over channel c
// succeeded. It is true if the goroutine was awoken because a
// value was delivered over channel c, and false if awoken
// because c was closed.
success bool

parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

死锁

当所有协程都处于阻塞状态,就会出现死锁

fatal error: all goroutines are asleep - deadlock!

1
2
3
ch := make(chan string)
ch <- "hello"
fmt.Println(<-ch)

select

通过select可以监听channel上的数据流动,并且可以同时监听多个channel。但是只处理一个。

1
2
3
4
5
select {            // 自由切换,多路复用
case <-ch1:
case <-ch2:
default: // 如果所有的chan都阻塞,默认会走default
}
  • 如果所有的channel阻塞,则会走default逻辑
  • 如果有channel能读取数据,则读取该channel中的数据
  • 如果channel close,则会一直读取该channel的数据
  • select会阻塞当前goroutine
  • select存在多个满足的case,则随机选择一个
  • select只存在一个case,则阻塞住该case

超时处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ch := make(chan int)
go func() {
GOTO:
for {
select {
case n := <-ch:
fmt.Println("get from ch", n)
case <-time.After(3 * time.Second):
fmt.Println("timeout")
break GOTO
}
}
fmt.Println("go routine done")
}()
ch <- 1
time.Sleep(5 * time.Second)
fmt.Println("main done")

计时器,处理一个时间

1
2
3
4
5
6
7
8
9
10
11
t := time.NewTimer(3 * time.Second)    // 3 秒之后发送一个消息到队列
go func() {
<-t.C // 获取消息
fmt.Println("Timer over")
}()
//t.Reset(3 * time.Second) // 重置定时
//t.Stop() // 防止定时器触发。如果调用停止计时器,则返回true;如果计时器已过期或已停止,则返回false。Stop不会关闭通道,以防止通道读取错误。
time.Sleep(5 * time.Second)
if t.Stop() { // 如果调用停止计时器,则返回true;如果计时器已过期或已停止,则返回false
fmt.Println("time stop ")
}

定时器,定时处理一个时间

1
2
3
4
5
6
7
8
9
10
t := time.NewTicker(1 * time.Second)    // 打点器,1秒执行一次
go func() {
for {
select {
case <-t.C: // 没隔一段时间就可以从t.C中获取一个数据
fmt.Println("time pass", time.Now())
}
}
}()
time.Sleep(5 * time.Second)

延时回调

1
2
3
4
5
6
7
exit := make(chan int)
fmt.Println("start")
time.AfterFunc(3*time.Second, func() { // 3 秒之后,处理函数
fmt.Println("boom")
exit <- 0
})
<-exit

线程安全

当多个线程共同访问一个资源,对资源同时进行读写,就会出现资源竞争,导致数据产生错误的结果。

解决资源竞争的常用方式是锁。

Gosync包提供了互斥锁读写锁,用于处理并发过程中可能出现同时两个或多个协程读写同一个变量的情况。

互斥锁

互斥锁只能同时被一个goroutine锁定,其他goroutine将阻塞直到互斥锁被解锁(重新争抢对互斥锁的锁定)。

源码

1
2
3
4
type Mutex struct {
state int32
sema uint32
}

互斥锁是互斥锁。互斥锁的零值是未锁定的互斥锁。

互斥锁首次使用后不得复制。因此一般初始化指针变量。

锁定的互斥锁与特定的goroutine不关联。一个goroutine可以重新锁定(锁定)一个RWMutex,然后安排另一个goroutine运行锁定(解锁)它。

方法

1
2
3
func (m *Mutex) Lock() {}            // 加锁,如果已经加了锁,则会阻塞
func (m *Mutex) TryLock() bool {} // TryLock尝试锁定m并报告它是否成功。很少会用到。
func (m *Mutex) Unlock() { // 解锁,如果已经解锁,再解锁会panic fatal error: sync: unlock of unlocked mutex

例如:

1
2
3
4
5
6
7
8
9
10
mu := sync.Mutex{}
for i := 0; i < 10; i++ {
go func(i int) {
mu.Lock()
defer mu.Unlock()
fmt.Printf("%d get lock\n", i)
time.Sleep(100 * time.Millisecond)
fmt.Printf("%d release lock\n", i)
}(i)
}

输出

1
2
3
4
5
6
7
8
9
10
1 get lock
1 release lock
4 get lock
4 release lock
2 get lock
2 release lock
3 get lock
3 release lock
0 get lock
0 release lock

可以看到,只有获得锁的goroutine释放锁,其他goroutine才能获得锁。

读写锁

  • 同时只有一个goroutine能够获得写锁定;
  • 同时可以有任意多个goroutine获得读锁定;
  • 同时只能存在写锁定和读锁定(读和写互斥);

源码:

1
2
3
4
5
6
7
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}

RWMutex的零值是未锁定的mutex

首次使用后不得复制RWMutex。因此一般初始化为指针变量。

方法

1
2
3
4
5
6
7
func (rw *RWMutex) RLock() {}                    // 加读锁
func (rw *RWMutex) TryRLock() bool {} // 尝试加读锁,并且返回操作结果
func (rw *RWMutex) RUnlock() {} // 解读锁,如果解锁时没有读锁,会panic
func (rw *RWMutex) Lock() {} // 加写锁
func (rw *RWMutex) TryLock() bool {} // 尝试加写锁,并且返回操作结果
func (rw *RWMutex) Unlock() {} // 解写锁,如果没有写锁,调用时会panic
func (rw *RWMutex) RLocker() Locker {} // 返回一个Locker接口,可以调用Lock和Unlock方法

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
rw := sync.RWMutex{}
for i := 0; i < 10; i++ {
go func(i int) {
rw.Lock()
defer rw.Unlock()
fmt.Printf("%d get write lock\n", i)
time.Sleep(100 * time.Millisecond)
fmt.Printf("%d release write lock\n", i)
}(i)
}

for i := 0; i < 10; i++ {
go func(i int) {
rw.RLock()
defer rw.RUnlock()
fmt.Printf("%d get read lock\n", i)
time.Sleep(100 * time.Millisecond)
fmt.Printf("%d release read lock\n", i)
}(i)
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
0 get write lock
0 release write lock
2 get read lock
1 get read lock
3 get read lock
4 get read lock
0 get read lock
0 release read lock
4 release read lock
1 release read lock
2 release read lock
3 release read lock
1 get write lock
1 release write lock
3 get write lock
3 release write lock
4 get write lock
4 release write lock
2 get write lock
2 release write lock

可以看到,当一个goroutine加上写锁,则既无法再加写锁或者读锁。当写锁释放之后,可以加任意多的读锁,读写锁不能同时加。

读写锁相比,在读很多,写很少的场景,效率比互斥锁高。

  • 锁通过阻塞线程解决数据竞态问题
  • 已经加锁,再加锁会进入阻塞状态
  • 已经解锁,再解锁,会出现panic,无论是互斥锁还是读写锁

原子操作

原子操作指操作具有原子性,该操作不能被多个goroutine分割,而且相比加锁操作,加锁会涉及上下文切换,原子操作性能更好。原子操作由内置的sync/atomic包提供。

1
2
3
4
5
6
7
8
9
10
11
var sum int64 = 0
wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
atomic.AddInt64(&(sum), int64(1))
wg.Done()
}()
}
wg.Wait()
fmt.Println(sum)

atomic包中的方法可以分为以下几类:

  • 读取
  • 写入
  • 交换并且交换
  • 比较
  • 增加
1
2
3
4
5
func StoreInt64(addr *int64, val int64)                        // 将val写入addr指向的内存中
func LoadInt64(addr *int64) (val int64) // 从addr的值中读取
func AddInt64(addr *int64, delta int64) (new int64) // 增加,返回增加后的值
SwapInt64(addr *int64, new int64) (old int64) // 将addr指向的值交换成新的值,返回老值
CompareAndSwapInt64(addr *int64, old int64, new int64) (swapped bool) // 比较addr指向的值和old是否相等,如果相等,则替换成new值,并且返回true

Once

保证指定函数代码只执行一次,类似于单例模式,常用语应用启动时的一些全局初始化操作。

例如通过全局变量实现单例模式

1
2
3
4
5
6
7
8
9
10
11
12
type Tool struct {
values int
}

var instance *Tool

func NewInstance() *Tool {
if instance == nil {
instance = new(Tool)
}
return instance
}

但是在多线程下,可能会多次申请内存,所以需要加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type Tool struct {
values int
}

var instance *Tool
var mu *sync.Mutex = new(sync.Mutex)

func NewInstance() *Tool {
mu.Lock()
defer mu.Unlock()
if instance == nil {
instance = new(Tool)
}
return instance
}

但是加锁和解锁会消耗性能

那么可以转换思路,通过init()函数初始化,其实也是利用了init函数只会执行一次的特点。确定是程序启动时就创建在内存中,无法通过程序判断是否需要使用和初始化。

优化加锁的方式,双重判断,在第一次创建的时候加锁,后续不加锁。

1
2
3
4
5
6
7
8
9
10
func NewInstance() *Tool {
if instance == nil {
mu.Lock()
if instance == nil {
instance = new(Tool)
}
defer mu.Unlock()
}
return instance
}

或者使用sync.Once的方式。源码

1
2
3
4
type Once struct {
done uint32 // 标记是否已经执行
m Mutex
}

使用

1
2
3
4
5
6
once := sync.Once{}
for i := 0; i < 1000; i++ {
once.Do(func() { // once只有一个方法,就是DO,保证只会执行一次
instance = new(Tool)
})
}

条件变量

channel的消费者比生产者多,可以在对应的共享数据状态发生变化时,通知阻塞在某个条件上的写成。

例如当channel满了,需要通知生产者不生产,当channel空了,需要通知消费者不消费。

sys.Cond类型代表条件变量,条件变量要与锁一起使用。

当然,如果使用golangchannel,会自动阻塞,就不需要使用条件变量。这种场景比较适合在限流或者不会自动阻塞的生产者消费者,需要不停的确认状态的情况。

源码

1
2
3
4
5
6
7
8
9
type Cond struct {
noCopy noCopy

// L is held while observing or changing the condition
L Locker

notify notifyList
checker copyChecker
}

常用方法有三个:Wait Signal Broadcast

Wait:阻塞等待条件变量满足;释放已经加上的互斥锁(前两这两步相当于一个原子操作);当被唤醒,Wait函数返回时,解除阻塞并重新获取互斥锁。

Signal:单发通知,给一个正等待在该条件变量上的线程发送通知

Broadcast:广播通知,给所有正等待在该条件变量上的线程发送通知

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
var cond sync.Cond

func producer(out chan<- int, index int) {
for {
cond.L.Lock()
for len(out) == 10 { // 不停地判断是否是满的
fmt.Printf("%d full.\n", index)
cond.Wait() // 如果是满的,阻塞 // 如果没有cond,则需要循环检测是否是满的,会一直占用CPU资源
}
num := rand.Intn(1000)
out <- num
fmt.Printf("producer %d send %d to channel\n", index, num)
time.Sleep(time.Second)
cond.L.Unlock()
cond.Signal() // 生产了一个,发送消息,如果是生产者接受,则会继续判断,是否是满的,如果是消费者接受到消息,则消费者会退出阻塞
// 这里换成cond.Broadcast 也是合适的。
}
}

func consumer(in <-chan int, index int) {
for {
cond.L.Lock()
for len(in) == 0 { // 不停地判断是否是空的,如果是空的,阻塞
fmt.Printf("%d empty.\n", index)
cond.Wait() // 等待条件变量发送消息
}
num := <-in
fmt.Printf("consumer %d get %d from channel\n", index, num)
time.Sleep(time.Second)

cond.L.Unlock()
cond.Signal()

}
}

func main() {
ch := make(chan int, 10)
rand.Seed(time.Now().UnixNano())
cond.L = new(sync.Mutex)
for i := 0; i < 10; i++ {
go producer(ch, i)
}

for i := 0; i < 1; i++ {
go consumer(ch, i)
}

select {}

}

临时对象池

当对象过多,频繁GC,会对性能产生影响,此时如果对象可以重复利用,则会大大减少GC的资源消耗。

sync.Pool就是一个临时对象池,可以用来临时存储对象,下次使用从对象池获取,避免重复创建对象。(池化的使用场景例如数据库链接、网络连接,但是这两种场景并不适合使用sync.Pool)。当然,GC会清除sync.Pool缓存的对象,对象的缓存有效期为下一次GC之前。

获取对象

  • 尝试从私有对象获取
  • 私有对象不存在,尝试从当前Processor的共享池获取
  • 如果当前Processor共享池也是空的,那么尝试去其他Processor的共享池获取
  • 如果所有池都是空的,最后就用用户指定的New函数产生一个新的对象返回

其中,私有对象池是协程安全的,Processor访问不需要加锁。共享池是线程不安全的,Processor访问需要加锁。

放回对象

  • 如果私有对象不存在,则保存为私有对象
  • 如果私有对象存在,放入当前Processor子池的共享池中

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
type Pool struct {
noCopy noCopy

local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array

victim unsafe.Pointer // local from previous cycle
victimSize uintptr // size of victims array

// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() any // 当池子为空,默认通过New函数创建的值返回
}

方法:

1
2
func (p *Pool) Put(x any)        // 放入对象池
func (p *Pool) Get() any // 从对象池取

例如

1
2
3
4
5
6
7
8
9
10
pool := sync.Pool{}
pool.New = func() any {
return "this is default"
}
pool.Put("abc")
//runtime.GC()
fmt.Println(pool.Get()) // abc 没有GC,获取的是abc,从对象池中取出来之后,该对象就不存在于对象池中
//fmt.Println(pool.Get()) // gc 之后,获取到的就是 this is default
fmt.Println(pool.Get()) // this is default
fmt.Println(pool.Get()) // this is default

因此,sync.Pool适用的场景:

  • 适用于通过复用,降低复杂对象的创建和GC代价
  • 协程安全,会有锁的开销(创建对象的开销大还是使用锁的开销大,如果创建开销大,则适合使用sync.Pool
  • 生命周期受GC影响,不适合于做连接池等,需自己管理生命周期的资源的池化

线程安全的map

map在多线程并发读写时,会panic

1
2
3
4
5
6
7
8
9
10
11
m := make(map[int]int)
for i := 0; i < 10; i++ {
go func(i int) {
m[i] = i
}(i)
}
for i := 0; i < 10; i++ {
go func(i int) {
fmt.Println(m[i])
}(i)
}
1
fatal error: concurrent map writes

这是由于map底层结构体中有falg标志位,当检测到同时写入会panic

可以通过锁实现并发安全的map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 互斥锁
func main19() {
m := make(map[int]int)
mu := new(sync.Mutex)
for i := 0; i < 10; i++ {
go func(i int) {
mu.Lock()
m[i] = i
mu.Unlock()
}(i)
}
for i := 0; i < 10; i++ {
go func(i int) {
mu.Lock()
_ = m[i]
mu.Unlock()
}(i)
}
}

// 读写锁
func main20() {
m := make(map[int]int)
mu := new(sync.RWMutex)
for i := 0; i < 10; i++ {
go func(i int) {
mu.Lock()
m[i] = i
mu.Unlock()
}(i)
}
for i := 0; i < 10; i++ {
go func(i int) {
mu.RLock()
_ = m[i]
mu.RUnlock()
}(i)
}
}

golang中自带线程安全的锁,sync.Map

1
2
3
4
5
6
7
8
9
type Map struct {
mu Mutex

read atomic.Value // readOnly

dirty map[any]*entry

misses int
}

官方推荐,适用于两种情况:

  1. 当给定的key仅写入一次但多次读取时(如在只增长的缓存中)
  2. 当多个goroutine读取、写入和覆盖不相同的key时

方法:

1
2
3
4
5
6
func (m *Map) Load(key any) (value any, ok bool) {} //返回存储在map中的键的值,如果不存在值,则返回nil。ok结果指示是否在映射中找到值。
func (m *Map) Store(key, value any) {} // 为key存储一个value
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {} // 返回key的现有值(如果存在)。否则,它存储并返回给定值。如果存储了值,则结果为真,如果没有存储,则为假。
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {} // 删除键的值,如果有,则返回这个值,如果没有,则返回nil。结果报告key是否存在。
func (m *Map) Delete(key any) {} // 删除一个key和value
func (m *Map) Range(f func(key, value any) bool) {} // 为map中存在的每个键和值顺序调用f。如果f返回false,range将停止迭代。

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var myMap *sync.Map = &sync.Map{}
myMap.Store("a", "1") // 存储 a:1
fmt.Println(myMap.Load("a")) // 1 true
fmt.Println(myMap.LoadOrStore("b", "2")) // 2 false
fmt.Println(myMap.LoadOrStore("a", "11")) // 1 true
fmt.Println(myMap.LoadAndDelete("a")) // 1 true
fmt.Println(myMap.LoadAndDelete("c")) // nil false
myMap.Store("b", "2")
myMap.Store("c", "3")
myMap.Store("d", "4")
myMap.Range(func(key, value any) bool {
fmt.Println(key, value) // 打印 b 2,c 3,d 4
return true
})

上下文

多线程并发过程中,可以通过waitgroup控制主协程等待所有子协程,但是无法通知子协程退出。此时可以通过select + channel的方式。但是当有多个子协程嵌套时,这个方式就不方便。Golang提供Context标准库解决这个问题。

Context有两个主要的功能:

  1. 通知子协程退出(正常退出、超时退出等)
  2. 传递必要的参数

创建context

1
2
3
4
func Background() Context {} // 返回Context的interface

实际返回的是 background = new(emptyCtx)
type emptyCtx int

正常退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func quit(ctx context.Context) {
for {
select {
case <-ctx.Done(): // cancel()执行之后,ctx.Done() close
fmt.Println("cancel")
return
default:
fmt.Println("running")
time.Sleep(time.Second)
}
}
}

func main15() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // 函数执行完,执行cancel() cancel()可以重复执行
go quit(ctx)
time.Sleep(1 * time.Second)
}

超时退出

1
2
3
4
5
func main16() {
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) // 5s之后,会自动关闭ctx.Done
go quit(ctx)
time.Sleep(10 * time.Second)
}

传递参数

1
func WithValue(parent Context, key any, val any) Context

例如通过ctx传参

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    ctx := context.WithValue(context.Background(), "user", "mitaka")
go quit(ctx)

// 另一个协程中
func quit(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("cancel")
return
default:
fmt.Println("running", ctx.Value("user")) // 通过key获取value
time.Sleep(time.Second)
}
}
}

推荐用法

1
2
3
4
5
6
7
8
9
10
11
12
type favContextKey string
f := func(ctx context.Context, k favContextKey) {
if v := ctx.Value(k); v != nil { // 判断是否有值
fmt.Println("found value:", v)
return
}
fmt.Println("key not found:", k)
}
k := favContextKey("language")
ctx := context.WithValue(context.Background(), k, "Go") // key 是一个自定义类型的变量
f(ctx, k)
f(ctx, favContextKey("color"))

控制最长运行时间,到某个时间节点退出

1
2
d := time.Now().Add(5 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), d)