并发编程在编程中的一些使用场景

​ Golang并发编程在实际使用过程中,多用于多任务处理场景,这里简单列举几个,增加对并发编程的理解。

  • 可通过取消机制,取消异步任务的处理,并且确保协程中的数据不丢失
  • 资源池的使用
  • 生产者消费者模型,确保消费者的任务都消费完成

调度后台处理任务

一个Worker,在后天执行Job。

Worker属性有超时控制。Worker通过channel判断服务运行执行状态。

例如以下例子中,如果有一个tracker异步处理一些http请求的信息,异步处理,做记录。同时,当主服务需要停止时,要确保协程中的数据不会丢失,需要等待协程正常退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
"context"
"fmt"
"log"
"math/rand"
"time"
)

type Tracker struct {
data chan string
stop chan struct{}
}

func NewTracker() *Tracker {
return &Tracker{
data: make(chan string),
stop: make(chan struct{}),
}
}

func (t *Tracker) Run() { // 异步任务处理
for i := range t.data {
time.Sleep(10 * time.Millisecond)
log.Println("tracker got: ", i)
}
t.stop <- struct{}{}
}

func (t *Tracker) Shutdown(ctx context.Context) {
close(t.data) // 需要注意,这里一定需要先关闭外部的请求,不然往一个已经关闭的channel发送信息会panic
select {
case <-ctx.Done():
fmt.Println("shutdown timeout")
case <-t.stop:
fmt.Println("all data handled")
return
}
}

func (t *Tracker) Event(ctx context.Context, data string) error {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
select {
case <-ctx.Done():
return ctx.Err()
case t.data <- data:
return nil
}
}

func main() {
rand.Seed(time.Now().UnixNano())
// 注册全局tracker
tr := NewTracker()
go tr.Run()
// 模拟请求发送过来的之后,往tr中注册消息
_ = tr.Event(context.Background(), "event1")
_ = tr.Event(context.Background(), "event2")
_ = tr.Event(context.Background(), "event3")
_ = tr.Event(context.Background(), "event4")
// 模拟关闭过程
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
tr.Shutdown(ctx)
}

通过缓冲通道实现共享资源池

例如数据库连接,在没有连接时需要创建一个新的连接,如果资源池中有连接,则直接使用。使用完之后,将链接放到资源池中。这里不能使用sync.Pool,这是由于sync.PoolGC,如果是一个链接,GC之后,对象会被回收,变成nil对象。sync.Pool适合存储一些非复杂结构的对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
"fmt"
"io"
)

type Pool struct { // 需要根据实际情况决定是否加互斥锁,或者是读写锁
ch chan io.Closer
fn func() (io.Closer, error)
close bool
}

func NewPool(size int, fn func() (io.Closer, error)) *Pool {
return &Pool{
ch: make(chan io.Closer, size),
fn: fn,
}
}

func (p *Pool) Get() (io.Closer, error) { // 并发下获取资源,如果有前提条件,需要有并发安全考虑的情况下,需要加锁。例如这里是并发安全的。
select {
case resource := <-p.ch:
fmt.Println("use old resource")
return resource, nil
default:
fmt.Println("create new resource")
return p.fn()
}
}

func (p *Pool) Put(resource io.Closer) { // 在放入资源池的时候,如果需要操作额外资源,也需要注意并发安全
if p.close { // 这里用锁,检查状态会更好一点,防止并发操作关闭
resource.Close()
return
}

select {
case p.ch <- resource:
fmt.Println("resource back")
default:
fmt.Println("resource drop")
resource.Close()
}
}

func (p *Pool) Close() {
if p.close { // 同理,这里用锁检查状态,也会更好一点。可以减去后续的逻辑判断。
return
}
p.close = true
close(p.ch)
for i := range p.ch {
i.Close()
}
}

通过无缓冲通道创建协程池

这是一种任务分发机制,将无缓冲通道作为任务分发通道,消费者获取任务,初始化时传入的size则是消费者个数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"sync"
)

type Worker interface {
Work()
}

type Pool struct {
ch chan Worker
wg sync.WaitGroup
}

func NewPool(size int) *Pool {
p := new(Pool)
p.ch = make(chan Worker)
p.wg.Add(size)
for i := 0; i < size; i++ {
go func() {
defer p.wg.Done() // 通过done确保所有的消费者都已经没有任务了
for w := range p.ch {
w.Work()
}
}()
}
return p
}

func (p *Pool) Run(w Worker) { // 传入任务,生产者
p.ch <- w // 这里可以优化一下,增加一个是否关闭的状态,如果队列关闭,则返回错误。如果加上队列是否关闭的状态,则最好也加上互斥锁。
}

func (p *Pool) Done() { // done关闭队列,等待所有消费者处理完任务
close(p.ch) // 之类需要注意,如果队列关闭,则不能再发送任务
p.wg.Wait()
}

调用者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"
"time"
)

type work struct {
id int
}

func newWork(id int) *work {
return &work{id: id}
}

func (w work) Work() {
time.Sleep(time.Second)
fmt.Println("worker doing", w.id)
}

func main() {
pool := NewPool(2)
for i := 0; i < 5; i++ {
w := newWork(i)
pool.Run(w)
}
pool.Done()
}