sync.Pool的坑

内存泄漏

取出来的 bytes.Buffer 在使用的时候,我们可以往这个元素中增加大量的 byte 数据,这会导致底层的 byte slice 的容量可能会变得很大。这个时候,即使 Reset 再放回到池子中,这些 byte slice 的容量不会改变,所占的空间依然很大。而且,因为 Pool 回收的机制,这些大的 Buffer 可能不被回收,而是会一直占用很大的空间,这属于内存泄漏的问题

在使用 sync.Pool 回收 buffer 的时候,一定要检查回收的对象的大小。如果 buffer 太大,就不要回收了,否则就太浪费了;

内存浪费

除了内存泄漏以外,还有一种浪费的情况,就是池子中的 buffer 都比较大,但在实际使用的时候,很多时候只需要一个小的 buffer,这也是一种浪费现象;

要做到物尽其用,尽可能不浪费的话,我们可以将 buffer 池分成几层。首先,小于 512 byte 的元素的 buffer 占一个池子;其次,小于 1K byte 大小的元素占一个池子;再次,小于 4K byte 大小的元素占一个池子。这样分成几个池子以后,就可以根据需要,到所需大小的池子中获取 buffer 了;

bytebufferpool

在这类对象中,比较特殊的一类是字节缓冲(底层一般是字节切片)。在做字符串拼接时,为了拼接的高效,我们通常将中间结果存放在一个字节缓冲。在拼接完成之后,再从字节缓冲中生成结果字符串。在收发网络包时,也需要将不完整的包暂时存放在字节缓冲中。

Go 标准库中的类型bytes.Buffer封装字节切片,提供一些使用接口。我们知道切片的容量是有限的,容量不足时需要进行扩容。而频繁的扩容容易造成性能抖动。

快速使用

本文代码使用 Go Modules。

创建目录并初始化:

1
2
mkdir bytebufferpool && cd bytebufferpool
go mod init github.com/darjun/go-daily-lib/bytebufferpool

安装bytebufferpool库:

1
go get -u github.com/PuerkitoBio/bytebufferpool

典型的使用方式先通过bytebufferpool提供的Get()方法获取一个bytebufferpool.Buffer对象,然后调用这个对象的方法写入数据,使用完成之后再调用bytebufferpool.Put()将对象放回对象池中。例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
  "fmt"

  "github.com/valyala/bytebufferpool"
)

func main() {
  b := bytebufferpool.Get()
  b.WriteString("hello")
  b.WriteByte(',')
  b.WriteString(" world!")

  fmt.Println(b.String())

  bytebufferpool.Put(b)
}

直接调用bytebufferpool包的Get()和Put()方法,底层操作的是包中默认的对象池:

1
2
3
4
5
// bytebufferpool/pool.go
var defaultPool Pool

func Get() *ByteBuffer { return defaultPool.Get() }
func Put(b*ByteBuffer) { defaultPool.Put(b) }

我们当然可以根据实际需要创建新的对象池,将相同用处的对象放在一起(比如我们可以创建一个对象池用于辅助接收网络包,一个用于辅助拼接字符串):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
func main() {
  joinPool := new(bytebufferpool.Pool)
  b := joinPool.Get()
  b.WriteString("hello")
  b.WriteByte(',')
  b.WriteString(" world!")

  fmt.Println(b.String())

  joinPool.Put(b)
}

bytebufferpool没有提供具体的创建函数,不过可以使用new创建。

源码分析

结构体

在将对象放回池中时,会根据当前切片的容量进行相应的处理。bytebufferpool将大小分为 20 个区间:

1
| < 2^6 | 2^6 ~ 2^7-1 | ... | > 2^25 |

如果容量小于 2^6,则属于第一个区间。如果处于 2^6 和 2^7-1 之间,则落在第二个区间。依次类推。执行足够多的放回次数后,bytebufferpool会重新校准,计算处于哪个区间容量的对象最多。将defaultSize设置为该区间的上限容量,第一个区间的上限容量为 2^6,第二区间为 2^7,最后一个区间为 2^26。后续通过Get()请求对象时,若池中无空闲对象,创建一个新对象时,直接将容量设置为defaultSize。这样基本可以避免在使用过程中的切片扩容,从而提升性能。下面结合代码来理解:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
const (
    // 定位数据索引位置,使用位操作性能比较高效
    // 容量最小值取 2^6 = 64,因为这就是 64 位计算机上 CPU 缓存行的大小。这个大小的数据可以一次性被加载到 CPU 缓存行中,再小就无意义了
    minBitSize = 6 // 2**6=64 is a CPU cache line size
    // 数组索引个数 0~19
    steps      = 20
    // 最小缓存对象 和 最大缓存对象大小
    minSize = 1 << minBitSize
    maxSize = 1 << (minBitSize + steps - 1)
    // 校准阈值, 这里指的调用次数
    calibrateCallsThreshold = 42000
    // 百分比,校准数据基数
    maxPercentile           = 0.95
)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Pool represents byte buffer pool.
//
// Distinct pools may be used for distinct types of byte buffers.
// Properly determined byte buffer types with their own pools may help reducing
// memory waste.
type Pool struct {
    calls       [steps]uint64
    calibrating uint64
    defaultSize uint64
    maxSize     uint64
    pool sync.Pool
}
var defaultPool Pool

我们可以看到,bytebufferpool内部使用了标准库中的对象sync.Pool。

这里的steps就是上面所说的区间,一共 20 份。calls数组记录放回对象容量落在各个区间的次数。

字段解释

  • calls 缓存对象大小调用次数统计,steps 就是我们上面定义的常量。主要用来统计每类缓存大小的调用次数。steps 具体的值会使用一个index() 函数通过位操作的方式计算出来它在这个数组的索引位置;
  • calibrating 校标标记。0 表示未校准,1表示正在校准。校准完成需要从1恢复到0;
  • defaultSize 缓存对象默认大小。我们知道当从 pool 中获取缓存对象时,如果池中没有对象可取,会通过调用 一个 New() 函数创建一个新对象返回,这时新创建的对象大小为 defaultSize。当然这里没有使用New() 函数,而是直接创建了一个 指定默认大小的 ByteBuffer;
  • maxSize 允许放入pool池中的最大对象大小,只有<maxSize 的对象才允许放放池中 这里的变量 defaultPool 是一个全局的 Pool 对象。

Get

对于取对象很简单

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Get returns new byte buffer with zero length.
//
// The byte buffer may be returned to the pool via Put after the use
// in order to minimize GC overhead.
func (p *Pool) Get() *ByteBuffer {
    // 直接从 p.pool 中调用原始方法 Get() 读取,如果结果 != nil,则说明当前池中读取到了数据(当pool未设置New 方法的时候会返回nil),则直接返回对象 *ByteBuffer 即可;
    v := p.pool.Get()
    if v != nil {
        return v.(*ByteBuffer)
    }
    // 如果结果等于 nil ,则说明池中已无对象可用且未定义New方法,这时直接创建一个 p.defaultSize 大小的 *ByteBuffer 对象并返回
    return &ByteBuffer{
        B: make([]byte, 0, atomic.LoadUint64(&p.defaultSize)),
    }
}

Put

调用Pool.Put()将对象放回时,首先计算切片容量落在哪个区间,增加calls数组中相应元素的值:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// bytebufferpool/pool.go
func (p *Pool) Put(b*ByteBuffer) {
  // 对象在数据中的位置
  // 调用 index() 函数,根据对象长度计算其在 p.calls 数组中的索引位置;
  idx := index(len(b.B))
  // 校准条件
  // 将当前数组索引位置存放的值原子操作+1,(即更新相同位置对象的调用次数,注意:更新操作并没有放在Get),如果 次数>calibrateCallsThreshold(42000) ,则进行校准操作;
  if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold {
    p.calibrate()
  }
  // 是否需要放入pool池中,还是直接丢弃交给GC
  maxSize := int(atomic.LoadUint64(&p.maxSize))
  // 如果要放回的对象容量大于 maxSize,则不放回
  // 原子读取当前允许放入pool池中的对象大小。如果等于0或小于 maxSize ,则先 b.Reset() 重置对象,再将其放入pool池中; 否则将交由GC 来操作;
  if maxSize == 0 || cap(b.B) <= maxSize {
    b.Reset()
    p.pool.Put(b)
  }
}

上面在放入池中的时候,为什么这里还要判断大小才能放回pool池中呢?

主要原因是因为这里用的是一个 切片 数据类型,虽然在放入前执行了b.Reset,但这只是将切片里的内容进行了清除,但这个切片对象仍然是处于引用状态,并没有真正释放内存。如果一个对象从pool 取出来以后,经过一系列操作后,导致这个切片非常非常的大,这时再将其放入pool池中,此切片会一直占用着很大一块的内部,导致内存泄漏。

如果calls数组该元素超过指定值calibrateCallsThreshold=42000(说明距离上次校准,放回对象到该区间的次数已经达到阈值了,42000 应该就是个经验数字),则调用Pool.calibrate()执行校准操作:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// bytebufferpool/pool.go
func (p *Pool) calibrate() {
  // 避免并发放回对象触发 `calibrate`
  // 先判断当前是否处于正在校准状态,如果是则直接终止,否则继续执行。
  if !atomic.CompareAndSwapUint64(&p.calibrating, 0, 1) {
    return
  }

  // step 1.统计并排序
  // calls数组记录了放回对象到对应区间的次数。按照这个次数从大到小排序。注意:minSize << i表示区间i的上限容量。
  // 先声明一个长度为20(steps) 的 callSize 切片类型变量 callSizes ,然后将 p.calls 中统计的调用次数依次放入对象 callSize.calls 中,同时根据其索引位置计算对应存放对象的大小(minSize<<i),汇总所有的调用次数到变量 callsSum。
  a := make(callSizes, 0, steps)
  var callsSum uint64
  for i := uint64(0); i < steps; i++ {
    calls := atomic.SwapUint64(&p.calls[i], 0)
    callsSum += calls
    a = append(a, callSize{
      calls: calls,
      size:  minSize << i,
    })
  }
  // 其次sort.Sort(a) 根据对象访问次数 callSize.calls 从高到低排序。
  sort.Sort(a)

  // step 2.计算 defaultSize 和 maxSize
  // defaultSize很好理解,取排序后的第一个size即可。maxSize值记录放回次数超过 95% 的多个对象容量的最大值。它的作用是防止将使用较少的大容量对象放回对象池,从而占用太多内存。这里就可以理解Pool.Put()方法后半部分的逻辑了:
  // 取出调用次数最高的对象大小,赋值给 defaultSize,作为下次创建对象时初始大小;同时从调用次数最高的95%的访问量中获取最大对象的大小,并保存到 maxSize, 作为下次存放pool池中的判断条件。
  defaultSize := a[0].size
  maxSize := defaultSize

  maxSum := uint64(float64(callsSum) * maxPercentile)
  callsSum = 0
  for i := 0; i < steps; i++ {
    if callsSum > maxSum {
      break
    }
    callsSum += a[i].calls
    size := a[i].size
    if size > maxSize {
      maxSize = size
    }
  }

  // step 3.保存对应值
  // 后续通过Pool.Get()获取对象时,若池中无空闲对象,新创建的对象默认容量为defaultSize。这样的容量能满足绝大多数情况下的使用,避免使用过程中的切片扩容。
  // 最后就是更新全局变量值,恢复校准状态为0。
  atomic.StoreUint64(&p.defaultSize, defaultSize)
  atomic.StoreUint64(&p.maxSize, maxSize)

  atomic.StoreUint64(&p.calibrating, 0)
}

当然这个库缺点也很明显,由于大部分使用的容量都小于defaultSize,会有部分内存浪费。

总结

  • 从pool池中读取的对象为 ByteBuffer,它有自己的一系列方法,并实现了io.Reader 和 io.Writer 接口;
  • 从 pool 中读取的对象初始化有可能大小为0(Put时对象已经重置),也有可能为 defaultSize(新创建指定);
  • 每从 pool 中读取次数超出 42000 时,将进行一次校准操作。此时会根据对象的使用频繁情况和大小来决定下次创建新对象的初始化大小与对象是否需要放入 pool 池中;
  • 获取对象在数据中的索引位置是根据对象的长度,即len()函数,而存放对象时,则根据对象的容量,即cap() 函数;

在我们日常开发中,如果要使用 sync.Pool 池的话,一定要考虑到对象的大小的情况,千万不要不计一切的都把对象都放入池中,当大对象过多时,很容易生成内存泄漏。

参考

Go 每日一库之 bytebufferpool