Let me first explain the difference between a cache and a pool, and why that distinction matters for this discussion. The type Brad Fitzpatrick proposed is really a pool: a set of interchangeable values, where you don't care which value you get, because each one is in a freshly initialized state and they are all identical. You can't even tell whether the value you just received came from the pool or was newly created. A cache, on the other hand, maps keys to values. An obvious example is a disk cache, which keeps files from slow storage in main memory to speed up access. If the cache holds values for keys A and B (file names, in the disk-cache example) and you request the value associated with A, you obviously don't want the value associated with B. Because the values in a cache are distinct from one another, a cache also needs an eviction policy, which adds complexity: something has to decide which value gets removed. The Wikipedia page on cache algorithms lists 13 different eviction algorithms, from the well-known LRU to more elaborate ones such as LIRS.
After a long discussion, the API and reclamation policy Russ Cox finally proposed was very simple: free the pool's space during garbage collection. The proposal reminds us that the purpose of the Pool type is to reuse memory between garbage collections. It is not meant to avoid garbage collection, but to make garbage collection more efficient.
```go
// AcquireContext returns an empty `Context` instance from the pool.
// You must return the context by calling `ReleaseContext()`.
func (e *Echo) AcquireContext() Context {
	return e.pool.Get().(Context)
}
```
```go
// ReleaseContext returns the `Context` instance back to the pool.
// You must call it after `AcquireContext()`.
func (e *Echo) ReleaseContext(c Context) {
	e.pool.Put(c)
}
```
```go
// ServeHTTP implements the `http.Handler` interface, which serves HTTP requests.
func (e *Echo) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Acquire a context from the pool and reset it for this request
	c := e.pool.Get().(*context)
	c.Reset(r, w)

	var h HandlerFunc

	if e.premiddleware == nil {
		e.findRouter(r.Host).Find(r.Method, GetPath(r), c)
		h = c.Handler()
		h = applyMiddleware(h, e.middleware...)
	} else {
		h = func(c Context) error {
			e.findRouter(r.Host).Find(r.Method, GetPath(r), c)
			h := c.Handler()
			h = applyMiddleware(h, e.middleware...)
			return h(c)
		}
		h = applyMiddleware(h, e.premiddleware...)
	}

	// Execute chain
	if err := h(c); err != nil {
		e.HTTPErrorHandler(err, c)
	}

	// Release context back to the pool
	e.pool.Put(c)
}
```
```go
func main() {
	p := &sync.Pool{} // with no New function, Get returns nil when the pool is empty
	a := p.Get()
	if a == nil {
		a = func() interface{} { return 0 }()
	}
	p.Put(1)
	b := p.Get().(int)
	fmt.Println(a, b)
}
// Output: 0 1
```
```go
// A Pool is a set of temporary objects that may be individually saved and
// retrieved.
//
// Any item stored in the Pool may be removed automatically at any time without
// notification. If the Pool holds the only reference when this happens, the
// item might be deallocated.
//
// A Pool is safe for use by multiple goroutines simultaneously.
//
// Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector. That is, it makes it easy to
// build efficient, thread-safe free lists. However, it is not suitable for all
// free lists.
//
// An appropriate use of a Pool is to manage a group of temporary items
// silently shared among and potentially reused by concurrent independent
// clients of a package. Pool provides a way to amortize allocation overhead
// across many clients.
//
// An example of good use of a Pool is in the fmt package, which maintains a
// dynamically-sized store of temporary output buffers. The store scales under
// load (when many goroutines are actively printing) and shrinks when
// quiescent.
//
// On the other hand, a free list maintained as part of a short-lived object is
// not a suitable use for a Pool, since the overhead does not amortize well in
// that scenario. It is more efficient to have such objects implement their own
// free list.
//
// A Pool must not be copied after first use.
type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	localSize uintptr        // size of the local array

	victim     unsafe.Pointer // local from previous cycle: the victim cache left over from the last GC
	victimSize uintptr        // size of victims array

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() any
}
```
```go
// Local per-P Pool appendix.
type poolLocalInternal struct {
	private any       // Can be used only by the respective P.
	shared  poolChain // Local P can pushHead/popHead; any P can popTail.
}

type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
```
```go
// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct {
	// head is the poolDequeue to push to. This is only accessed
	// by the producer, so doesn't need to be synchronized.
	head *poolChainElt

	// tail is the poolDequeue to popTail from. This is accessed
	// by consumers, so reads and writes must be atomic.
	tail *poolChainElt
}

type poolChainElt struct {
	poolDequeue

	// next and prev link to the adjacent poolChainElts in this
	// poolChain.
	//
	// next is written atomically by the producer and read
	// atomically by the consumer. It only transitions from nil to
	// non-nil.
	//
	// prev is written atomically by the consumer and read
	// atomically by the producer. It only transitions from
	// non-nil to nil.
	next, prev *poolChainElt
}
```
```go
func poolCleanup() {
	// This function is called with the world stopped, at the beginning of a
	// garbage collection. It must not allocate and probably should not call
	// any runtime functions.

	// Because the world is stopped, no pool user can be in a
	// pinned section (in effect, this has all Ps pinned).

	// Drop victim caches from all pools.
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// Move primary cache to victim cache.
	// oldPools is the set (in fact, a slice) of pools that may have
	// non-empty victim caches; it is protected by STW.
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	// The pools with non-empty primary caches now have non-empty
	// victim caches and no pools have primary caches.
	oldPools, allPools = allPools, nil
}
```
```go
// Put adds x to the pool.
func (p *Pool) Put(x any) {
	if x == nil {
		return
	}
	if race.Enabled {
		// Under the race detector, drop x with probability 1/4 to flush out
		// code that wrongly assumes a Put will come back from a later Get.
		if fastrandn(4) == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	// Pin the goroutine to its P and grab the P's local cache. If the
	// private slot is empty, store x there and clear x.
	l, _ := p.pin()
	if l.private == nil {
		l.private = x
		x = nil
	}
	// If the private slot was already occupied, push x onto the head of
	// the shared queue instead.
	if x != nil {
		l.shared.pushHead(x)
	}
	// Unpin the goroutine from the P.
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}
```
```go
// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() any {
	if race.Enabled {
		race.Disable()
	}
	// Pin the goroutine to its P (disabling preemption) and get the P's
	// local cache along with the P's id. The private slot is tried first.
	l, pid := p.pin()
	x := l.private
	l.private = nil
	if x == nil {
		// Try to pop the head of the local shard. We prefer
		// the head over the tail for temporal locality of
		// reuse.
		x, _ = l.shared.popHead()
		if x == nil {
			// Neither the private slot nor the local shared queue had a
			// value: fall back to stealing from other Ps.
			x = p.getSlow(pid)
		}
	}
	// Unpin the goroutine from the P.
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	// If every cache (private, shared, other Ps, victim) came up empty and
	// p.New is non-nil, fall back to allocating a fresh value.
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}
```
```go
func (p *Pool) getSlow(pid int) any {
	// See the comment in pin regarding ordering of the loads.
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
	// Try to steal one element from other procs, popping from the tail of
	// each P's shared queue.
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Try the victim cache. We do this after attempting to steal
	// from all primary caches because we want objects in the
	// victim cache to age out if at all possible.
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Mark the victim cache as empty for future gets don't bother
	// with it.
	// Every victim cache has been traversed without finding a value, so
	// zero victimSize to let later Gets skip the victim cache entirely.
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}
```
```go
// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) {
	pid := runtime_procPin()
	// In pinSlow we store to local and then to localSize, here we load in
	// opposite order. Since we've disabled preemption, GC cannot happen in
	// between. Thus here we must observe local at least as large as
	// localSize. We can observe a newer/larger local, which is fine (we
	// must observe its zero-initialized-ness).
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	return p.pinSlow()
}
```
```go
func (p *Pool) pinSlow() (*poolLocal, int) {
	// Retry under the mutex.
	// Can not lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	// Re-pin the goroutine to a P.
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
	return &local[pid], pid
}
```