并发和并行 并发(concurrency
):指的是同一时刻只能有一条指令执行,但是多个进程指令被快速的轮换执行,使得在宏观上具有多个进程同时执行的效果。同一时间点,任务并不会同时执行。
并行(parallel
):同一时刻,有多条指令在多个处理器上同时执行。同一时间点,任务会同时执行。
可见,并行建立在多核的场景。Golang
实现多核并行 非常容易。
1 2 3 4 5 6 7 8 9 10 11 12 cpunum := runtime.NumCPU() // 获取CPU核数 fmt.Printf("CPU核数:%d", cpunum) runtime.GOMAXPROCS(cpunum) // 设置最大并行个数 for i := 0; i < cpunum; i++ { go func(i int) { // 通过 go 关键字开启并行执行 sum := 0 for j := 0; j < 10000; j++ { sum += j } fmt.Printf("CPU核:%d,执行结果%d\n", i, sum) }(i) }
程序和进程 程序:是指安装在操作系统上的一堆二进制指令组成的可执行文件。
进程:当程序执行,加载在内存中,占用系统资源,产生一个进程。
进程状态
初始态
就绪态:等待CPU分配
运行态:占用CPU
挂起态:等待CPU意外的其他资源释放CPU
终止态:结束运行
通过进程并发可能出现的问题:
系统开销大,占用资源多,
在unix、linux系统下,会产生孤儿进程和僵尸进程(正常情况下,子进程是通过父进程fork创建的,子进程再创建新进程,父进程无法预测子进程到什么时候结束,进程完成工作终止之后,父进程需要调用系统调用取得子进程的终止状态)(父进程结束,则子进程成为孤儿进程。)(进程终止,父进程尚未回收,子进程残留资源在内核中,变成僵尸进程。)
创建进程 创建进程的方法可以通过syscall
包的exec
函数(linux
下可使用syscall
)
1 fmt.Println(syscall.Exec("/bin/ls", []string{"l"}, os.Environ()))
还可以通过os/exec
包执行外部命令,以及和外部命令交互。
1 2 3 4 5 6 7 cmd := exec.Command("code" ) err := cmd.Run() out, err := cmd.CombinedOutput() stdoutStderr, err := cmd.Output() if err != nil { fmt.Println(err) }
CombinedOutput
的源码,实际是将标准输出和错误输出都输出出来
1 2 3 4 5 6 7 8 9 10 11 12 13 func (c *Cmd) CombinedOutput() ([]byte , error ) { if c.Stdout != nil { return nil , errors.New("exec: Stdout already set" ) } if c.Stderr != nil { return nil , errors.New("exec: Stderr already set" ) } var b bytes.Buffer c.Stdout = &b c.Stderr = &b err := c.Run() return b.Bytes(), err }
获取标准输出和错误输出
1 2 3 4 5 6 7 8 9 10 11 12 cmd := exec.Command("ping" , "abc" , "-c 4" ) var stdout, stderr bytes.Buffer cmd.Stdout = &stdout cmd.Stderr = &stderr cmd.Run() fmt.Println("stdout: " , stdout.String()) fmt.Println("stderr: " , stderr.String()) cmd := exec.Command("echo" , "abc" ) cmd.Stdout = os.Stdout cmd.Run()
获取命令行输入
1 2 cmd := exec.Command("echo", "a", "b") fmt.Println(cmd.Args)
获取进程信息
1 2 3 4 5 6 7 fmt.Println(cmd.Env) // 获取进程的环境变量 cmd.Env = []string{"def"} // 设置环境变量 fmt.Println(cmd.Process.Pid) // 进程Pid fmt.Println(cmd.Path) // 程序路径 fmt.Println(cmd.ProcessState.String()) // 进程状态 fmt.Println(cmd.ProcessState.Pid()) // 进程Pid cmd.Process.Kill() // 杀死进程
进程执行
1 2 3 4 5 6 7 8 cmd.Run() func (c *Cmd) Run() error { if err := c.Start(); err != nil { return err } return c.Wait() }
通过管道通信 进程和进程之间通信,方式有很多,可以通过管道。
通过管道输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 cmd := exec.Command("echo" , "-n" , `{"Name": "Bob", "Age": 32}` ) stdout, err := cmd.StdoutPipe() if err != nil { log.Fatal(err) } if err := cmd.Start(); err != nil { log.Fatal(err) } var person struct { Name string Age int } if err := json.NewDecoder(stdout).Decode(&person); err != nil { log.Fatal(err) } if err := cmd.Wait(); err != nil { log.Fatal(err) } fmt.Printf("%s is %d years old\n" , person.Name, person.Age)
通过管道输入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 cmd := exec.Command("cat" ) stdin, err := cmd.StdinPipe() if err != nil { log.Fatal(err) } go func () { defer stdin.Close() io.WriteString(stdin, "values written to stdin are passed to cmd's standard input" ) }() out, err := cmd.CombinedOutput() if err != nil { log.Fatal(err) } fmt.Printf("%s\n" , out)
子进程(fork
)和孤儿进程
通过一个进程启动另外一个进程
1 2 3 4 5 // 子进程代码: for { fmt.Println("I am living") time.Sleep(10 * time.Second) }
进程代码
1 2 3 4 5 6 7 8 9 10 11 cmd := exec.Command("./living") err := cmd.Start() if err != nil { log.Fatal(err) } fmt.Println(cmd.Process.Pid) cmd.Stdout = os.Stdout if err != nil { log.Fatal(err) } cmd.Process.Kill() //
os.Kill函数注释
Kill causes the Process to exit immediately. Kill does not wait until the Process has actually exited. This only kills the Process itself, not any other processes it may have started.
主进程执行之后,执行不需要等待结果输出,执行完,退出。通过打印子进程的Pid,可以查看到子进程信息
1 2 # ps aux | grep 6094 6094 0.0 0.0 409202496 3520 s004 S 9:45PM 0:00.01 ./living
此时主进程退出,子进程依然存在,子进程为孤儿进程
结束孤儿进程的方法
1 2 3 4 cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true , } syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
创建子进程的另外一种方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 process, err := os.StartProcess("/bin/ls" , []string {"l" }, &os.ProcAttr{Files: []*os.File{os.Stdin, os.Stdout, os.Stderr}}) if err != nil { log.Fatal(err) } state, err := process.Wait() if err != nil { log.Println(err) os.Exit(3 ) } fmt.Println(state.Pid()) fmt.Println(state.SystemTime()) fmt.Println(state.UserTime()) fmt.Println(state.Success()) fmt.Println(state.ExitCode()) fmt.Println(state.Exited())
创建子进程的第三种方法
1 pid, err := syscall.ForkExec("/bin/ls", []string{"l"}, nil)
通过第三方包实现的方案,可以参考Docker的reexec
参考资料:
docker reexec源码分析
docker reexec源码
进程共享内存 进程之间共享内存,需要进程在同一台机器上。
Linux
下需要通过结合cgo
,通过syscall
系统调用。参考代码:http://www.codebaoku.com/it-go/it-go-61414.html
windows下可以通过syscall直接创建共享内存,参考代码
进程锁 启动进程的时候,需要保证一个节点上同时只有一个进程运行,一般通过文件存放进程id
实现进程锁。(Linux
下,通过syscall.Flock
函数)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func LockProcess () { lockFile := "/var/run/xxx.pid" lock, err := os.Create(lockFile) if err != nil { log.Fatal("create lock file error:" , err) } defer os.Remove(lockFile) defer lock.Close() err = syscall.Flock(int (lock.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) if err != nil { log.Fatal("xxx is running,/var/run/xxx.pid exists" ) os.Exit(1 ) } }
可以保证该程序多次执行,只会有一个进程。另外一个进程执行时,会出现报错
1 xxx is running,/var/run/xxx.pid exists
这个方案也可以用于文件锁,方法是获取文件描述符,syscall.Flock(int(lock.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
实现文件锁。加锁是创建文件锁,解锁时删除文件锁。
守护进程 守护进程是指当一个进程结束,守护进程负责将该进程启动起来。
1 2 3 4 5 6 7 8 9 10 11 12 RESTART: cmd := exec.Command("./process" , "" ) cmd.Stdin = os.Stdin cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr err := cmd.Run() if err != nil { log.Fatal(err) } if cmd.Process == nil || cmd.ProcessState.Exited() { goto RESTART }
后台进程和守护进程的开源实现方式:xdaemon
多进程模型pagent
进程间通信的方法 Linux下进程间通信的方法有两类,一类是传输数据,二类是共享内存:
管道:pipe,负责传送字节流,
消息队列:message queue,传输结构化对象
socket:网络通信
共享内存区
信号
pipe 主进程通过管道接收子进程的输出:
1 2 3 4 5 6 7 cmd := exec.Command("echo" , "-n" , "hello" ) stdoutPipe, _ := cmd.StdoutPipe() cmd.Start() bytes := make ([]byte , 100 ) stdoutPipe.Read(bytes) fmt.Println(string (bytes))
匿名pipe
1 2 3 4 5 6 7 8 9 cmd := exec.Command("echo" , "-n" , "hello" ) buf := bytes.Buffer{} cmd.Stdout = &buf cmd.Start() cmd.Wait() fmt.Println(buf.String())
通过os.pipe
管道通信(以文件为主)
1 2 3 4 5 6 7 8 9 10 11 rpipe, wpipe, err := os.Pipe() if err != nil { log.Fatal(err) } n, _ := wpipe.WriteString("abcdef" ) fmt.Println(n) out := make ([]byte , 10 ) o, _ := rpipe.Read(out) fmt.Println(o) fmt.Println(string (out[:o]))
通过io.pipe
通信(在内存中)
1 2 3 4 5 6 7 8 9 10 11 reader, writer := io.Pipe() go func () { data := []byte ("abcdef" ) n, _ := writer.Write(data) fmt.Println(n) }() read := make ([]byte , 10 ) r, _ := reader.Read(read) fmt.Println(r) fmt.Println(string (read[:r]))
信号 1 2 3 4 5 6 7 8 9 10 11 12 13 sigs := make (chan os.Signal, 1 ) done := make (chan struct {}, 1 ) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func () { fmt.Println("running" ) sig := <-sigs fmt.Println(sig) done <- struct {}{} }() <-done fmt.Println("exiting" )
在进程优雅退出的场景,可以很好的使用到信号通信。
父进程通知子进程结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 var cmdline = ` ping baidu.com ` cmd := exec.Command("bash" , "-c" , cmdline) cmd.Stdout = os.Stdout err := cmd.Start() if err != nil { log.Fatal(err) } go func () { fmt.Println("chlid process running" ) time.Sleep(2 * time.Second) cmd.Process.Signal(os.Interrupt) }() err = cmd.Wait() if err != nil { log.Fatal(err) }
扩展阅读:信号种类
socket 例如网络编程,就是通过socket
通信
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package mainimport ( "fmt" "log" "net" ) func main () { listener, err := net.Listen("tcp" , "127.0.0.1:8081" ) if err != nil { log.Fatal(err) } for { fmt.Println("listening..." ) accept, err := listener.Accept() if err != nil { log.Fatal(err) } read := make ([]byte , 1024 ) _, err = accept.Read(read) if err != nil { log.Fatal(err) } fmt.Println("get: " , string (read)) rep := append ([]byte ("reploy" ), read...) _, err = accept.Write(rep) if err != nil { log.Fatal(err) } fmt.Println("send: " , string (rep)) } }
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package mainimport ( "fmt" "log" "net" ) func main () { dial, err := net.Dial("tcp" , "127.0.0.1:8081" ) if err != nil { log.Fatal(err) } w := []byte ("hello" ) _, err = dial.Write(w) if err != nil { log.Fatal(err) } fmt.Println("send: " , string (w)) r := make ([]byte , 1024 ) _, err = dial.Read(r) if err != nil { log.Fatal(err) } fmt.Println("get: " , string (r)) }
扩展阅读:网络编程
线程并发 进程:有独立的地址空间,拥有PCB(进程信息被放在一个叫做进程控制块的数据结构中,也就是PCB(process control block))。是最小分配资源单位,可看成是只有一个线程的进程。
线程:轻量级的进程,本质还是进程,是最小的执行单位。有独立的PCB,但是没有独立的地址空间(共享)。
进程的概念与PCB
goroutine goroutine
:Go
语言中的轻量级线程,由Go
运行时管理,Go
会智能的将goroutine
中的任务合理的分配给每个CPU。
1 2 3 4 5 6 7 8 fmt.Println(runtime.NumCPU()) runtime.GOMAXPROCS(10 ) fmt.Println(runtime.GOOS) fmt.Println(runtime.GOROOT())
通过协程处理函数
由于主goroutine
退出,其他的goroutine
也会退出。可通过sync.Watigroup
将主goroutine
等待其他goroutine
1 2 3 4 5 6 7 wg := new (sync.WaitGroup) wg.Add(1 ) go func (wg *sync.WaitGroup) { wg.Done() }(wg) wg.Wait()
线程同步 作用于注册线程数量,等待线程执行完成
1 2 3 4 wg := sync.WaitGroup{} wg.Add(n) // 注册n个线程,线程数+n wg.Done() // 结束一个线程,线程数-1 wg.Wait() // 等待线程数归0
runtime 运行时,处理golang
运行过程中垃圾回收、协程调度等
出让当前协程
1 2 3 4 5 6 7 8 9 go func() { for { fmt.Println("子协程") } }() for { runtime.Gosched() // 让当前协程让出CPU以让其他线程运行,它不会挂起当前线程,因此当前线程会在未来继续执行 fmt.Println("主协程") }
终止调用它的goroutine
。没有其他goroutine
受到影响。Goexit
在终止goroutine
之前运行所有延迟调用。
1 2 3 4 5 6 7 8 9 10 11 go func() { for { time.Sleep(10 * time.Millisecond) fmt.Println("子协程") } }() for { runtime.Goexit() // 结束当前协程,后续代码不再执行。(defer内的会执行) time.Sleep(10 * time.Millisecond) fmt.Println("主协程") }
channel Go语言的并发模型是CSP
,提倡通过通信共享内存,而不是通过共享内存实现通信。channel
就是实现协程之间相互通信的载体。channel
是一种特殊的类型,通道,遵循先入先出,保证收发顺序,而且是并发安全。
无缓冲
1 2 3 ch := make(chan int) ch <- 1 // 阻塞,无缓冲channel接受数据时,对端没有接受者,此时会出现阻塞
有缓冲
1 2 3 ch1 := make(chan int, 1) len(ch1), cap(ch1) // len:返回channel内部数据个数,cap返回channel容量
关闭channel
对一个关闭的通道再次发送值会导致panic
对一个关闭的通道接收,会一直获取值,直到通道为空
对一个关闭且没有值的通道执行接收操作,会得到响应类型的零值
关闭一个已经关闭的通道会panic
nil channel
关闭出现panic
(panic: close of nil channel
)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ch1 := make (chan int ) fmt.Printf("len: %d,cap: %d\n" , len (ch1), cap (ch1)) go func () { for { i := <-ch1 time.Sleep(100 * time.Millisecond) fmt.Println("get from channel: " , i) } }() for i := 0 ; i < 5 ; i++ { ch1 <- i } close (ch1)time.Sleep(5 * time.Second)
输出
1 2 3 4 5 6 7 8 9 len: 0,cap: 0 get from channel: 0 get from channel: 1 get from channel: 2 get from channel: 3 get from channel: 4 get from channel: 0 get from channel: 0 get from channel: 0
for-range
读取channel
1 2 3 4 5 6 7 8 9 10 11 ch := make (chan int ) go func () { for i := 0 ; i < 10 ; i++ { ch <- i } close (ch) }() for i := range ch { fmt.Println("get from channel: " , i) }
单向通道
1 2 3 chout := make (<-chan int ) chin := make (chan <- int ) chin <- <-chout
作为函数参数,函数声明形参只能接受单向通道,则实参需要是满足方向的channel
1 2 3 4 5 6 func outchannel (ch <-chan int ) {} ch := make (chan int ) outchannel(chout) outchannel(ch) outchannel(chin)
读取时判断是否是零值
1 2 3 4 i, ok := <-ch1 if ok { fmt.Println("get from channel: " , i) }
channel
本质是一个指针,作为函数参数传递时,拷贝指针传递。
channel
源码,在go/src/runtime/chan.go
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 closed uint32 elemtype *_type sendx uint recvx uint recvq waitq sendq waitq lock mutex } type waitq struct { first *sudog last *sudog } type sudog struct { g *g next *sudog prev *sudog elem unsafe.Pointer acquiretime int64 releasetime int64 ticket uint32 isSelect bool success bool parent *sudog waitlink *sudog waittail *sudog c *hchan }
死锁 当所有协程都处于阻塞状态,就会出现死锁
fatal error: all goroutines are asleep - deadlock!
1 2 3 ch := make (chan string ) ch <- "hello" fmt.Println(<-ch)
select 通过select
可以监听channel
上的数据流动,并且可以同时监听多个channel
。但是只处理一个。
1 2 3 4 5 select { case <-ch1: case <-ch2: default : }
如果所有的channel
阻塞,则会走default
逻辑
如果有channel
能读取数据,则读取该channel
中的数据
如果channel close
,则会一直读取该channel
的数据
空select
会阻塞当前goroutine
select
存在多个满足的case
,则随机选择一个
select
只存在一个case
,则阻塞住该case
超时处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ch := make (chan int ) go func () {GOTO: for { select { case n := <-ch: fmt.Println("get from ch" , n) case <-time.After(3 * time.Second): fmt.Println("timeout" ) break GOTO } } fmt.Println("go routine done" ) }() ch <- 1 time.Sleep(5 * time.Second) fmt.Println("main done" )
计时器,处理一个时间
1 2 3 4 5 6 7 8 9 10 11 t := time.NewTimer(3 * time.Second) go func () { <-t.C fmt.Println("Timer over" ) }() time.Sleep(5 * time.Second) if t.Stop() { fmt.Println("time stop " ) }
定时器,定时处理一个时间
1 2 3 4 5 6 7 8 9 10 t := time.NewTicker(1 * time.Second) go func () { for { select { case <-t.C: fmt.Println("time pass" , time.Now()) } } }() time.Sleep(5 * time.Second)
延时回调
1 2 3 4 5 6 7 exit := make (chan int ) fmt.Println("start" ) time.AfterFunc(3 *time.Second, func () { fmt.Println("boom" ) exit <- 0 }) <-exit
线程安全 当多个线程共同访问一个资源,对资源同时进行读写,就会出现资源竞争,导致数据产生错误的结果。
解决资源竞争的常用方式是锁。
Go
的sync
包提供了互斥锁 和读写锁 ,用于处理并发过程中可能出现同时两个或多个协程读写同一个变量的情况。
互斥锁 互斥锁只能同时被一个goroutine
锁定,其他goroutine
将阻塞直到互斥锁被解锁(重新争抢对互斥锁的锁定)。
源码
1 2 3 4 type Mutex struct { state int32 sema uint32 }
互斥锁是互斥锁。互斥锁的零值是未锁定的互斥锁。
互斥锁首次使用后不得复制。因此一般初始化指针变量。
锁定的互斥锁与特定的goroutine
不关联。一个goroutine
可以重新锁定(锁定)一个RWMutex
,然后安排另一个goroutine
运行锁定(解锁)它。
方法
1 2 3 func (m *Mutex) Lock() {} // 加锁,如果已经加了锁,则会阻塞 func (m *Mutex) TryLock() bool {} // TryLock尝试锁定m并报告它是否成功。很少会用到。 func (m *Mutex) Unlock() { // 解锁,如果已经解锁,再解锁会panic fatal error: sync: unlock of unlocked mutex
例如:
1 2 3 4 5 6 7 8 9 10 mu := sync.Mutex{} for i := 0 ; i < 10 ; i++ { go func (i int ) { mu.Lock() defer mu.Unlock() fmt.Printf("%d get lock\n" , i) time.Sleep(100 * time.Millisecond) fmt.Printf("%d release lock\n" , i) }(i) }
输出
1 2 3 4 5 6 7 8 9 10 1 get lock 1 release lock 4 get lock 4 release lock 2 get lock 2 release lock 3 get lock 3 release lock 0 get lock 0 release lock
可以看到,只有获得锁的goroutine
释放锁,其他goroutine
才能获得锁。
读写锁
同时只有一个goroutine
能够获得写锁定;
同时可以有任意多个goroutine
获得读锁定;
同时只能存在写锁定和读锁定(读和写互斥);
源码:
1 2 3 4 5 6 7 type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32 // semaphore for writers to wait for completing readers readerSem uint32 // semaphore for readers to wait for completing writers readerCount int32 // number of pending readers readerWait int32 // number of departing readers }
RWMutex
的零值是未锁定的mutex
。
首次使用后不得复制RWMutex
。因此一般初始化为指针变量。
方法
1 2 3 4 5 6 7 func (rw *RWMutex) RLock() {} // 加读锁 func (rw *RWMutex) TryRLock() bool {} // 尝试加读锁,并且返回操作结果 func (rw *RWMutex) RUnlock() {} // 解读锁,如果解锁时没有读锁,会panic func (rw *RWMutex) Lock() {} // 加写锁 func (rw *RWMutex) TryLock() bool {} // 尝试加写锁,并且返回操作结果 func (rw *RWMutex) Unlock() {} // 解写锁,如果没有写锁,调用时会panic func (rw *RWMutex) RLocker() Locker {} // 返回一个Locker接口,可以调用Lock和Unlock方法
用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 rw := sync.RWMutex{} for i := 0 ; i < 10 ; i++ { go func (i int ) { rw.Lock() defer rw.Unlock() fmt.Printf("%d get write lock\n" , i) time.Sleep(100 * time.Millisecond) fmt.Printf("%d release write lock\n" , i) }(i) } for i := 0 ; i < 10 ; i++ { go func (i int ) { rw.RLock() defer rw.RUnlock() fmt.Printf("%d get read lock\n" , i) time.Sleep(100 * time.Millisecond) fmt.Printf("%d release read lock\n" , i) }(i) }
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 0 get write lock 0 release write lock 2 get read lock 1 get read lock 3 get read lock 4 get read lock 0 get read lock 0 release read lock 4 release read lock 1 release read lock 2 release read lock 3 release read lock 1 get write lock 1 release write lock 3 get write lock 3 release write lock 4 get write lock 4 release write lock 2 get write lock 2 release write lock
可以看到,当一个goroutine
加上写锁,则既无法再加写锁或者读锁。当写锁释放之后,可以加任意多的读锁,读写锁不能同时加。
读写锁相比,在读很多,写很少的场景,效率比互斥锁高。
锁通过阻塞线程解决数据竞态问题
已经加锁,再加锁会进入阻塞状态
已经解锁,再解锁,会出现panic,无论是互斥锁还是读写锁
原子操作 原子操作指操作具有原子性,该操作不能被多个goroutine
分割,而且相比加锁操作,加锁会涉及上下文切换,原子操作性能更好。原子操作由内置的sync/atomic
包提供。
1 2 3 4 5 6 7 8 9 10 11 var sum int64 = 0 wg := sync.WaitGroup{} for i := 0 ; i < 1000 ; i++ { wg.Add(1 ) go func () { atomic.AddInt64(&(sum), int64 (1 )) wg.Done() }() } wg.Wait() fmt.Println(sum)
atomic
包中的方法可以分为以下几类:
1 2 3 4 5 func StoreInt64(addr *int64, val int64) // 将val写入addr指向的内存中 func LoadInt64(addr *int64) (val int64) // 从addr的值中读取 func AddInt64(addr *int64, delta int64) (new int64) // 增加,返回增加后的值 SwapInt64(addr *int64, new int64) (old int64) // 将addr指向的值交换成新的值,返回老值 CompareAndSwapInt64(addr *int64, old int64, new int64) (swapped bool) // 比较addr指向的值和old是否相等,如果相等,则替换成new值,并且返回true
Once 保证指定函数代码只执行一次,类似于单例模式,常用语应用启动时的一些全局初始化操作。
例如通过全局变量实现单例模式
1 2 3 4 5 6 7 8 9 10 11 12 type Tool struct { values int } var instance *Toolfunc NewInstance () *Tool { if instance == nil { instance = new (Tool) } return instance }
但是在多线程下,可能会多次申请内存,所以需要加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type Tool struct { values int } var instance *Toolvar mu *sync.Mutex = new (sync.Mutex)func NewInstance () *Tool { mu.Lock() defer mu.Unlock() if instance == nil { instance = new (Tool) } return instance }
但是加锁和解锁会消耗性能
那么可以转换思路,通过init()
函数初始化,其实也是利用了init
函数只会执行一次的特点。确定是程序启动时就创建在内存中,无法通过程序判断是否需要使用和初始化。
优化加锁的方式,双重判断,在第一次创建的时候加锁,后续不加锁。
1 2 3 4 5 6 7 8 9 10 func NewInstance() *Tool { if instance == nil { mu.Lock() if instance == nil { instance = new(Tool) } defer mu.Unlock() } return instance }
或者使用sync.Once的方式。源码
1 2 3 4 type Once struct { done uint32 // 标记是否已经执行 m Mutex }
使用
1 2 3 4 5 6 once := sync.Once{} for i := 0 ; i < 1000 ; i++ { once.Do(func () { instance = new (Tool) }) }
条件变量 当channel
的消费者比生产者多,可以在对应的共享数据状态发生变化时,通知阻塞在某个条件上的写成。
例如当channel
满了,需要通知生产者不生产,当channel
空了,需要通知消费者不消费。
sys.Cond
类型代表条件变量,条件变量要与锁一起使用。
当然,如果使用golang
的channel
,会自动阻塞,就不需要使用条件变量。这种场景比较适合在限流或者不会自动阻塞的生产者消费者,需要不停的确认状态的情况。
源码
1 2 3 4 5 6 7 8 9 type Cond struct { noCopy noCopy // L is held while observing or changing the condition L Locker notify notifyList checker copyChecker }
常用方法有三个:Wait
Signal
Broadcast
Wait:阻塞等待条件变量满足;释放已经加上的互斥锁(前两这两步相当于一个原子操作);当被唤醒,Wait函数返回时,解除阻塞并重新获取互斥锁。
Signal:单发通知,给一个正等待在该条件变量上的线程发送通知
Broadcast:广播通知,给所有正等待在该条件变量上的线程发送通知
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 var cond sync.Condfunc producer (out chan <- int , index int ) { for { cond.L.Lock() for len (out) == 10 { fmt.Printf("%d full.\n" , index) cond.Wait() } num := rand.Intn(1000 ) out <- num fmt.Printf("producer %d send %d to channel\n" , index, num) time.Sleep(time.Second) cond.L.Unlock() cond.Signal() } } func consumer (in <-chan int , index int ) { for { cond.L.Lock() for len (in) == 0 { fmt.Printf("%d empty.\n" , index) cond.Wait() } num := <-in fmt.Printf("consumer %d get %d from channel\n" , index, num) time.Sleep(time.Second) cond.L.Unlock() cond.Signal() } } func main () { ch := make (chan int , 10 ) rand.Seed(time.Now().UnixNano()) cond.L = new (sync.Mutex) for i := 0 ; i < 10 ; i++ { go producer(ch, i) } for i := 0 ; i < 1 ; i++ { go consumer(ch, i) } select {} }
临时对象池 当对象过多,频繁GC
,会对性能产生影响,此时如果对象可以重复利用,则会大大减少GC
的资源消耗。
sync.Pool
就是一个临时对象池,可以用来临时存储对象,下次使用从对象池获取,避免重复创建对象。(池化的使用场景例如数据库链接、网络连接,但是这两种场景并不适合使用sync.Pool
)。当然,GC
会清除sync.Pool
缓存的对象,对象的缓存有效期为下一次GC
之前。
获取对象
尝试从私有对象获取
私有对象不存在,尝试从当前Processor
的共享池获取
如果当前Processor
共享池也是空的,那么尝试去其他Processor
的共享池获取
如果所有池都是空的,最后就用用户指定的New
函数产生一个新的对象返回
其中,私有对象池是协程安全的,Processor
访问不需要加锁。共享池是线程不安全的,Processor
访问需要加锁。
放回对象
如果私有对象不存在,则保存为私有对象
如果私有对象存在,放入当前Processor子池的共享池中
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type Pool struct { noCopy noCopy local unsafe.Pointer localSize uintptr victim unsafe.Pointer victimSize uintptr New func () any }
方法:
1 2 func (p *Pool) Put(x any) // 放入对象池 func (p *Pool) Get() any // 从对象池取
例如
1 2 3 4 5 6 7 8 9 10 pool := sync.Pool{} pool.New = func () any { return "this is default" } pool.Put("abc" ) fmt.Println(pool.Get()) fmt.Println(pool.Get()) fmt.Println(pool.Get())
因此,sync.Pool
适用的场景:
适用于通过复用,降低复杂对象的创建和GC
代价
协程安全,会有锁的开销(创建对象的开销大还是使用锁的开销大,如果创建开销大,则适合使用sync.Pool
)
生命周期受GC
影响,不适合于做连接池等,需自己管理生命周期的资源的池化
线程安全的map map在多线程并发读写时,会panic
1 2 3 4 5 6 7 8 9 10 11 m := make (map [int ]int ) for i := 0 ; i < 10 ; i++ { go func (i int ) { m[i] = i }(i) } for i := 0 ; i < 10 ; i++ { go func (i int ) { fmt.Println(m[i]) }(i) }
1 fatal error: concurrent map writes
这是由于map
底层结构体中有falg
标志位,当检测到同时写入会panic
可以通过锁实现并发安全的map
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func main19 () { m := make (map [int ]int ) mu := new (sync.Mutex) for i := 0 ; i < 10 ; i++ { go func (i int ) { mu.Lock() m[i] = i mu.Unlock() }(i) } for i := 0 ; i < 10 ; i++ { go func (i int ) { mu.Lock() _ = m[i] mu.Unlock() }(i) } } func main20 () { m := make (map [int ]int ) mu := new (sync.RWMutex) for i := 0 ; i < 10 ; i++ { go func (i int ) { mu.Lock() m[i] = i mu.Unlock() }(i) } for i := 0 ; i < 10 ; i++ { go func (i int ) { mu.RLock() _ = m[i] mu.RUnlock() }(i) } }
golang中自带线程安全的锁,sync.Map
1 2 3 4 5 6 7 8 9 type Map struct { mu Mutex read atomic.Value dirty map [any]*entry misses int }
官方推荐,适用于两种情况:
当给定的key仅写入一次但多次读取时(如在只增长的缓存中)
当多个goroutine读取、写入和覆盖不相同的key时
方法:
1 2 3 4 5 6 func (m *Map) Load(key any) (value any, ok bool) {} //返回存储在map中的键的值,如果不存在值,则返回nil。ok结果指示是否在映射中找到值。 func (m *Map) Store(key, value any) {} // 为key存储一个value func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {} // 返回key的现有值(如果存在)。否则,它存储并返回给定值。如果存储了值,则结果为真,如果没有存储,则为假。 func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {} // 删除键的值,如果有,则返回这个值,如果没有,则返回nil。结果报告key是否存在。 func (m *Map) Delete(key any) {} // 删除一个key和value func (m *Map) Range(f func(key, value any) bool) {} // 为map中存在的每个键和值顺序调用f。如果f返回false,range将停止迭代。
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 var myMap *sync.Map = &sync.Map{}myMap.Store("a" , "1" ) fmt.Println(myMap.Load("a" )) fmt.Println(myMap.LoadOrStore("b" , "2" )) fmt.Println(myMap.LoadOrStore("a" , "11" )) fmt.Println(myMap.LoadAndDelete("a" )) fmt.Println(myMap.LoadAndDelete("c" )) myMap.Store("b" , "2" ) myMap.Store("c" , "3" ) myMap.Store("d" , "4" ) myMap.Range(func (key, value any) bool { fmt.Println(key, value) return true })
上下文 多线程并发过程中,可以通过waitgroup
控制主协程等待所有子协程,但是无法通知子协程退出。此时可以通过select + channel
的方式。但是当有多个子协程嵌套时,这个方式就不方便。Golang
提供Context
标准库解决这个问题。
Context有两个主要的功能:
通知子协程退出(正常退出、超时退出等)
传递必要的参数
创建context
1 2 3 4 func Background() Context {} // 返回Context的interface 实际返回的是 background = new(emptyCtx) type emptyCtx int
正常退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func quit (ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("cancel" ) return default : fmt.Println("running" ) time.Sleep(time.Second) } } } func main15 () { ctx, cancel := context.WithCancel(context.Background()) defer cancel() go quit(ctx) time.Sleep(1 * time.Second) }
超时退出
1 2 3 4 5 func main16 () { ctx, _ := context.WithTimeout(context.Background(), 5 *time.Second) go quit(ctx) time.Sleep(10 * time.Second) }
传递参数
1 func WithValue(parent Context, key any, val any) Context
例如通过ctx
传参
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ctx := context.WithValue(context.Background(), "user" , "mitaka" ) go quit(ctx) func quit (ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("cancel" ) return default : fmt.Println("running" , ctx.Value("user" )) time.Sleep(time.Second) } } }
推荐用法
1 2 3 4 5 6 7 8 9 10 11 12 type favContextKey string f := func (ctx context.Context, k favContextKey) { if v := ctx.Value(k); v != nil { fmt.Println("found value:" , v) return } fmt.Println("key not found:" , k) } k := favContextKey("language" ) ctx := context.WithValue(context.Background(), k, "Go" ) f(ctx, k) f(ctx, favContextKey("color" ))
控制最长运行时间,到某个时间节点退出
1 2 d := time.Now().Add(5 * time.Second) ctx, cancel := context.WithDeadline(context.Background(), d)