阅读sync.Pool文档
This commit is contained in:
@@ -310,6 +310,404 @@ ok git.kazusa.red/asahi/fuzz-demo 10.360s
|
|||||||
> #### new interesting
|
> #### new interesting
|
||||||
> `new interesting`指会扩充code coverage的用例输入,在fuzz test刚开始时,new interesting数量通常会因发现新的代码路径快速增加,然后,会随着时间的推移逐渐减少
|
> `new interesting`指会扩充code coverage的用例输入,在fuzz test刚开始时,new interesting数量通常会因发现新的代码路径快速增加,然后,会随着时间的推移逐渐减少
|
||||||
|
|
||||||
|
## Go Sync
|
||||||
|
### sync.Pool
|
||||||
|
`sync.Pool`为golang标准库中的实现,用于降低allocation和减少垃圾回收。
|
||||||
|
|
||||||
|
### sync.Pool使用示例
|
||||||
|
golang中`sync.Pool`使用示例如下:
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JobState int
|
||||||
|
|
||||||
|
const (
|
||||||
|
JobStateFresh JobState = iota
|
||||||
|
JobStateRunning
|
||||||
|
JobStateRecycled
|
||||||
|
)
|
||||||
|
|
||||||
|
type Job struct {
|
||||||
|
state JobState
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) Run() {
|
||||||
|
switch j.state {
|
||||||
|
case JobStateRecycled:
|
||||||
|
fmt.Println("this job came from the pool")
|
||||||
|
case JobStateFresh:
|
||||||
|
fmt.Println("this job just got allocated")
|
||||||
|
}
|
||||||
|
|
||||||
|
j.state = JobStateRunning
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
pool := &sync.Pool{
|
||||||
|
New: func() any {
|
||||||
|
return &Job{state: JobStateFresh}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// get a job from the pool
|
||||||
|
job := pool.Get().(*Job)
|
||||||
|
|
||||||
|
// run it
|
||||||
|
job.Run()
|
||||||
|
|
||||||
|
// put it back in the pool
|
||||||
|
job.state = JobStateRecycled
|
||||||
|
pool.Put(job)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Pool和垃圾回收
|
||||||
|
`sync.Pool`对象实际是由两部分组成:
|
||||||
|
- `local pool`
|
||||||
|
- `victim pool`
|
||||||
|
|
||||||
|
调用Pool中的方法时,实际行为如下:
|
||||||
|
- `Put`:调用Put方法时,会将对象添加到`local`
|
||||||
|
- `Get`:调用Get时,首先会从`local`中查找,如果`local`中未找到,那么会从`victim`中查找,如果victim中仍然不存在,那么则是会调用`New`
|
||||||
|
|
||||||
|
在`sync.Pool`中,`local`被用作primary cache,victim则被用作victim cache。
|
||||||
|
|
||||||
|
#### poolCleanup
|
||||||
|
poolCleanUp方法实现如下:
|
||||||
|
```go
|
||||||
|
func poolCleanup() {
|
||||||
|
// This function is called with the world stopped, at the beginning of a garbage collection.
|
||||||
|
// It must not allocate and probably should not call any runtime functions.
|
||||||
|
|
||||||
|
// Because the world is stopped, no pool user can be in a
|
||||||
|
// pinned section (in effect, this has all Ps pinned).
|
||||||
|
|
||||||
|
// Drop victim caches from all pools.
|
||||||
|
for _, p := range oldPools {
|
||||||
|
p.victim = nil
|
||||||
|
p.victimSize = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Move primary cache to victim cache.
|
||||||
|
for _, p := range allPools {
|
||||||
|
p.victim = p.local
|
||||||
|
p.victimSize = p.localSize
|
||||||
|
p.local = nil
|
||||||
|
p.localSize = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// The pools with non-empty primary caches now have non-empty
|
||||||
|
// victim caches and no pools have primary caches.
|
||||||
|
oldPools, allPools = allPools, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
allPoolsMu Mutex
|
||||||
|
|
||||||
|
// allPools is the set of pools that have non-empty primary
|
||||||
|
// caches. Protected by either 1) allPoolsMu and pinning or 2)
|
||||||
|
// STW.
|
||||||
|
allPools []*Pool
|
||||||
|
|
||||||
|
// oldPools is the set of pools that may have non-empty victim
|
||||||
|
// caches. Protected by STW.
|
||||||
|
oldPools []*Pool
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
runtime_registerPoolCleanup(poolCleanup)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
#### allPools & oldPools
|
||||||
|
所有被实例化的`sync.Pool`对象,`在修生变化时`,都会将其自身注册到`allPools`静态变量中。
|
||||||
|
|
||||||
|
其中,`allPools`引用了所有`local`(primary cache)不为空的pool实例,而`oldPools`则引用了所有`victim`(victim cache)不为空的pool实例。
|
||||||
|
|
||||||
|
在init方法中,将poolCleanup注册到了runtime,在STW的上线文中,poolCleanup将会`在垃圾回收之前`被runtime调用。
|
||||||
|
|
||||||
|
poolCleanup方法逻辑比较简单,具体如下:
|
||||||
|
- 将`victim`丢弃,并且将`local`转移到`victim`,最后将`local`置为空
|
||||||
|
- 将静态变量中`allPools`的值转移到`oldPools`,并且将`oldPools`的值置为空
|
||||||
|
|
||||||
|
这代表如果pool中的对象如果长期未被访问,那么将会从pool中被淘汰。
|
||||||
|
|
||||||
|
> `poolCleanUp`方法在STW时会被调用,第一次STW时,未使用对象会从local移动到victim,而第二次STW,则是会从victim中被丢弃,之后被后续的垃圾回收清理。
|
||||||
|
|
||||||
|
### Proc Pining
|
||||||
|
关于`sync.Pool`,其实际结构如下:
|
||||||
|
```go
|
||||||
|
type Pool struct {
|
||||||
|
noCopy noCopy
|
||||||
|
|
||||||
|
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
|
||||||
|
localSize uintptr // size of the local array
|
||||||
|
|
||||||
|
victim unsafe.Pointer // local from previous cycle
|
||||||
|
victimSize uintptr // size of victims array
|
||||||
|
|
||||||
|
// New optionally specifies a function to generate
|
||||||
|
// a value when Get would otherwise return nil.
|
||||||
|
// It may not be changed concurrently with calls to Get.
|
||||||
|
New func() any
|
||||||
|
}
|
||||||
|
```
|
||||||
|
#### per-P
|
||||||
|
关于调度的actor,其存在如下角色:
|
||||||
|
- goroutine:`G's`
|
||||||
|
- machines:`M's`代表系统线程
|
||||||
|
- processor:`P's`代表处理器物理线程
|
||||||
|
|
||||||
|
其中,`goroutine`由操作系统线程执行,而操作系统线程在执行时需要获取实际的cpu物理线程。
|
||||||
|
|
||||||
|
在gouroutine运行时,存在一些`safe-point`,在`safe-point`goroutine可以在`clean`状态被停止。故而,`抢占只能发生在safe-point`。
|
||||||
|
|
||||||
|
`proc pinning`会禁止抢占,在pinning后,P(物理线程)将会被独占,在`unpin`发生之前,goroutine会一直执行,并不会被停止,甚至不会被GC停止。`unpin之前,P无法被其他goroutine使用`。
|
||||||
|
|
||||||
|
一旦`pinned`后,execution flow在P上不会被中断,`这也意味着在访问threadlocal数据时无需加锁`。
|
||||||
|
|
||||||
|
如下是围绕Pinning的逻辑:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// pin pins the current goroutine to P, disables preemption and
|
||||||
|
// returns poolLocal pool for the P and the P's id.
|
||||||
|
// Caller must call runtime_procUnpin() when done with the pool.
|
||||||
|
func (p *Pool) pin() (*poolLocal, int) {
|
||||||
|
pid := runtime_procPin()
|
||||||
|
// In pinSlow we store to local and then to localSize, here we load in opposite order.
|
||||||
|
// Since we've disabled preemption, GC cannot happen in between.
|
||||||
|
// Thus here we must observe local at least as large localSize.
|
||||||
|
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
|
||||||
|
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
|
||||||
|
l := p.local // load-consume
|
||||||
|
if uintptr(pid) < s {
|
||||||
|
return indexLocal(l, pid), pid
|
||||||
|
}
|
||||||
|
return p.pinSlow()
|
||||||
|
}
|
||||||
|
|
||||||
|
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
|
||||||
|
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
|
||||||
|
return (*poolLocal)(lp)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### local & localSize
|
||||||
|
- `local`:local是一个由`poolLocal`对象组成`c-style`数组
|
||||||
|
- `localSize`:localSize是`local`数组的大小
|
||||||
|
- `poolLocal`: local数组中的每个poolLocal都关联一个给定的P
|
||||||
|
- `runtime_procPin`:该方法会返回`pin`锁关联的processor id,processor id从0开始依次加1,直到`GOMAXPROCS`
|
||||||
|
|
||||||
|
分析上述`indexLocal`方法的逻辑,其根据processor id的值,计算了pid关联poolLocal对象地址的偏移量,并返回poolLocal对象的指针。这令我们可以并发安全的访问poolLocal对象而无需加锁,`只需要pinned并且直接访问threadlocal变量`。
|
||||||
|
|
||||||
|
#### PinSlow
|
||||||
|
`pinSlow`方法是针对`pin`的fallback方法,其代表我们针对local数组大小的假设是错误的,本次绑定的P其并没有对应的poolLocal。
|
||||||
|
|
||||||
|
代码进入到pinSlow有如下可能:
|
||||||
|
- `GOMAXPROCS`被更新过,从而有了额外可用的P
|
||||||
|
- 该pool对象是新创建的
|
||||||
|
|
||||||
|
pinSlow的代码如下:
|
||||||
|
```go
|
||||||
|
func (p *Pool) pinSlow() (*poolLocal, int) {
|
||||||
|
// Retry under the mutex.
|
||||||
|
// Can not lock the mutex while pinned.
|
||||||
|
runtime_procUnpin()
|
||||||
|
allPoolsMu.Lock()
|
||||||
|
defer allPoolsMu.Unlock()
|
||||||
|
pid := runtime_procPin()
|
||||||
|
// poolCleanup won't be called while we are pinned.
|
||||||
|
s := p.localSize
|
||||||
|
l := p.local
|
||||||
|
if uintptr(pid) < s {
|
||||||
|
return indexLocal(l, pid), pid
|
||||||
|
}
|
||||||
|
if p.local == nil {
|
||||||
|
allPools = append(allPools, p)
|
||||||
|
}
|
||||||
|
// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
|
||||||
|
size := runtime.GOMAXPROCS(0)
|
||||||
|
local := make([]poolLocal, size)
|
||||||
|
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
|
||||||
|
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
|
||||||
|
return &local[pid], pid
|
||||||
|
}
|
||||||
|
```
|
||||||
|
当处于`pinned`状态时,无法获取针对`allPools`变量的锁,这样有可能会导致死锁。
|
||||||
|
|
||||||
|
> 如果在处于pinned状态的情况下获取锁,那么此时锁可能被其他goroutine持有,而持有锁的goroutine可能正在等待我们释放P
|
||||||
|
|
||||||
|
故而,在pinSlow中,首先`unpin`,然后获取锁,并且在获取锁之后重新进入`pin`状态
|
||||||
|
|
||||||
|
在重新进入pin状态并且获取到allPoolsMu的锁之后,首先会检测目前pid是否有关联的poolLocal对象,如果有,则直接返回,这通常在如下场景下发生:
|
||||||
|
- 在阻塞获取allPoolsMu锁时,其他goroutinue已经为我们扩充了local数组的大小
|
||||||
|
- 我们不再绑定在之前的P上了,我们可能绑定在另一个pid小于local数组大小的P上
|
||||||
|
|
||||||
|
如果目前pool对象其local数组为空,那么其会先将pool实例注册到allPools中,然后执行如下逻辑:
|
||||||
|
- 创建一个新的poolLocal slice,slice大小和GOMAXPROCS相同,并将新创建slice的头一个元素地址存储到`p.local`中
|
||||||
|
- 将slice大小存储在`p.localSize`中
|
||||||
|
|
||||||
|
### Pool Local
|
||||||
|
`poolLocal`结构如下:
|
||||||
|
```go
|
||||||
|
// Local per-P Pool appendix.
|
||||||
|
type poolLocalInternal struct {
|
||||||
|
private any // Can be used only by the respective P.
|
||||||
|
shared poolChain // Local P can pushHead/popHead; any P can popTail.
|
||||||
|
}
|
||||||
|
|
||||||
|
type poolLocal struct {
|
||||||
|
poolLocalInternal
|
||||||
|
|
||||||
|
// Prevents false sharing on widespread platforms with
|
||||||
|
// 128 mod (cache line size) = 0 .
|
||||||
|
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
|
||||||
|
}
|
||||||
|
```
|
||||||
|
> 对于poolLocalIntenral中的poolChain,local P可以执行pushHead/popHead逻辑,而任何P都可以执行popTail逻辑
|
||||||
|
|
||||||
|
### pool的Put/Get
|
||||||
|
#### Put
|
||||||
|
其中,Put相关逻辑如下:
|
||||||
|
```go
|
||||||
|
// Put adds x to the pool.
|
||||||
|
func (p *Pool) Put(x any) {
|
||||||
|
if x == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if race.Enabled {
|
||||||
|
if fastrandn(4) == 0 {
|
||||||
|
// Randomly drop x on floor.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
race.ReleaseMerge(poolRaceAddr(x))
|
||||||
|
race.Disable()
|
||||||
|
}
|
||||||
|
l, _ := p.pin()
|
||||||
|
if l.private == nil {
|
||||||
|
l.private = x
|
||||||
|
} else {
|
||||||
|
l.shared.pushHead(x)
|
||||||
|
}
|
||||||
|
runtime_procUnpin()
|
||||||
|
if race.Enabled {
|
||||||
|
race.Enable()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
其核心逻辑如下:
|
||||||
|
- pin,并获取poolLocal
|
||||||
|
- 如果poolLocal中private为空,将item放到private中
|
||||||
|
- 如果private不为空,将其放入shared中,LIFO
|
||||||
|
- 然后unpin
|
||||||
|
|
||||||
|
#### Get
|
||||||
|
Get的相关逻辑如下:
|
||||||
|
```go
|
||||||
|
// Get selects an arbitrary item from the Pool, removes it from the
|
||||||
|
// Pool, and returns it to the caller.
|
||||||
|
// Get may choose to ignore the pool and treat it as empty.
|
||||||
|
// Callers should not assume any relation between values passed to Put and
|
||||||
|
// the values returned by Get.
|
||||||
|
//
|
||||||
|
// If Get would otherwise return nil and p.New is non-nil, Get returns
|
||||||
|
// the result of calling p.New.
|
||||||
|
func (p *Pool) Get() any {
|
||||||
|
if race.Enabled {
|
||||||
|
race.Disable()
|
||||||
|
}
|
||||||
|
l, pid := p.pin()
|
||||||
|
x := l.private
|
||||||
|
l.private = nil
|
||||||
|
if x == nil {
|
||||||
|
// Try to pop the head of the local shard. We prefer
|
||||||
|
// the head over the tail for temporal locality of
|
||||||
|
// reuse.
|
||||||
|
x, _ = l.shared.popHead()
|
||||||
|
if x == nil {
|
||||||
|
x = p.getSlow(pid)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
runtime_procUnpin()
|
||||||
|
if race.Enabled {
|
||||||
|
race.Enable()
|
||||||
|
if x != nil {
|
||||||
|
race.Acquire(poolRaceAddr(x))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if x == nil && p.New != nil {
|
||||||
|
x = p.New()
|
||||||
|
}
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
```
|
||||||
|
以下是pool的Get核心流程:
|
||||||
|
- pin, 并且获取poolLocal
|
||||||
|
- 将private清空,并且判断之前private是否有值,如果有值,将使用该值
|
||||||
|
- 如果private之前没有值,那么对shared执行pop操作,LIFO,如果pop操作获取的值不为空,使用该值
|
||||||
|
- 如果对shared执行LIFO pop操作的也为空,那么会执行slow path的getSlow方法
|
||||||
|
- 如果在getSlow仍然未获取到值的情况下,会调用`New`方法来获取值
|
||||||
|
|
||||||
|
> #### LIFO
|
||||||
|
> 对于poolLocal的shared队列,其使用的是LIFO,最后添加到队列的元素会被最先弹出。这代表我们希望使用最新分配的对象,旧分配的对象会随着`STW`被逐渐淘汰。
|
||||||
|
|
||||||
|
##### slow path
|
||||||
|
在调用Get方法时,slow path仅当private和shared都为空时被触发,这代表当前threadlocal pool为空。
|
||||||
|
|
||||||
|
在触发slow path场景下,会尝试从其他P中窃取对象,如果在窃取仍然失败的场景下,才会去`victim`中进行查找。
|
||||||
|
|
||||||
|
Get方法中slow path实现如下:
|
||||||
|
```go
|
||||||
|
func (p *Pool) getSlow(pid int) any {
|
||||||
|
// See the comment in pin regarding ordering of the loads.
|
||||||
|
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
|
||||||
|
locals := p.local // load-consume
|
||||||
|
// Try to steal one element from other procs.
|
||||||
|
for i := 0; i < int(size); i++ {
|
||||||
|
l := indexLocal(locals, (pid+i+1)%int(size))
|
||||||
|
if x, _ := l.shared.popTail(); x != nil {
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try the victim cache. We do this after attempting to steal
|
||||||
|
// from all primary caches because we want objects in the
|
||||||
|
// victim cache to age out if at all possible.
|
||||||
|
size = atomic.LoadUintptr(&p.victimSize)
|
||||||
|
if uintptr(pid) >= size {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
locals = p.victim
|
||||||
|
l := indexLocal(locals, pid)
|
||||||
|
if x := l.private; x != nil {
|
||||||
|
l.private = nil
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
for i := 0; i < int(size); i++ {
|
||||||
|
l := indexLocal(locals, (pid+i)%int(size))
|
||||||
|
if x, _ := l.shared.popTail(); x != nil {
|
||||||
|
return x
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark the victim cache as empty for future gets don't bother
|
||||||
|
// with it.
|
||||||
|
atomic.StoreUintptr(&p.victimSize, 0)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
```
|
||||||
|
- 首先,会尝试对`pool.local`数组中所有的poolLocal对象都调用popTail方法,如果任一方法返回值不为空,那么将会使用该返回的值。`窃取操作会尝试窃取尾部的对象,这是最先被创建的对象`。
|
||||||
|
- 如果在local中未能找到和窃取到对象,那么会从victim中进行查找
|
||||||
|
- 首先,获取victim中当前pid对象的poolLocal对象,检查poolLocal对象private是否不为空,如果不为空,使用该值并将victim.private清空
|
||||||
|
- 如果private为空,那么则对victim中所有P关联的poolLocal对象执行popTail操作,如果任何一个pop操作返回不为空,那么使用返回的对象
|
||||||
|
- 如果所有victim中的poolLocal对象都返回为空,那么会将victim中`p.victimSize`标识为空,后续再次执行slow path时,如果感知到victimSize为空,那么便不会再次查找victim
|
||||||
|
|
||||||
|
|
||||||
## syntax
|
## syntax
|
||||||
### iota
|
### iota
|
||||||
`iota`关键字代表连续的整数变量,`0, 1, 2`,每当`const`关键字出现时,其重置为0
|
`iota`关键字代表连续的整数变量,`0, 1, 2`,每当`const`关键字出现时,其重置为0
|
||||||
|
|||||||
Reference in New Issue
Block a user