Golang并发编程在实际使用过程中,多用于多任务处理场景,这里简单列举几个,增加对并发编程的理解。
可通过取消机制,取消异步任务的处理,并且确保协程中的数据不丢失
资源池的使用
生产者消费者模型,确保消费者的任务都消费完成
调度后台处理任务 一个Worker,在后天执行Job。
Worker属性有超时控制。Worker通过channel
判断服务运行执行状态。
例如以下例子中,如果有一个tracker
异步处理一些http
请求的信息,异步处理,做记录。同时,当主服务需要停止时,要确保协程中的数据不会丢失,需要等待协程正常退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package mainimport ( "context" "fmt" "log" "math/rand" "time" ) type Tracker struct { data chan string stop chan struct {} } func NewTracker () *Tracker { return &Tracker{ data: make (chan string ), stop: make (chan struct {}), } } func (t *Tracker) Run() { for i := range t.data { time.Sleep(10 * time.Millisecond) log.Println("tracker got: " , i) } t.stop <- struct {}{} } func (t *Tracker) Shutdown(ctx context.Context) { close (t.data) select { case <-ctx.Done(): fmt.Println("shutdown timeout" ) case <-t.stop: fmt.Println("all data handled" ) return } } func (t *Tracker) Event(ctx context.Context, data string ) error { time.Sleep(time.Millisecond * time.Duration(rand.Intn(100 ))) select { case <-ctx.Done(): return ctx.Err() case t.data <- data: return nil } } func main () { rand.Seed(time.Now().UnixNano()) tr := NewTracker() go tr.Run() _ = tr.Event(context.Background(), "event1" ) _ = tr.Event(context.Background(), "event2" ) _ = tr.Event(context.Background(), "event3" ) _ = tr.Event(context.Background(), "event4" ) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5 ) defer cancel() tr.Shutdown(ctx) }
通过缓冲通道实现共享资源池 例如数据库连接,在没有连接时需要创建一个新的连接,如果资源池中有连接,则直接使用。使用完之后,将链接放到资源池中。这里不能使用sync.Pool
,这是由于sync.Pool
会GC
,如果是一个链接,GC
之后,对象会被回收,变成nil
对象。sync.Pool
适合存储一些非复杂结构的对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package mainimport ( "fmt" "io" ) type Pool struct { ch chan io.Closer fn func () (io.Closer, error ) close bool } func NewPool (size int , fn func () (io.Closer, error )) *Pool { return &Pool{ ch: make (chan io.Closer, size), fn: fn, } } func (p *Pool) Get() (io.Closer, error ) { select { case resource := <-p.ch: fmt.Println("use old resource" ) return resource, nil default : fmt.Println("create new resource" ) return p.fn() } } func (p *Pool) Put(resource io.Closer) { if p.close { resource.Close() return } select { case p.ch <- resource: fmt.Println("resource back" ) default : fmt.Println("resource drop" ) resource.Close() } } func (p *Pool) Close() { if p.close { return } p.close = true close (p.ch) for i := range p.ch { i.Close() } }
通过无缓冲通道创建协程池 这是一种任务分发机制,将无缓冲通道作为任务分发通道,消费者获取任务,初始化时传入的size
则是消费者个数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package mainimport ( "sync" ) type Worker interface { Work() } type Pool struct { ch chan Worker wg sync.WaitGroup } func NewPool (size int ) *Pool { p := new (Pool) p.ch = make (chan Worker) p.wg.Add(size) for i := 0 ; i < size; i++ { go func () { defer p.wg.Done() for w := range p.ch { w.Work() } }() } return p } func (p *Pool) Run(w Worker) { p.ch <- w } func (p *Pool) Done() { close (p.ch) p.wg.Wait() }
调用者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package mainimport ( "fmt" "time" ) type work struct { id int } func newWork (id int ) *work { return &work{id: id} } func (w work) Work() { time.Sleep(time.Second) fmt.Println("worker doing" , w.id) } func main () { pool := NewPool(2 ) for i := 0 ; i < 5 ; i++ { w := newWork(i) pool.Run(w) } pool.Done() }