Golang中的sync.Pool

Pool是一组可以单独保存和检索的临时对象。Pool是线程安全的

发展历程

我先解释一下缓存(cache)和池(pool)的区别,以及为什么这个区别对讨论很重要。Brad Fizpatrick 建议的类型实际上是一种池:一组可以互换的值,取出时并不关心具体的值是什么,因为每个值都是刚被初始化的状态,值是相同的。你甚至分不出来刚刚拿到的值是从池里取出来的,还是新创建的。另一方面,缓存是一些相呼映射的键和值。一个明显的例子是磁盘缓存。磁盘缓存将慢速存储中的文件缓存在系统主内存里,以便提高访问速度。如果缓存里有对应键 A 和 B 的值(磁盘缓存的例子里,就是文件名),而你请求了与 A 对应的值,你显然不想得到 B 所对应的值。实际上,缓存里的值是互不相同的,增加了缓存清除机制的复杂性,就是说到底哪个值应该被清除出缓存。维基百科上关于缓存算法的页面,列举了 13 种不同的清除缓存的算法,从著名的 LRU 缓存到更复杂的比如LIRS 缓存算法

在经历了漫长的讨论后,Russ Cox 最终提议的 API 和回收策略非常简单:在垃圾收集时回收池空间。这个建议提醒我们,类型Pool的目的是在垃圾收集之间重用内存。它不应该避免垃圾回收,而是让垃圾回收变得更有效。

Pool设计用意是在全局变量里维护的释放链表,尤其是被多个 goroutine 同时访问的全局变量。使用Pool代替自己写的释放链表,可以让程序运行的时候,在恰当的场景下从池里重用某项值。sync.Pool一种合适的方法是,为临时缓冲区创建一个池,多个客户端使用这个缓冲区来共享全局资源。另一方面,如果释放链表是某个对象的一部分,并由这个对象维护,而这个对象只由一个客户端使用,在这个客户端工作完成后释放链表,那么用Pool实现这个释放链表是不合适的。

Pool的提出,主要是用于解决GC负重的问题。GC是自动的,好处是能够减轻使用者负担,但是劣势是增加了运行时的开销,使用不当会严重影响程序的性能。在高性能下,不能任意产生太多的垃圾,否则GC负担重,会影响性能。

解决方案:

  • 重用对象

    Pool包:目的是用来保存和复用临时对象,以减少内存分配,降低GC压力

例如在Echo源码中,Echo的路由器基于radix tree,使得路由查找非常快速。它利用同步池来重用内存,并在没有GC开销的情况下实现零动态内存分配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// New creates an instance of Echo.
func New() (e *Echo) {
e = &Echo{
filesystem: createFilesystem(),
Server: new(http.Server),
TLSServer: new(http.Server),
AutoTLSManager: autocert.Manager{
Prompt: autocert.AcceptTOS,
},
Logger: log.New("echo"),
colorer: color.New(),
maxParam: new(int),
ListenerNetwork: "tcp",
}
e.Server.Handler = e
e.TLSServer.Handler = e
e.HTTPErrorHandler = e.DefaultHTTPErrorHandler
e.Binder = &DefaultBinder{}
e.JSONSerializer = &DefaultJSONSerializer{}
e.Logger.SetLevel(log.ERROR)
e.StdLogger = stdLog.New(e.Logger.Output(), e.Logger.Prefix()+": ", 0)
// 创建context
e.pool.New = func() interface{} {
return e.NewContext(nil, nil)
}
e.router = NewRouter(e)
e.routers = map[string]*Router{}
return
}

// NewContext returns a Context instance.
func (e *Echo) NewContext(r *http.Request, w http.ResponseWriter) Context {
return &context{
request: r,
response: NewResponse(w, e),
store: make(Map),
echo: e,
pvalues: make([]string, *e.maxParam),
handler: NotFoundHandler,
}
}

// AcquireContext returns an empty `Context` instance from the pool.
// You must return the context by calling `ReleaseContext()`.
// 获取context,从pool中拿到context,调用之后,需要ReleaseContext
func (e *Echo) AcquireContext() Context {
return e.pool.Get().(Context)
}

// ReleaseContext returns the `Context` instance back to the pool.
// You must call it after `AcquireContext()`.
// 释放context,将context放到pool中
func (e *Echo) ReleaseContext(c Context) {
e.pool.Put(c)
}

// ServeHTTP implements `http.Handler` interface, which serves HTTP requests.
func (e *Echo) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Acquire context
// 处理请求
c := e.pool.Get().(*context)
c.Reset(r, w)
var h HandlerFunc

if e.premiddleware == nil {
e.findRouter(r.Host).Find(r.Method, GetPath(r), c)
h = c.Handler()
h = applyMiddleware(h, e.middleware...)
} else {
h = func(c Context) error {
e.findRouter(r.Host).Find(r.Method, GetPath(r), c)
h := c.Handler()
h = applyMiddleware(h, e.middleware...)
return h(c)
}
h = applyMiddleware(h, e.premiddleware...)
}

// Execute chain
if err := h(c); err != nil {
e.HTTPErrorHandler(err, c)
}

// Release context
e.pool.Put(c)
}

又例如在Gin源码中,context通过poolgetput

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
func New() *Engine {
debugPrintWARNINGNew()
engine := &Engine{
RouterGroup: RouterGroup{
Handlers: nil,
basePath: "/",
root: true,
},
FuncMap: template.FuncMap{},
RedirectTrailingSlash: true,
RedirectFixedPath: false,
HandleMethodNotAllowed: false,
ForwardedByClientIP: true,
RemoteIPHeaders: []string{"X-Forwarded-For", "X-Real-IP"},
TrustedPlatform: defaultPlatform,
UseRawPath: false,
RemoveExtraSlash: false,
UnescapePathValues: true,
MaxMultipartMemory: defaultMultipartMemory,
trees: make(methodTrees, 0, 9),
delims: render.Delims{Left: "{{", Right: "}}"},
secureJSONPrefix: "while(1);",
trustedProxies: []string{"0.0.0.0/0", "::/0"},
trustedCIDRs: defaultTrustedCIDRs,
}
engine.RouterGroup.engine = engine
engine.pool.New = func() any {
return engine.allocateContext(engine.maxParams)
}
return engine
}

// ServeHTTP conforms to the http.Handler interface.
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
c := engine.pool.Get().(*Context)
c.writermem.reset(w)
c.Request = req
c.reset()

engine.handleHTTPRequest(c)

engine.pool.Put(c)
}

sync.Pool不是缓存,因此它不叫做sync.Cache.

使用

sync.Pool有两种使用方式

方式一:初始化数据和存放数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"fmt"
"sync"
)

func main() {
p := sync.Pool{
New: func() interface{} {
return 0
},
}

a := p.Get().(int)
p.Put(1)
b := p.Get().(int)
c := p.Get().(int)
fmt.Println(a, b, c)
}
//
0 1 0

方式二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"
"sync"
)

func main() {
p := &sync.Pool{}
// 不指定new,则会返回nil
a := p.Get()
if a == nil {
a = func() interface{} {
return 0
}()
}
p.Put(1)
b := p.Get().(int)
fmt.Println(a, b)
}
// 0 1

源码阅读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// A Pool is a set of temporary objects that may be individually saved and
// retrieved.
//
// Any item stored in the Pool may be removed automatically at any time without
// notification. If the Pool holds the only reference when this happens, the
// item might be deallocated.
//
// A Pool is safe for use by multiple goroutines simultaneously.
//
// Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector. That is, it makes it easy to
// build efficient, thread-safe free lists. However, it is not suitable for all
// free lists.
//
// An appropriate use of a Pool is to manage a group of temporary items
// silently shared among and potentially reused by concurrent independent
// clients of a package. Pool provides a way to amortize allocation overhead
// across many clients.
//
// An example of good use of a Pool is in the fmt package, which maintains a
// dynamically-sized store of temporary output buffers. The store scales under
// load (when many goroutines are actively printing) and shrinks when
// quiescent.
//
// On the other hand, a free list maintained as part of a short-lived object is
// not a suitable use for a Pool, since the overhead does not amortize well in
// that scenario. It is more efficient to have such objects implement their own
// free list.
//
// A Pool must not be copied after first use.
// Pool是一组可以单独保存和检索的临时对象。
// 存储在Pool中的任何对象可随时自动删除,无需通知。如果发生这种情况时Pool中只有一个引用,则该项可能会被释放。
// 一个Pool可以安全地同时被多个goroutine使用。
// Pool的目的是缓存已分配但未使用的项目以供以后重用,从而减轻垃圾收集器的压力。也就是说,它使构建高效、线程安全的自由列表变得容易。然而,它并不适用于所有空闲列表。
// Pool的一个适当用途是管理一组临时对象,这些临时对象在包的并发独立客户端之间默默共享,并可能被其重用。Pool提供了一种在许多client上分摊分配开销的方法。
// 一个很好地使用池的例子是fmt包,它维护了临时输出缓冲区的动态大小存储。存储在有负载时扩容(当许多goroutine正在打印时),在静止时缩小。
// 另一方面,作为短周期对象的一部分维护的空闲列表不适合用于Pool,因为在这种情况下开销不会很好地摊销。让这样的对象实现自己的空闲列表更有效。
// 首次使用后不得复制池。
type Pool struct {
noCopy noCopy

local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal 每个P的Pool的本地固定大小,实际类型为[P]poolLocal
localSize uintptr // size of the local array 本地poolLocal链表的大小

victim unsafe.Pointer // local from previous cycle 上一次GC的该P Pool中的数据
victimSize uintptr // size of victims array 上一次GC的该P Pool中数据的个数

// New optionally specifies a function to generate
// a value when Get would otherwise return nil. 如果没有设置New函数,则会返回nil
// It may not be changed concurrently with calls to Get.
// 它不能与Get调用同时更改。
New func() any
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// Local per-P Pool appendix. 本地per-P池附录。
type poolLocalInternal struct {
private any // Can be used only by the respective P.
shared poolChain // Local P can pushHead/popHead; any P can popTail.
}

type poolLocal struct {
poolLocalInternal

// Prevents false sharing on widespread platforms with
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

// poolChain is a dynamically-sized version of poolDequeue.
//
// This is implemented as a doubly-linked list queue of poolDequeues
// where each dequeue is double the size of the previous one. Once a
// dequeue fills up, this allocates a new one and only ever pushes to
// the latest dequeue. Pops happen from the other end of the list and
// once a dequeue is exhausted, it gets removed from the list.
type poolChain struct {
// head is the poolDequeue to push to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *poolChainElt

// tail is the poolDequeue to popTail from. This is accessed
// by consumers, so reads and writes must be atomic.
tail *poolChainElt
}

type poolChainElt struct {
poolDequeue

// next and prev link to the adjacent poolChainElts in this
// poolChain.
//
// next is written atomically by the producer and read
// atomically by the consumer. It only transitions from nil to
// non-nil.
//
// prev is written atomically by the consumer and read
// atomically by the producer. It only transitions from
// non-nil to nil.
next, prev *poolChainElt
}

可以看到,Pool的数据结构底层时一个双向链表,Pool不能指定大小,大小只受限于GC临界值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
// 在GC开始时,当STW时调用此函数。
// 这种时候,不能分配内存,也不会调用任何runtime的函数。

// Because the world is stopped, no pool user can be in a
// pinned section (in effect, this has all Ps pinned).
// 因为STW,所以Pool的使用者都不会在固定的P上(这个时候P是固定的,但是P队列中的G在STW时可能会被移动到其他的P上)

// Drop victim caches from all pools.
// 从所有Pool中删除次级缓存
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}

// Move primary cache to victim cache.
// 将主缓存移动到次级缓存
// oldPools是可能具有非空二级缓存的Pool集合,其实是个切片。受STW保护。
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}

// The pools with non-empty primary caches now have non-empty
// victim caches and no pools have primary caches.
// 具有非空主缓存的Pool拥有非空的次级缓存,并且所有的Pool没有主缓存
oldPools, allPools = allPools, nil
}

可以看到,对象的最大缓存周期是GC周期,当GC调用时,没有被引用的对象都会被清理掉;

也就是说,在执行runtime.GC()之后,再Get(),获取到的值就是0;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Put adds x to the pool.
// 将x放到池子中
func (p *Pool) Put(x any) {
if x == nil {
return
}
if race.Enabled {
// 当打开数据竞态,有四分之一的概率,将数据直接丢弃
if fastrandn(4) == 0 {
// Randomly drop x on floor.
return
}
// 如果数据没有丢,通过散列处理x数据的指针,避免多线程同步修改
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
// 获取当前p的本地缓存,如果本地缓存为空,说明缓存中没有使用,将x的值给到缓存,并且清掉x的值
l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
// 如果本地缓存不为空,说明缓存中已经保存了对象,将x推送到共享缓存的头部
if x != nil {
l.shared.pushHead(x)
}
// 运行时取消G到P的固定
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}

可以看到,Put()方法会判断当前P本地缓存中是否已经有缓存的数据,如果本地有,则会将数据插入到共享缓存的头中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Get selects an arbitrary item from the Pool, removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to Put and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
// Get从Pool中拿到任意一个对象,将其从Pool中删除,并将其返回给调用者。
// Get可能会忽略Pool中已经存在的对象,返回一个nil。调用方不应假定传递给Put的值与Get返回的值之间存在任何关系。
//
// 如果Get返回nil,而p.New为非nil,则Get返回调用p.New的结果。
func (p *Pool) Get() any {
if race.Enabled {
race.Disable()
}
// 获取当前线程的pid,将当前的goroutine固定到P,禁用抢占
// 此时可以优先从P的本地缓存中获取Pool对象
l, pid := p.pin()
// 判断P的本地缓存中是否有值
x := l.private
l.private = nil
// 如果P本地缓存中的变量为nil
if x == nil {
// Try to pop the head of the local shard. We prefer
// the head over the tail for temporal locality of
// reuse.
// 如果P本地缓存中的变量是空的,从共享缓存的头取出一个
x, _ = l.shared.popHead()
// 如果还是为空,说明本地缓存和共享缓存中都没有值
if x == nil {
// 从其他线程缓存列表中取
x = p.getSlow(pid)
}
}
// 将当前的goroutine和P解除固定
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
// 如果竞态检测打开,并且x不为nil,则将x的地址进行静态检测
race.Acquire(poolRaceAddr(x))
}
}
// 如果x为空,并且此时本地缓存、共享缓存、其他P缓存中都没有值,而且p.New返回值不为空,则返回p.New()
if x == nil && p.New != nil {
x = p.New()
}
return x
}

func (p *Pool) getSlow(pid int) any {
// See the comment in pin regarding ordering of the loads.
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// Try to steal one element from other procs.
// 尝试从其他P中窃取一个元素。
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i+1)%int(size))
// 从其他P的共享缓存队列尾部获取一个对象
if x, _ := l.shared.popTail(); x != nil {
// 获取到不为nil,就返回
return x
}
}

// Try the victim cache. We do this after attempting to steal
// from all primary caches because we want objects in the
// victim cache to age out if at all possible.
// 尝试在二级缓存中获取。
// 在尝试从所有主缓存(本地缓存,共享缓存,其他P的缓存)中获取数据后执行此操作,因为二级缓存中的数据可能已经GC掉了,或者说老化了
size = atomic.LoadUintptr(&p.victimSize)
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}

// Mark the victim cache as empty for future gets don't bother
// with it.
// 前面便利了所有P的二级缓存,都没有值,则将二级缓存中的数据清空,以防止后面再次从二级缓存中获取数据时造成干扰
atomic.StoreUintptr(&p.victimSize, 0)

return nil
}

// pin pins the current goroutine to P, disables preemption and
// returns poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
// pin将当前的goroutine固定到P,禁用其他的P抢占G,并将Pool和P的id返回。
// 调用方在处理完Pools后,必须调用runtime_procUnpin()。
func (p *Pool) pin() (*poolLocal, int) {
// 通过运行时获取pid
pid := runtime_procPin()
// In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
// 在pinSlow中,我们存储到local,然后存储到localSize,这里我们以相反的顺序加载。
// 由于我们禁用了抢占,GC不能在两者之间发生。
// 因此,在这里我们必须观察到localSize至少和localSize一样大。
// 我们可以观察到一个新的/更大的局部,它很好(我们必须观察它的零初始化性)。
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
return p.pinSlow()
}

func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
// G固定时无法锁定互斥锁。
runtime_procUnpin()
allPoolsMu.Lock()
defer allPoolsMu.Unlock()
// 再次将G与P绑定
pid := runtime_procPin()
// poolCleanup won't be called while we are pinned.
// 锁定时不会调用poolCleanup。
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
}
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}

可以看到,Get()方法返回时是返回Pool中任意一个对象,没有顺序。并且与存放到Pool中的对象并没有一定的关联。

Get()之后,对象会从Pool中删除,因此,使用完之后,还需要再次放到Pool中。

如果Pool中没有对象,也就是本地缓存、共享缓存、其他P的共享缓存、二级缓存中都没有对象,而且p.New()不为空,那么会调用p.New()新生成一个;如果没有指定p.New(),也就是p.New()为空,那么返回nil

适用场景

当多个goroutine都需要创建同一个对象的时候,如果goroutine过多,可能导致对象的创建数目剧增。而对象又是占用内存的,进而导致的就是内存回收的GC压力陡增。造成并发大-占用内存大-GC缓慢-处理并发能力降低-并发更大这样的恶性循环。

在这个时候,就可以使用对象池,每个goroutine不再单独创建对象,而是从对象池中获取一个对象(如果对象池中已经有的话,使用完之后,再放回对象池)

更加优化的一种使用方式,可参考推荐阅读中:justforfunc #37: sync.Pool from the pool

推荐阅读

golang sync.Pool试用说明及注意事项

[译] CockroachDB GC优化总结

Golang 优化之路——临时对象池

How did I improve latency by 700% using sync.Pool

How to implement Memory Pooling in Golang

sync: Pool example suggests incorrect usage #23199

sync: avoid clearing the full Pool on every GC #22950

Mastering Go Programming : Syncs and Locks | packtpub.com

justforfunc #37: sync.Pool from the pool

sync.Pool from the pool

SREcon17 Asia/Australia: Golang’s Garbage

关于Go Module的争吵