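Preface

Accurate time matters to every running application, but in a distributed system it is hard to keep absolute time consistent across nodes. Even a standard time-synchronization protocol such as NTP only keeps the clock error between nodes at the millisecond level, so accurate relative time matters even more in distributed systems. This section analyzes the design and implementation of timers, which are used to obtain relative time.

Design principles

Go's timer implementation has gone through many iterations. Up to the latest version, it has passed through the following stages: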

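  1. Go 1.9 and earlier: all timers are maintained by a single global 4-ary heap;
  2. Go 1.10 ~ 1.13: 64 4-ary heaps maintain all timers globally, and the timers created on a processor (P) are maintained by the corresponding heap;
  3. Go 1.14 and later: each processor manages its own timers, which are triggered through the network poller;

This section introduces the timer design used in each of these versions and traces how the implementation evolved.

Global 4-ary heap

Before Go 1.10, timers were implemented with a min 4-ary heap, and all timers were stored in the runtime.timers:093adee structure shown below: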

var timers struct {
	lock         mutex
	gp           *g
	created      bool
	sleeping     bool
	rescheduling bool
	sleepUntil   int64
	waitnote     note
	t            []*timer
}

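The field t in this structure is the min 4-ary heap; every timer created by the runtime is added to it. The runtime.timerproc:093adee goroutine runs time-driven events, and the runtime wakes it when either of the following happens:

  • a timer in the heap expires;
  • a new timer with an earlier trigger time is added to the heap;

However, the single mutex shared by the global heap hurts timers badly: every timer operation has to acquire this one global lock, which severely limits timer performance.

Sharded 4-ary heaps

Go 1.10 split the global 4-ary heap into 64 smaller ones. Ideally the number of heaps would equal the number of processors, but that would require dynamic allocation, so as a trade-off the runtime initializes 64 heaps, paying some memory for better performance.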

const timersLen = 64

var timers [timersLen]struct {
	timersBucket
}

type timersBucket struct {
	lock         mutex
	gp           *g
	created      bool
	sleeping     bool
	rescheduling bool
	sleepUntil   int64
	waitnote     note
	t            []*timer
}

If the machine has more than 64 processors (P), timers from several processors may be stored in the same bucket. Each timer bucket is served by one goroutine running runtime.timerproc:76f4fd8.
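The way processors can end up sharing a bucket can be sketched as follows. This is a minimal illustration, assuming the bucket is chosen by taking the processor id modulo the bucket count; bucketFor is a hypothetical helper name, not a runtime function.

```go
package main

import "fmt"

// timersLen mirrors the constant in the listing above.
const timersLen = 64

// bucketFor is a hypothetical helper. It assumes a processor id is mapped
// to a bucket index by taking the id modulo the number of buckets.
func bucketFor(pid int) int {
	return pid % timersLen
}

func main() {
	// With more than 64 processors, ids 0 and 64 land in the same bucket.
	for _, pid := range []int{0, 63, 64, 130} {
		fmt.Printf("P%d -> bucket %d\n", pid, bucketFor(pid))
	}
}
```

Under this assumption, lock contention only disappears completely while the number of processors stays at or below 64.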

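Sharding the global timers makes locking finer-grained and improves timer performance, but the frequent context switches between processors and threads caused by runtime.timerproc:76f4fd8 then became the main factor limiting timer performance.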

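Relying on netpoller.netpollBreak

  • Adjustments:
    • the timer heap is bound to a P of the GMP model
    • the wake-up goroutine timerproc is removed
  • Checks:
    • expired timers are checked in a dedicated function, checkTimers
    • timer checks are moved into the scheduling loop
  • Work stealing:
    • during work-stealing, timers are stolen from other Ps
  • Fallback:
    • runtime.sysmon acts as a safety net for timers that were not triggered (timeSleepUntil) and starts a new thread

In the latest implementation the timer buckets have been removed, and all timers are stored in the processor runtime.p as a min 4-ary heap.

The processor runtime.p has the following timer-related fields:

  • timersLock - the mutex that protects the timers;
  • timers - the min 4-ary heap that stores the timers;
  • numTimers - the number of timers in the processor;
  • adjustTimers - the number of timers in the processor that are in the timerModifiedEarlier state;
  • deletedTimers - the number of timers in the processor that are in the timerDeleted state;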
type p struct {
    ...
    // mutex protecting the timers
    timersLock mutex
    // min 4-ary heap that stores the timers
    timers []*timer
    // number of timers
    numTimers uint32
    // number of timers in the timerModifiedEarlier state
    adjustTimers uint32
    // number of timers in the timerDeleted state
    deletedTimers uint32
    ...
}

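runtime.timerproc:76f4fd8, which used to manage the timers, has also been removed. Timers are now triggered by the processor's network poller and the scheduler. This approach makes full use of locality and reduces context-switch overhead, and it is the best-performing implementation so far.

In older versions time.Sleep caused a goroutine to be created; from 1.14 onward (inclusive) it no longer does.

Let's look at an example: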

package main

import (
	"fmt"
	"runtime"
	"time"
)

func main() {
    runtime.GOMAXPROCS(1)
    for i := 0; i < 10; i++ {
        i := i
        go func() {
            fmt.Println(i)
        }()
    }

    time.Sleep(time.Hour)
}

This time, can you still correctly say in what order the values of i are printed?

Let's go straight to the answer.

When run with go1.13:

$ go1.13.8 run main.go

0
1
2
3
4
5
6
7
8
9

When run with go1.14 or later:

$ go1.14 run main.go

9
0
1
2
3
4
5
6
7
8

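In go 1.13 the time package spawns a goroutine named timerproc, dedicated to waking goroutines that are parked on a timer once that timer expires. This goroutine pushes the goroutine in runnext out, so the output order is 0, 1, 2, 3, 4, 5, 6, 7, 8, 9.

go 1.14 got rid of this wake-up goroutine. Instead, code that wakes timers appears throughout the scheduling loop and in sysmon. With nothing to displace it, the last goroutine created (the one printing 9) stays in runnext and runs first. Timers are woken more promptly, but the code is also harder to follow.

Data structures

timer

runtime.timer is the internal representation of a Go timer. Every timer is stored in the min 4-ary heap of its processor. The runtime structure is shown below: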

type timer struct {
	pp puintptr

	when     int64
	period   int64
	f        func(interface{}, uintptr)
	arg      interface{}
	seq      uintptr
	nextwhen int64
	status   uint32
}
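  • when - the time at which the timer is woken;
  • period - the interval between wake-ups; it is 0 for a one-shot Timer and non-zero for a Ticker;
  • f - the function executed when the timer is woken; it must not be a closure;
  • arg - the argument passed to f when the timer is woken;
  • nextwhen - when the status is timerModifiedEarlier or timerModifiedLater, the value of this field is used to refresh the when field;
  • status - the state of the timer;
  • pp - pointer to the processor P whose heap holds the timer;

Timer

The Timer type represents a single event. When the Timer expires, the current time is sent on C (a channel), unless the Timer was created by AfterFunc.

Note: a Timer must be obtained through NewTimer or AfterFunc.

runtime.timer, however, is only the runtime's private structure; the timer exposed to users is the time.Timer structure: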

// src/time/sleep.go
// The Timer type represents a single event.
// When the Timer expires, the current time will be sent on C,
// unless the Timer was created by AfterFunc.
// A Timer must be created with NewTimer or AfterFunc.
type Timer struct {
    C <-chan Time
    r runtimeTimer
}

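A time.Timer must be created through time.NewTimer or time.AfterFunc; time.After is a convenience wrapper that returns only the channel of a timer created with NewTimer. When the timer expires, the goroutine receiving from its channel gets the expiry time.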

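C has already been explained, so let's look at runtimeTimer. It is defined in sleep.go and must be kept in sync with the timer type in time.go of the runtime package. Note that the listing below still carries the tb and i fields of the bucket-based layout rather than pp, nextwhen and status, so it appears to come from a release before Go 1.14: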

// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/time.go:/^type timer
type runtimeTimer struct {
	tb uintptr
	i  int

	when   int64
	period int64
	f      func(interface{}, uintptr) // NOTE: must not be closure
	arg    interface{}
	seq    uintptr
}

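4-ary min-heap

A 4-ary heap is shallower than a binary heap. All children of a node (at most four) are no smaller than the node, and the node's single parent is no larger than it.

[Figure: a typical 4-ary heap with its values filled in]

As in a binary heap, the only ordering requirement on a node concerns its parent and its children. Sibling nodes are not ordered with respect to each other.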
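The parent and child relationships can be expressed as index arithmetic on a slice. The sketch below uses the same formulas as the runtime listings in this section; the helper names parent, firstChild and isMinHeap are made up for illustration.

```go
package main

import "fmt"

// parent returns the index of the parent of node i (for i > 0).
func parent(i int) int { return (i - 1) / 4 }

// firstChild returns the index of the leftmost child of node i;
// the remaining children, if present, occupy the next three indexes.
func firstChild(i int) int { return i*4 + 1 }

// isMinHeap reports whether no node is smaller than its parent.
func isMinHeap(when []int64) bool {
	for i := 1; i < len(when); i++ {
		if when[i] < when[parent(i)] {
			return false
		}
	}
	return true
}

func main() {
	// Index 0 is the root; indexes 1..4 are its children;
	// indexes 5..7 are children of index 1.
	when := []int64{1, 5, 3, 9, 4, 8, 7, 6}
	fmt.Println(parent(4), parent(5), firstChild(1)) // 0 1 5
	fmt.Println(isMinHeap(when))                     // true
}
```

Note that the siblings 5, 3, 9, 4 are not sorted among themselves, which is allowed by the heap property.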

Sifting up

// Heap maintenance algorithms.
// These algorithms check for slice index errors manually.
// Slice index error can happen if the program is using racy
// access to timers. We don't want to panic here, because
// it will cause the program to crash with a mysterious
// "panic holding locks" message. Instead, we panic while not
// holding a lock.

func siftupTimer(t []*timer, i int) {
	if i >= len(t) {
		badTimer()
	}
	// Save the node that was just appended to the end of the slice.
	when := t[i].when
	if when <= 0 {
		badTimer()
	}
	tmp := t[i]
	// Start from the parent of the inserted node.
	// While the inserted node fires earlier than its parent,
	// move that parent down one level.
	for i > 0 {
		p := (i - 1) / 4 // parent
		if when >= t[p].when {
			break
		}
		t[i] = t[p]
		i = p
	}
	// If anything moved, write the inserted node into
	// the slot vacated by the last parent that was moved down.
	if tmp != t[i] {
		t[i] = tmp
	}
}

Sifting down

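// Find the smallest of the children first. If it is not smaller than the node being sifted down, break.
// Otherwise move that child up and repeat the comparison against its own (up to four) children.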
func siftdownTimer(t []*timer, i int) {
	n := len(t)
	if i >= n {
		badTimer()
	}
	when := t[i].when
	if when <= 0 {
		badTimer()
	}
	tmp := t[i]
	for {
		c := i*4 + 1 // left child
		c3 := c + 2  // mid child
		if c >= n {
			break
		}
		w := t[c].when
		if c+1 < n && t[c+1].when < w {
			w = t[c+1].when
			c++
		}
		if c3 < n {
			w3 := t[c3].when
			if c3+1 < n && t[c3+1].when < w3 {
				w3 = t[c3+1].when
				c3++
			}
			if w3 < w {
				w = w3
				c = c3
			}
		}
		if w >= when {
			break
		}
		t[i] = t[c]
		i = c
	}
	if tmp != t[i] {
		t[i] = tmp
	}
}
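To see the two sift routines working together, here is a small self-contained 4-ary min-heap over plain int64 values. It is a simplified sketch rather than the runtime code: heap4, push, pop and drain are illustrative names, and the child selection uses a simple loop instead of the unrolled comparisons above.

```go
package main

import "fmt"

// heap4 is a min 4-ary heap of trigger times (illustrative type).
type heap4 []int64

// push appends v and sifts it up towards the root.
func (h *heap4) push(v int64) {
	*h = append(*h, v)
	s := *h
	i := len(s) - 1
	for i > 0 {
		p := (i - 1) / 4 // parent
		if v >= s[p] {
			break
		}
		s[i] = s[p] // move the parent down
		i = p
	}
	s[i] = v
}

// pop removes and returns the smallest value; the heap must be non-empty.
func (h *heap4) pop() int64 {
	s := *h
	top := s[0]
	last := len(s) - 1
	v := s[last]
	s = s[:last]
	*h = s
	if last == 0 {
		return top
	}
	// Sift the former last element down from the root.
	i := 0
	for {
		c := i*4 + 1 // leftmost child
		if c >= len(s) {
			break
		}
		best := c
		for k := c + 1; k < c+4 && k < len(s); k++ {
			if s[k] < s[best] {
				best = k
			}
		}
		if s[best] >= v {
			break
		}
		s[i] = s[best] // move the smallest child up
		i = best
	}
	s[i] = v
	return top
}

// drain pops every value and returns them in pop order.
func drain(h *heap4) []int64 {
	var out []int64
	for len(*h) > 0 {
		out = append(out, h.pop())
	}
	return out
}

func main() {
	var h heap4
	for _, v := range []int64{7, 3, 9, 1, 5, 8, 2} {
		h.push(v)
	}
	fmt.Println(drain(&h)) // [1 2 3 5 7 8 9]
}
```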

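State machine

The runtime handles all timers with a state machine that comprises 10 states and several operations. Because Go timers must support adding, deleting, modifying and resetting, their state is quite complex. A timer is currently in one of the following 10 states:

State                  Meaning
timerNoStatus          no status has been set yet
timerWaiting           waiting for the timer to fire; the timer is in a P's heap
timerRunning           the timer is running; held only very briefly
timerDeleted           the timer is deleted and will not run, but is still in a P's heap
timerRemoving          the timer is being removed; held only very briefly
timerRemoved           the timer has been removed and is not in a P's heap
timerModifying         the timer is being modified; held only very briefly
timerModifiedEarlier   the timer was modified to an earlier time; the new when value is stored in nextwhen; in a P's heap, possibly in the wrong place
timerModifiedLater     the timer was modified to the same or a later time; the new when value is stored in nextwhen; in a P's heap, possibly in the wrong place
timerMoving            the timer has been modified and is being moved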

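The table shows what each state means, but some other important information is worth spelling out, such as how long a state lasts and whether the timer is on a heap:

  • timerRunning, timerRemoving, timerModifying and timerMoving - the timer stays in these states only briefly;
  • timerWaiting, timerRunning, timerDeleted, timerRemoving, timerModifying, timerModifiedEarlier, timerModifiedLater and timerMoving - the timer is on a processor's heap;
  • timerNoStatus and timerRemoved - the timer is not on a heap;
  • timerModifiedEarlier and timerModifiedLater - the timer is on a heap but may be in the wrong position and needs to be re-sorted;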

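When we operate on a timer, the runtime reacts according to its state, so the analysis below uses the states as the entry point. The state machine includes the six operations listed below, each with its own responsibility:

  • runtime.addtimer - adds a new timer to the current processor;
  • runtime.deltimer - marks the timer as timerDeleted so that the owning processor removes it;
  • runtime.modtimer - modifies a timer; the network poller calls this function;
  • runtime.cleantimers - cleans up the timers at the head of the queue, which speeds up programs that create and delete timers;
  • runtime.adjusttimers - adjusts the processor's timer heap, including moving timers that will fire later and removing timers marked timerDeleted;
  • runtime.runtimer - examines the timer at the head of the queue and runs it once it is ready;

We analyze these six operations in turn.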

addtimer

When we call time.NewTimer to add a new timer, the runtime.addtimer function is executed and handles the timer according to the following rules:

  • timerNoStatus -> timerWaiting
  • other states -> crash: illegal state
// addtimer adds a timer to the current P.
// This should only be called with a newly created timer.
// That avoids the risk of changing the when field of a timer in some P's heap,
// which could cause the heap to become unsorted.
func addtimer(t *timer) {
	// addtimer validates the wake-up time when and the period, and checks
	// that the timer is freshly initialized.
	// when must be positive. A negative value will cause runtimer to
	// overflow during its delta calculation and never expire other runtime
	// timers. Zero will cause checkTimers to fail to notice the timer.
	if t.when <= 0 {
		throw("timer when must be positive")
	}
	if t.period < 0 {
		throw("timer period must be non-negative")
	}
	// The status must be the initial one.
	if t.status != timerNoStatus {
		throw("addtimer called with initialized timer")
	}
	// Mark the timer as waiting to be scheduled.
	t.status = timerWaiting

	when := t.when

	// Disable preemption while using pp to avoid changing another P's heap.
	mp := acquirem()
	// Get the current P.
	pp := getg().m.p.ptr()
	// With the lock held, cleantimers tidies the head of this P's timer heap,
	// then doaddtimer inserts the timer into the min-heap; the lock is released afterwards.
	lock(&pp.timersLock)
	cleantimers(pp)
	doaddtimer(pp, t)
	unlock(&pp.timersLock)
	// Wake the thread sleeping in the netpoller if necessary.
	wakeNetPoller(when)

	releasem(mp)
}

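The sequence of operations is:

  1. Validate the timer: when must be positive, period must be non-negative, and the status must be the initial timerNoStatus
  2. Get the P that the current G is running on
  3. Lock the P's timersLock
  4. Call cleantimers() to clean up the timers at the head of the P's queue, then add the timer to the P's min-heap
  5. Unlock timersLock
  6. Call wakeNetPoller, which checks whether the timer's wake-up time (when) falls before the time the current poll is expected to run until (pollerPollUntil) and, if so, wakes the thread sleeping in the network poller.

When the new timer is due before the current poll would return, adding it interrupts the blocked poll and makes the scheduler check for expired timers. The triggering process is covered in detail later.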

doaddtimer

doaddtimer is actually simple: it associates the timer with the P, adds the timer to the P's timer heap, and maintains the min-heap order of that heap.

// doaddtimer adds t to the current P's heap.
// The caller must have locked the timers for pp.
func doaddtimer(pp *p, t *timer) {
	// Timers rely on the network poller, so make sure the poller
	// has started.
	if netpollInited == 0 {
		netpollGenericInit()
	}
	// Check that the timer is not already in some P's timer heap.
	if t.pp != 0 {
		throw("doaddtimer: P already set in timer")
	}
	// Associate the timer with the P.
	t.pp.set(pp)
	i := len(pp.timers)
	// Append the timer to the P's timer heap.
	pp.timers = append(pp.timers, t)
	// Restore the timer's position in the min-heap.
	siftupTimer(pp.timers, i)
	// If the timer became the head of the heap, update timer0When.
	if t == pp.timers[0] {
		atomic.Store64(&pp.timer0When, uint64(t.when))
	}
	atomic.Xadd(&pp.numTimers, 1)
}

wakeNetPoller

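wakeNetPoller compares the time at which the timer should next fire with the netpoller's next poll deadline. If the timer is due earlier, it calls netpollBreak, which writes to netpollBreakWr and interrupts netpoll immediately.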

// wakeNetPoller wakes up the thread sleeping in the network poller if it isn't
// going to wake up before the when argument; or it wakes an idle P to service
// timers and the network poller if there isn't one already.
func wakeNetPoller(when int64) {
	// lastpoll == 0 means a thread is currently blocked in the network poller.
	if atomic.Load64(&sched.lastpoll) == 0 {
		// In findrunnable we ensure that when polling the pollUntil
		// field is either zero or the time to which the current
		// poll is expected to run. This can have a spurious wakeup
		// but should never miss a wakeup.
		pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
		if pollerPollUntil == 0 || pollerPollUntil > when {
			// Write to netpollBreakWr to interrupt netpoll immediately.
			netpollBreak()
		}
	} else {
		// There are no threads in the network poller, try to get
		// one there so it can handle new timers.
		if GOOS != "plan9" { // Temporary workaround - see issue #42303.
			wakep()
		}
	}
}
// netpollBreak interrupts a kevent.
func netpollBreak() {
	if atomic.Cas(&netpollWakeSig, 0, 1) {
		for {
			var b byte
			n := write(netpollBreakWr, unsafe.Pointer(&b), 1)
			if n == 1 || n == -_EAGAIN {
				break
			}
			if n == -_EINTR {
				continue
			}
			println("runtime: netpollBreak write failed with", -n)
			throw("runtime: netpollBreak write failed")
		}
	}
}
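The decision wakeNetPoller makes while a thread is blocked in the poller can be isolated as a pure function. The following is a sketch of that one condition only; shouldBreak is a hypothetical name and the time values are arbitrary.

```go
package main

import "fmt"

// shouldBreak reports whether a poller that is blocked until pollUntil must
// be interrupted so that a timer due at when is not missed. A pollUntil of 0
// is treated as "no deadline known", so the poller is always interrupted.
func shouldBreak(pollUntil, when int64) bool {
	return pollUntil == 0 || pollUntil > when
}

func main() {
	fmt.Println(shouldBreak(0, 100))   // true: no deadline, interrupt
	fmt.Println(shouldBreak(500, 100)) // true: poll would return too late
	fmt.Println(shouldBreak(50, 100))  // false: poll returns before the timer
}
```

As the runtime comment notes, this check may cause a spurious wake-up but should never miss one.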

How does that work? Let's first look at a test from the Go source tree:

func TestNetpollBreak(t *testing.T) {
    if runtime.GOMAXPROCS(0) == 1 {
        t.Skip("skipping: GOMAXPROCS=1")
    }
    // Initialize netpoll.
    runtime.NetpollGenericInit()

    start := time.Now()
    c := make(chan bool, 2)
    go func() {
        c <- true
        // How long netpoll waits.
        runtime.Netpoll(10 * time.Second.Nanoseconds())
        c <- true
    }()
    <-c
loop:
    for {
        runtime.Usleep(100)
        // Interrupt the netpoll wait.
        runtime.NetpollBreak()
        runtime.NetpollBreak()
        select {
        case <-c:
            break loop
        default:
        }
    }
    if dur := time.Since(start); dur > 5*time.Second {
        t.Errorf("netpollBreak did not interrupt netpoll: slept for: %v", dur)
    }
}

In this example, runtime.Netpoll is first called to block and wait, and then runtime.NetpollBreak is called in a loop to interrupt the blocked wait.

runtime.netpoll

func netpoll(delay int64) gList {
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    // delay is in nanoseconds; convert it to milliseconds below.
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        waitms = 1e9
    }
    var events [128]epollevent
retry:
    // Wait for file descriptors to become readable or writable.
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    // On a negative return value, call epollwait again.
    if n < 0 {
        ...
        goto retry
    }
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        // If the wake-up came from NetpollBreak, skip this event with continue.
        if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
            if ev.events != _EPOLLIN {
                println("runtime: netpoll: break fd ready for", ev.events)
                throw("runtime: netpoll: break fd ready for something unexpected")
            }
            if delay != 0 {
                var tmp [16]byte
                read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
                atomic.Store(&netpollWakeSig, 0)
            }
            continue
        }
        ...
    }
    return toRun
}
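The nanosecond-to-millisecond conversion at the top of netpoll is easy to misread, so here it is as a standalone function with a few sample inputs. The thresholds are copied from the listing above; waitMillis is a hypothetical name.

```go
package main

import "fmt"

// waitMillis converts a delay in nanoseconds into the millisecond timeout
// passed to epollwait, using the same thresholds as the listing above.
func waitMillis(delay int64) int32 {
	switch {
	case delay < 0:
		return -1 // block indefinitely
	case delay == 0:
		return 0 // poll without blocking
	case delay < 1e6:
		return 1 // round sub-millisecond delays up to 1ms
	case delay < 1e15:
		return int32(delay / 1e6)
	default:
		return 1e9 // clamp very large delays
	}
}

func main() {
	for _, d := range []int64{-1, 0, 500, 2_500_000, 1e16} {
		fmt.Printf("%d ns -> %d ms\n", d, waitMillis(d))
	}
}
```

Rounding short delays up to 1ms means a timer due in a few microseconds can be served slightly late by this path, which is one reason an explicit interrupt through netpollBreak is useful.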

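When runtime.findrunnable fails to find runnable work, it finally makes a blocking call to netpoll with a timeout. If no event interrupts it, the scheduling loop waits until netpoll times out before continuing: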

func findrunnable() (gp *g, inheritTime bool) {
    ...
    delta := int64(-1)
    if pollUntil != 0 {
        // checkTimers ensures that polluntil > now.
        delta = pollUntil - now
    }
    ...
    // poll network
    // Check the network poller once more before going to sleep.
    if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
        ...
        // Blocking call.
        list := netpoll(delta)

    }
    ...
    // Put the current M to sleep.
    stopm()
    goto top
}

So interrupting netpoll when runtime.addtimer adds a timer lets the scheduler respond more promptly to time-sensitive work such as timers.

deltimer

runtime.deltimer marks the timer that needs to be deleted, handling it according to the following rules:

  • timerWaiting -> timerModifying -> timerDeleted
  • timerModifiedEarlier -> timerModifying -> timerDeleted
  • timerModifiedLater -> timerModifying -> timerDeleted
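  • other states -> wait for the state to change or return directly

A timer cannot simply be removed from the heap, because it may live in the heap of another P rather than the current one. It can only be marked as deleted, and the processor that holds the timer does the actual cleanup.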

// deltimer deletes the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as deleted.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was removed before it was run.
func deltimer(t *timer) bool {
	for {
		switch s := atomic.Load(&t.status); s {
		case timerWaiting, timerModifiedLater:
			// Prevent preemption while the timer is in timerModifying.
			// This could lead to a self-deadlock. See #38070.
			mp := acquirem()
			if atomic.Cas(&t.status, s, timerModifying) {
				// Must fetch t.pp before changing status,
				// as cleantimers in another goroutine
				// can clear t.pp of a timerDeleted timer.
				tpp := t.pp.ptr()
				if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
					badTimer()
				}
				releasem(mp)
				atomic.Xadd(&tpp.deletedTimers, 1)
				// Timer was not yet run.
				return true
			} else {
				releasem(mp)
			}
		case timerModifiedEarlier:
			// Prevent preemption while the timer is in timerModifying.
			// This could lead to a self-deadlock. See #38070.
			mp := acquirem()
			if atomic.Cas(&t.status, s, timerModifying) {
				// Must fetch t.pp before setting status
				// to timerDeleted.
				tpp := t.pp.ptr()
				atomic.Xadd(&tpp.adjustTimers, -1)
				if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
					badTimer()
				}
				releasem(mp)
				atomic.Xadd(&tpp.deletedTimers, 1)
				// Timer was not yet run.
				return true
			} else {
				releasem(mp)
			}
		case timerDeleted, timerRemoving, timerRemoved:
			// Timer was already run.
			return false
		case timerRunning, timerMoving:
			// The timer is being run or moved, by a different P.
			// Wait for it to complete.
			osyield()
		case timerNoStatus:
			// Removing timer that was never added or
			// has already been run. Also see issue 21874.
			return false
		case timerModifying:
			// Simultaneous calls to deltimer and modtimer.
			// Wait for the other call to complete.
			osyield()
		default:
			badTimer()
		}
	}
}

modtimer

runtime.modtimer modifies an existing timer, handling it according to the following rules:

  • timerWaiting -> timerModifying -> timerModifiedXX
  • timerModifiedXX -> timerModifying -> timerModifiedYY
  • timerNoStatus -> timerModifying -> timerWaiting
  • timerRemoved -> timerModifying -> timerWaiting
  • timerDeleted -> timerModifying -> timerWaiting
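  • other states -> wait for the state to change

After entering the for loop, modtimer sets the state and updates the necessary fields depending on the current state. If the timer is no longer in a heap, it is added back to the timer heap. If the new time is earlier than the old one, the state is set to timerModifiedEarlier, and because the time moved earlier, a netpoll interrupt is triggered as well.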

// modtimer modifies an existing timer.
// This is called by the netpoll code or time.Ticker.Reset or time.Timer.Reset.
// Reports whether the timer was modified before it was run.
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
	if when <= 0 {
		throw("timer when must be positive")
	}
	if period < 0 {
		throw("timer period must be non-negative")
	}

	status := uint32(timerNoStatus)
	wasRemoved := false
	var pending bool
	var mp *m
loop:
	for {
		// Move the timer into the timerModifying state.
		switch status = atomic.Load(&t.status); status {
		case timerWaiting, timerModifiedEarlier, timerModifiedLater:
			// Prevent preemption while the timer is in timerModifying.
			// This could lead to a self-deadlock. See #38070.
			mp = acquirem()
			if atomic.Cas(&t.status, status, timerModifying) {
				pending = true // timer not yet run
				break loop
			}
			releasem(mp)
		case timerNoStatus, timerRemoved:
			// Prevent preemption while the timer is in timerModifying.
			// This could lead to a self-deadlock. See #38070.
			mp = acquirem()

			// Timer was already run and t is no longer in a heap.
			// Act like addtimer.
			if atomic.Cas(&t.status, status, timerModifying) {
				wasRemoved = true
				pending = false // timer already run or stopped
				break loop
			}
			releasem(mp)
		case timerDeleted:
			// Prevent preemption while the timer is in timerModifying.
			// This could lead to a self-deadlock. See #38070.
			mp = acquirem()
			if atomic.Cas(&t.status, status, timerModifying) {
				atomic.Xadd(&t.pp.ptr().deletedTimers, -1)
				pending = false // timer already stopped
				break loop
			}
			releasem(mp)
		case timerRunning, timerRemoving, timerMoving:
			// The timer is being run or moved, by a different P.
			// Wait for it to complete.
			osyield()
		case timerModifying:
			// Multiple simultaneous calls to modtimer.
			// Wait for the other call to complete.
			osyield()
		default:
			badTimer()
		}
	}

	t.period = period
	t.f = f
	t.arg = arg
	t.seq = seq
	// If the timer is no longer in a heap, add it back to this P's timer heap.
	if wasRemoved {
		t.when = when
		pp := getg().m.p.ptr()
		lock(&pp.timersLock)
		doaddtimer(pp, t)
		unlock(&pp.timersLock)
		if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
			badTimer()
		}
		releasem(mp)
		wakeNetPoller(when)
	} else {
		// The timer is in some other P's heap, so we can't change
		// the when field. If we did, the other P's heap would
		// be out of order. So we put the new when value in the
		// nextwhen field, and let the other P set the when field
		// when it is prepared to resort the heap.
		t.nextwhen = when

		newStatus := uint32(timerModifiedLater)
		// If the new time is earlier than the old one, use timerModifiedEarlier.
		if when < t.when {
			newStatus = timerModifiedEarlier
		}

		tpp := t.pp.ptr()

		// Update the adjustTimers field.  Subtract one if we
		// are removing a timerModifiedEarlier, add one if we
		// are adding a timerModifiedEarlier.
		adjust := int32(0)
		if status == timerModifiedEarlier {
			adjust--
		}
		if newStatus == timerModifiedEarlier {
			adjust++
			updateTimerModifiedEarliest(tpp, when)
		}
		if adjust != 0 {
			atomic.Xadd(&tpp.adjustTimers, adjust)
		}

		// Set the new status of the timer.
		if !atomic.Cas(&t.status, timerModifying, newStatus) {
			badTimer()
		}
		releasem(mp)

		// If the new status is earlier, wake up the poller.
		// The time moved earlier, so trigger a netpoll interrupt.
		if newStatus == timerModifiedEarlier {
			wakeNetPoller(when)
		}
	}

	return pending
}

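If the timer to be modified is no longer in a heap (timerNoStatus or timerRemoved), the function calls runtime.doaddtimer to add it again. Otherwise it acts on the new time as follows:

  • if the new time is later than or equal to the old time, the timer's state is set to timerModifiedLater;
  • if the new time is earlier than the old time, the timer's state is set to timerModifiedEarlier and wakeNetPoller is called, which may use runtime.netpollBreak to make the scheduler re-check its timers;

Because the new time affects how the timer is processed, runtime.modtimer is the most complex function in the state machine.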
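The choice between the two modified states, and the bookkeeping for adjustTimers, can be separated from the locking and CAS details. The function below is a sketch of just that branch of modtimer; nextStatus and the status constants are hypothetical names.

```go
package main

import "fmt"

// A reduced, illustrative state set.
const (
	statusWaiting uint32 = iota + 1
	statusModifiedEarlier
	statusModifiedLater
)

// nextStatus returns the state a timer still in a heap should move to when
// its trigger time changes from oldWhen to newWhen, together with the delta
// to apply to the owning P's count of timerModifiedEarlier timers.
func nextStatus(old uint32, oldWhen, newWhen int64) (uint32, int32) {
	newStatus := statusModifiedLater
	if newWhen < oldWhen {
		newStatus = statusModifiedEarlier
	}
	var adjust int32
	if old == statusModifiedEarlier {
		adjust-- // the timer leaves the "earlier" set
	}
	if newStatus == statusModifiedEarlier {
		adjust++ // the timer enters the "earlier" set
	}
	return newStatus, adjust
}

func main() {
	fmt.Println(nextStatus(statusWaiting, 100, 50))
	fmt.Println(nextStatus(statusModifiedEarlier, 100, 200))
	fmt.Println(nextStatus(statusModifiedEarlier, 100, 50))
}
```

Only the "earlier" case needs a counter and a poller wake-up: a timer moved later still sits ahead of its real position, so the heap notices it no later than necessary.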

cleantimers

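runtime.cleantimers cleans up the timers at the head of the processor's queue according to their state, following these rules:

  • timerDeleted -> timerRemoving -> timerRemoved
  • timerModifiedXX -> timerMoving -> timerWaiting

cleantimers uses an infinite loop to fetch the head node. If the head's state is timerDeleted, it is removed from the timer heap. If the head's state is timerModifiedEarlier or timerModifiedLater, its trigger time has been changed to an earlier or later time, so it is first removed from the timer heap and then added back.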

// cleantimers cleans up the head of the timer queue. This speeds up
// programs that create and delete timers; leaving them in the heap
// slows down addtimer. Reports whether no timer problems were found.
// The caller must have locked the timers for pp.
func cleantimers(pp *p) {
	gp := getg()
	for {
		// The timer heap is empty; return.
		if len(pp.timers) == 0 {
			return
		}

		// This loop can theoretically run for a while, and because
		// it is holding timersLock it cannot be preempted.
		// If someone is trying to preempt us, just return.
		// We can clean the timers later.
		if gp.preemptStop {
			return
		}
		// Take the first timer.
		t := pp.timers[0]
		if t.pp.ptr() != pp {
			throw("cleantimers: bad p")
		}
		switch s := atomic.Load(&t.status); s {
		case timerDeleted:
			// Claim the timer by moving it to timerRemoving.
			if !atomic.Cas(&t.status, s, timerRemoving) {
				continue
			}
			// Remove the first timer.
			dodeltimer0(pp)
			// Once removed, set the state to timerRemoved.
			if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
				badTimer()
			}
			atomic.Xadd(&pp.deletedTimers, -1)
		// The timer was modified to an earlier or later time.
		case timerModifiedEarlier, timerModifiedLater:
			// Set the timer's state to timerMoving.
			if !atomic.Cas(&t.status, s, timerMoving) {
				continue
			}
			// Now we can change the when field.
			t.when = t.nextwhen
			// Move t to the right position:
			// remove it from the heap and add it back.
			dodeltimer0(pp)
			doaddtimer(pp, t)
			if s == timerModifiedEarlier {
				atomic.Xadd(&pp.adjustTimers, -1)
			}
			// Set the state to timerWaiting.
			if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
				badTimer()
			}
		default:
			// Head of timers does not need adjustment.
			return
		}
	}
}

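runtime.cleantimers only handles timers whose state is timerDeleted, timerModifiedEarlier or timerModifiedLater:

  • if the timer's state is timerDeleted:
    • change the timer's state to timerRemoving;
    • call runtime.dodeltimer0 to remove the timer at the top of the 4-ary heap;
    • change the timer's state to timerRemoved;
  • if the timer's state is timerModifiedEarlier or timerModifiedLater:
    • change the timer's state to timerMoving;
    • overwrite when with the next trigger time nextwhen;
    • call runtime.dodeltimer0 to remove the timer at the top of the 4-ary heap;
    • call runtime.doaddtimer to add the timer back to the 4-ary heap;
    • change the timer's state to timerWaiting;

runtime.cleantimers removes timers that are already marked as deleted and repositions timers whose state is timerModifiedXX.

adjusttimers

runtime.adjusttimers and runtime.cleantimers do similar work: both remove deleted timers from the heap and update the time of timers whose state is timerModifiedEarlier or timerModifiedLater, and both follow the same rules for timer states: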

  • timerDeleted -> timerRemoving -> timerRemoved
  • timerModifiedXX -> timerMoving -> timerWaiting
// adjusttimers looks through the timers in the current P's heap for
// any timers that have been modified to run earlier, and puts them in
// the correct place in the heap. While looking for those timers,
// it also moves timers that have been modified to run later,
// and removes deleted timers. The caller must have locked the timers for pp.
func adjusttimers(pp *p, now int64) {
	if atomic.Load(&pp.adjustTimers) == 0 {
		if verifyTimers {
			verifyTimerHeap(pp)
		}
		// There are no timers to adjust, so it is safe to clear
		// timerModifiedEarliest. Do so in case it is stale.
		// Everything will work if we don't do this,
		// but clearing here may save future calls to adjusttimers.
		atomic.Store64(&pp.timerModifiedEarliest, 0)
		return
	}

	// If we haven't yet reached the time of the first timerModifiedEarlier
	// timer, don't do anything. This speeds up programs that adjust
	// a lot of timers back and forth if the timers rarely expire.
	// We'll postpone looking through all the adjusted timers until
	// one would actually expire.
	if first := atomic.Load64(&pp.timerModifiedEarliest); first != 0 {
		if int64(first) > now {
			if verifyTimers {
				verifyTimerHeap(pp)
			}
			return
		}

		// We are going to clear all timerModifiedEarlier timers.
		atomic.Store64(&pp.timerModifiedEarliest, 0)
	}

	var moved []*timer
loop:
	for i := 0; i < len(pp.timers); i++ {
		t := pp.timers[i]
		if t.pp.ptr() != pp {
			throw("adjusttimers: bad p")
		}
		switch s := atomic.Load(&t.status); s {
		case timerDeleted:
			if atomic.Cas(&t.status, s, timerRemoving) {
				dodeltimer(pp, i)
				if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
					badTimer()
				}
				atomic.Xadd(&pp.deletedTimers, -1)
				// Look at this heap position again.
				i--
			}
		case timerModifiedEarlier, timerModifiedLater:
			if atomic.Cas(&t.status, s, timerMoving) {
				// Now we can change the when field.
				t.when = t.nextwhen
				// Take t off the heap, and hold onto it.
				// We don't add it back yet because the
				// heap manipulation could cause our
				// loop to skip some other timer.
				dodeltimer(pp, i)
				moved = append(moved, t)
				if s == timerModifiedEarlier {
					if n := atomic.Xadd(&pp.adjustTimers, -1); int32(n) <= 0 {
						break loop
					}
				}
				// Look at this heap position again.
				i--
			}
		case timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving:
			badTimer()
		case timerWaiting:
			// OK, nothing to do.
		case timerModifying:
			// Check again after modification is complete.
			osyield()
			i--
		default:
			badTimer()
		}
	}

	if len(moved) > 0 {
		addAdjustedTimers(pp, moved)
	}

	if verifyTimers {
		verifyTimerHeap(pp)
	}
}

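Unlike runtime.cleantimers, the function above may walk through all timers in the processor's heap (subject to its exit conditions) rather than only touching the top of the 4-ary heap.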

// dodeltimer removes timer i from the current P's heap.
// We are locked on the P when this is called.
// It reports whether it saw no problems due to races.
// The caller must have locked the timers for pp.
func dodeltimer(pp *p, i int) {
	if t := pp.timers[i]; t.pp.ptr() != pp {
		throw("dodeltimer: wrong P")
	} else {
		t.pp = 0
	}
	last := len(pp.timers) - 1
	// Overwrite timers[i] with timers[last].
	if i != last {
		pp.timers[i] = pp.timers[last]
	}
	// Clear timers[last] and shrink the slice.
	pp.timers[last] = nil
	pp.timers = pp.timers[:last]
	// If the removed element was not the last one,
	// the heap has to be repaired.
	if i != last {
		// Moving to i may have moved the last timer to a new parent,
		// so sift up to preserve the heap guarantee.
		// The last node may now sit in a subtree it did not come from,
		// so it may have to move either up or down.
		// The same can happen in a binary heap.
		siftupTimer(pp.timers, i)
		siftdownTimer(pp.timers, i)
	}
	if i == 0 {
		updateTimer0When(pp)
	}
	atomic.Xadd(&pp.numTimers, -1)
}

runtimer

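runtime.runtimer examines the timer at the top of the processor's 4-ary heap. It also handles deleting timers and updating their times, following these rules:

  • timerNoStatus -> crash: uninitialized timer
  • timerWaiting
    • -> timerWaiting
    • -> timerRunning -> timerNoStatus
    • -> timerRunning -> timerWaiting
  • timerModifying -> wait for the state to change
  • timerModifiedXX -> timerMoving -> timerWaiting
  • timerDeleted -> timerRemoving -> timerRemoved
  • timerRunning -> crash: concurrent call to this function
  • timerRemoved, timerRemoving, timerMoving -> crash: inconsistent timer heap

Running a timer is the job of runtime.runtimer. It checks the state of the timer at the top of the P's min-heap and acts according to that state.

runtimer runs a for loop that keeps checking the state of the first element of the P's timer heap.

  • If the timer is in timerWaiting and the current time has reached the timer's trigger time, runOneTimer is called to run it;
  • If the timer is in timerDeleted, it needs to be removed, so dodeltimer0 is called to remove the first timer of the min-heap and its state is updated;
  • If the timer is in timerModifiedEarlier or timerModifiedLater, its trigger time has been modified and its position in the min-heap must be fixed, so dodeltimer0 removes it and doaddtimer adds it back to the min-heap.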
// runtimer examines the first timer in timers. If it is ready based on now,
// it runs the timer and removes or updates it.
// Returns 0 if it ran a timer, -1 if there are no more timers, or the time
// when the first timer should run.
// The caller must have locked the timers for pp.
// If a timer is run, this will temporarily unlock the timers.
//go:systemstack
func runtimer(pp *p, now int64) int64 {
	for {
		// Take the first element of the min-heap.
		t := pp.timers[0]
		if t.pp.ptr() != pp {
			throw("runtimer: bad p")
		}
		// Load the timer's status.
		switch s := atomic.Load(&t.status); s {
		case timerWaiting:
			// Not due yet: return the next fire time.
			if t.when > now {
				// Not ready to run.
				return t.when
			}
			// Mark the timer as timerRunning.
			if !atomic.Cas(&t.status, s, timerRunning) {
				continue
			}
			// Note that runOneTimer may temporarily unlock
			// pp.timersLock.
			// Run the timer.
			runOneTimer(pp, t, now)
			return 0

		case timerDeleted:
			if !atomic.Cas(&t.status, s, timerRemoving) {
				continue
			}
			// Remove the first timer from the min-heap.
			dodeltimer0(pp)
			if !atomic.Cas(&t.status, timerRemoving, timerRemoved) {
				badTimer()
			}
			atomic.Xadd(&pp.deletedTimers, -1)
			if len(pp.timers) == 0 {
				return -1
			}
		// The timer has to be repositioned.
		case timerModifiedEarlier, timerModifiedLater:
			if !atomic.Cas(&t.status, s, timerMoving) {
				continue
			}
			t.when = t.nextwhen
			// Remove the first timer from the min-heap.
			dodeltimer0(pp)
			// Add the timer back to the min-heap.
			doaddtimer(pp, t)
			if s == timerModifiedEarlier {
				atomic.Xadd(&pp.adjustTimers, -1)
			}
			if !atomic.Cas(&t.status, timerMoving, timerWaiting) {
				badTimer()
			}

		case timerModifying:
			// Wait for modification to complete.
			osyield()

		case timerNoStatus, timerRemoved:
			// Should not see a new or inactive timer on the heap.
			badTimer()
		case timerRunning, timerRemoving, timerMoving:
			// These should only be set when timers are locked,
			// and we didn't do it.
			badTimer()
		default:
			badTimer()
		}
	}
}
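
The timerWaiting branch above claims the timer with a compare-and-swap before running it. The following is a minimal user-space sketch of that idea, not the runtime's code: toyTimer, the state constants, and tryRun are hypothetical names, and sync/atomic stands in for the runtime's internal atomic package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Hypothetical stand-ins for a few of the runtime's timer states.
const (
	stateNoStatus uint32 = iota
	stateWaiting
	stateRunning
)

// toyTimer is an illustrative model, not the runtime's timer struct.
type toyTimer struct {
	status uint32
	when   int64
	fired  int
}

// tryRun mirrors the timerWaiting branch. It returns the timer's when if the
// timer is not due yet, 0 if it ran the timer, and -1 if the timer was not in
// the waiting state (the real runtime handles the other states separately).
func tryRun(t *toyTimer, now int64) int64 {
	if atomic.LoadUint32(&t.status) != stateWaiting {
		return -1
	}
	if t.when > now {
		return t.when
	}
	// Claim the timer; if someone else changed the state first, give up.
	if !atomic.CompareAndSwapUint32(&t.status, stateWaiting, stateRunning) {
		return -1
	}
	t.fired++
	atomic.StoreUint32(&t.status, stateNoStatus)
	return 0
}

func main() {
	t := &toyTimer{status: stateWaiting, when: 100}
	fmt.Println(tryRun(t, 50))  // not due yet: prints 100
	fmt.Println(tryRun(t, 150)) // due, runs once: prints 0
	fmt.Println(tryRun(t, 200)) // no longer waiting: prints -1
}
```

The compare-and-swap is what lets the status double as a lock: only the caller that wins the transition out of the waiting state gets to run the timer.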

runOneTimer

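runOneTimer uses the period field to decide whether the timer repeats. If period is greater than 0, it computes the next fire time in when and repositions the timer in the heap; a one-shot timer is removed with dodeltimer0. Finally, the timer's function is run.

If the timer at the top of the processor's quaternary heap is not yet due, runtimer returns immediately; otherwise it calls runtime.runOneTimer to run it: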

// runOneTimer runs a single timer.
// The caller must have locked the timers for pp.
// This will temporarily unlock the timers while running the timer function.
//go:systemstack
func runOneTimer(pp *p, t *timer, now int64) {
	if raceenabled {
		ppcur := getg().m.p.ptr()
		if ppcur.timerRaceCtx == 0 {
			ppcur.timerRaceCtx = racegostart(funcPC(runtimer) + sys.PCQuantum)
		}
		raceacquirectx(ppcur.timerRaceCtx, unsafe.Pointer(t))
	}
	// The function to run.
	f := t.f
	// The argument passed to that function.
	arg := t.arg
	seq := t.seq
	// period > 0 means the timer is a ticker and must fire again.
	if t.period > 0 {
		// Leave in heap but adjust next time to fire.
		delta := t.when - now
		t.when += t.period * (1 + -delta/t.period)
		if t.when < 0 { // check for overflow.
			t.when = maxWhen
		}
		siftdownTimer(pp.timers, 0)
		if !atomic.Cas(&t.status, timerRunning, timerWaiting) {
			badTimer()
		}
		updateTimer0When(pp)
	} else {
		// Remove from heap.
		dodeltimer0(pp)
		if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {
			badTimer()
		}
	}

	if raceenabled {
		// Temporarily use the current P's racectx for g0.
		gp := getg()
		if gp.racectx != 0 {
			throw("runOneTimer: unexpected racectx")
		}
		gp.racectx = gp.m.p.ptr().timerRaceCtx
	}

	unlock(&pp.timersLock)

	f(arg, seq)

	lock(&pp.timersLock)

	if raceenabled {
		gp := getg()
		gp.racectx = 0
	}
}

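Depending on the timer's period field, the function above behaves differently:

  • If period is greater than 0:
    • compute the timer's next fire time and update its position in the heap;
    • set the timer's status to timerWaiting;
    • call runtime.updateTimer0When to set the processor's timer0When field;
  • If period is less than or equal to 0:
    • call runtime.dodeltimer0 to remove the timer;
    • set the timer's status to timerNoStatus.

After updating the timer, the function releases timersLock, runs the function stored in the timer with arg and seq as arguments, and then reacquires the lock.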
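
To make the next-fire-time arithmetic for periodic timers concrete, here is a small sketch that repeats the computation from runOneTimer outside the runtime. nextWhen is a hypothetical helper introduced only for illustration; math.MaxInt64 plays the role of maxWhen.

```go
package main

import (
	"fmt"
	"math"
)

// nextWhen repeats the arithmetic runOneTimer uses for periodic timers.
// when is the scheduled fire time, period the tick interval (must be > 0),
// and now the current time, all in nanoseconds.
func nextWhen(when, period, now int64) int64 {
	delta := when - now // zero or negative once the timer is due
	when += period * (1 + -delta/period)
	if when < 0 { // the addition overflowed
		when = math.MaxInt64
	}
	return when
}

func main() {
	fmt.Println(nextWhen(100, 10, 100)) // fired on time: 110
	fmt.Println(nextWhen(100, 10, 135)) // fired 35ns late: 140
}
```

When the timer runs late, the integer division skips the periods that have already passed, so the next fire time stays on the original grid (100, 110, 120, ...) instead of drifting.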

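When timers fire

The previous subsection covered the 10 states of the timer state machine and the operations on them. This part looks at how timers are triggered. Go runs the functions stored in timers from two places:

  • the scheduler checks whether a processor's timers are ready each time it schedules;
  • the system monitor checks for timers that are due but have not run.

We will go through these two paths in turn.

Scheduler

runtime.checkTimers is the function the scheduler uses to run a processor's timers. It is called in the following situations:

  • when the scheduler calls runtime.schedule to perform scheduling;
  • when the scheduler calls runtime.findrunnable to get a runnable Goroutine;
  • when the scheduler calls runtime.findrunnable to steal timers from other processors.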

runtime.schedule

func schedule() {
    _g_ := getg()
    ...
top:
    pp := _g_.m.p.ptr()
    ...
    // Check for timers that are ready and run them.
    checkTimers(pp, 0)
    var gp *g
    ...
    if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }
    ...
    execute(gp, inheritTime)
}

runtime.findrunnable


// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.

top:
	_p_ := _g_.m.p.ptr()
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	if _p_.runSafePointFn != 0 {
		runSafePointFn()
	}

	now, pollUntil, _ := checkTimers(_p_, 0)
    ...
}

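checkTimers mainly does the following:

  1. It checks whether there is anything to do. If there are no timers to run, it returns immediately; it also returns immediately if the next timer is not yet due and the number of deleted timers is small (at most a quarter of the timers);
  2. It calls adjusttimers to adjust the timer list, mainly to restore min-heap order;
  3. It calls runtime.runtimer, described above, to look for timers in the heap that need to run;
  4. If the current Goroutine's P is the same as the P passed in and the deleted timers exceed a quarter of the timer list, it calls clearDeletedTimers to remove them.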
// checkTimers runs any timers for the P that are ready.
// If now is not 0 it is the current time.
// It returns the current time or 0 if it is not known,
// and the time when the next timer should run or 0 if there is no next timer,
// and reports whether it ran any timers.
// If the time when the next timer should run is not 0,
// it is always larger than the returned time.
// We pass now in and out to avoid extra calls of nanotime.
//go:yeswritebarrierrec
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
	// If it's not yet time for the first timer, or the first adjusted
	// timer, then there is nothing to do.
	// Load the fire time of timer0.
	next := int64(atomic.Load64(&pp.timer0When))
	nextAdj := int64(atomic.Load64(&pp.timerModifiedEarliest))
	if next == 0 || (nextAdj != 0 && nextAdj < next) {
		next = nextAdj
	}
	// There are no timers.
	if next == 0 {
		// No timers to run or adjust.
		return now, 0, false
	}

	if now == 0 {
		now = nanotime()
	}
	// The next fire time is later than the current time.
	if now < next {
		// Next timer is not ready to run, but keep going
		// if we would clear deleted timers.
		// This corresponds to the condition below where
		// we decide whether to call clearDeletedTimers.
		// Return right away unless more than a quarter of the timers are marked deleted.
		if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) {
			return now, next, false
		}
	}

	lock(&pp.timersLock)

	if len(pp.timers) > 0 {
		// If there are timers, call adjusttimers to restore heap order.
		adjusttimers(pp, now)
		// After adjusting the heap, runtimer is called in a loop:
		// * if a timer is due, it is run;
		// * otherwise the fire time of the earliest timer is recorded.
		for len(pp.timers) > 0 {
			// Note that runtimer may temporarily unlock
			// pp.timersLock.
			// Look for a timer in the heap that needs to run.
			if tw := runtimer(pp, now); tw != 0 {
				if tw > 0 {
					pollUntil = tw
				}
				break
			}
			ran = true
		}
	}

	// If this is the local P, and there are a lot of deleted timers,
	// clear them out. We only do this for the local P to reduce
	// lock contention on timersLock.
	// That is: when the current P is the one passed in and more than a
	// quarter of the timers in the heap are marked timerDeleted,
	// clearDeletedTimers removes all of them, including those deep in the heap.
	if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 {
		// clearDeletedTimers keeps deleted timers from lingering in the heap;
		// it and runtime.moveTimers are the only two functions that walk the whole timer heap.
		clearDeletedTimers(pp)
	}

	unlock(&pp.timersLock)

	return now, pollUntil, ran
}
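
The core loop of checkTimers (run everything that is due, then report when the next timer fires) can be modelled in user space. The sketch below is an illustration under simplifying assumptions: it uses container/heap (a binary heap) instead of the runtime's quaternary heap, has no locking or deleted-timer bookkeeping, and toyTimer, timerHeap, addTimer, and this checkTimers are hypothetical names.

```go
package main

import (
	"container/heap"
	"fmt"
)

// toyTimer and timerHeap are illustrative types, not the runtime's.
type toyTimer struct {
	when int64
	f    func()
}

type timerHeap []*toyTimer

func (h timerHeap) Len() int            { return len(h) }
func (h timerHeap) Less(i, j int) bool  { return h[i].when < h[j].when }
func (h timerHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *timerHeap) Push(x interface{}) { *h = append(*h, x.(*toyTimer)) }
func (h *timerHeap) Pop() interface{} {
	old := *h
	n := len(old)
	t := old[n-1]
	*h = old[:n-1]
	return t
}

// addTimer puts a one-shot timer on the heap.
func addTimer(h *timerHeap, when int64, f func()) {
	heap.Push(h, &toyTimer{when: when, f: f})
}

// checkTimers runs every timer that is due at now. It returns the fire time
// of the next pending timer (0 if there is none) and whether any timer ran.
func checkTimers(h *timerHeap, now int64) (pollUntil int64, ran bool) {
	for h.Len() > 0 {
		top := (*h)[0]
		if top.when > now {
			return top.when, ran
		}
		heap.Pop(h)
		top.f()
		ran = true
	}
	return 0, ran
}

func main() {
	h := &timerHeap{}
	for _, w := range []int64{30, 10, 20} {
		w := w
		addTimer(h, w, func() { fmt.Println("fired", w) })
	}
	fmt.Println(checkTimers(h, 25)) // fires 10 and 20, then prints "30 true"
}
```

As in the runtime, the caller can use the returned pollUntil value to decide how long it may sleep before timers need attention again.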

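System monitor

The system monitor acts as a daemon inside the Go runtime: it runs in the background, watches the state of the runtime, and reacts when something unexpected happens. It checks the runtime state at intervals to make sure nothing has gone wrong. We will not cover the system monitor in depth here and only look at the code related to timers.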

func sysmon() {
    ...
	for {
        ...
		// sysmon should not enter deep sleep if schedtrace is enabled so that
		// it can print that information at the right time.
		//
		// It should also not enter deep sleep if there are any active P's so
		// that it can retake P's from syscalls, preempt long running G's, and
		// poll the network if all P's are busy for long stretches.
		//
		// It should wakeup from deep sleep if any P's become active either due
		// to exiting a syscall or waking up due to a timer expiring so that it
		// can resume performing those duties. If it wakes from a syscall it
		// resets idle and delay as a bet that since it had retaken a P from a
		// syscall before, it may need to do it again shortly after the
		// application starts work again. It does not reset idle when waking
		// from a timer to avoid adding system load to applications that spend
		// most of their time sleeping.
		now := nanotime()
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				syscallWake := false
				// Get the time the next timer fires and the P that holds its heap.
				next, _ := timeSleepUntil()
				if next > now {
					atomic.Store(&sched.sysmonwait, 1)
					unlock(&sched.lock)
					// Make wake-up period small enough
					// for the sampling to be correct.
					// Work out how long to sleep before the next timer fires.
					sleep := forcegcperiod / 2
					if next-now < sleep {
						sleep = next - now
					}
					// Sleep for that long; timers on the heaps are dealt with after waking up.
					shouldRelax := sleep >= osRelaxMinNS
					if shouldRelax {
						osRelax(true)
					}
					syscallWake = notetsleep(&sched.sysmonnote, sleep)
					mDoFixup()
					if shouldRelax {
						osRelax(false)
					}
					lock(&sched.lock)
					atomic.Store(&sched.sysmonwait, 0)
					noteclear(&sched.sysmonnote)
				}
				if syscallWake {
					idle = 0
					delay = 20
				}
			}
			unlock(&sched.lock)
		}

		lock(&sched.sysmonlock)
		// Update now in case we blocked on sysmonnote or spent a long time
		// blocked on schedlock or sysmonlock above.
		now = nanotime()

		// trigger libc interceptors if needed
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		// Poll the network if it has not been polled for more than 10ms.
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			list := netpoll(0) // non-blocking, returns a list of Gs
			// The list of Gs is not empty.
			if !list.empty() {
				incidlelocked(-1)
				// Put the Gs on idle Ps or on the global run queue.
				injectglist(&list)
				incidlelocked(1)
			}
		}
		// A timer is overdue.
		if next, _ := timeSleepUntil(); next < now {
			// Start a new M to handle timers.
			startm(nil, false)
		}
		...
	}
}
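  1. sysmon calls timeSleepUntil(), which walks all Ps and returns the earliest (smallest) next fire time together with the P that holds it.
  2. It calls notetsleep() to sleep for a while; after it wakes up, the loop goes on to deal with timers on the heaps. There is also a notetsleepg() function; the difference is whether the caller is running on g0.
  3. If the network has not been polled for more than 10ms, it polls the network;
  4. If a timer is overdue, it starts a new M to handle timers.

In this process, runtime.timeSleepUntil walks all processors in the runtime and finds the next timer that needs to run.

Note that sysmon does not run expired timers itself: the excerpt above only contains logic for sleeping until a timer that is not yet due, while an overdue timer merely causes a new M to be started.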

// timeSleepUntil returns the time when the next timer should fire,
// and the P that holds the timer heap that that timer is on.
// This is only called by sysmon and checkdead.
func timeSleepUntil() (int64, *p) {
	next := int64(maxWhen)
	var pret *p

	// Prevent allp slice changes. This is like retake.
	lock(&allpLock)
	for _, pp := range allp {
		if pp == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}

		w := int64(atomic.Load64(&pp.timer0When))
		if w != 0 && w < next {
			next = w
			pret = pp
		}

		w = int64(atomic.Load64(&pp.timerModifiedEarliest))
		if w != 0 && w < next {
			next = w
			pret = pp
		}
	}
	unlock(&allpLock)

	return next, pret
}
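
The scan in timeSleepUntil and the sleep cap in sysmon boil down to two small computations. The sketch below isolates them; earliest and sleepFor are hypothetical helpers, and the slice of fire times stands in for the per-P timer0When values (0 meaning "no timer").

```go
package main

import "fmt"

// earliest is modelled on the scan in timeSleepUntil. Each element of whens
// is one P's next fire time, with 0 meaning "no timer". It returns the
// smallest non-zero value and its index, or (maxWhen, -1) if every entry is 0.
func earliest(whens []int64, maxWhen int64) (next int64, idx int) {
	next, idx = maxWhen, -1
	for i, w := range whens {
		if w != 0 && w < next {
			next, idx = w, i
		}
	}
	return next, idx
}

// sleepFor caps the sleep at limit, the way sysmon caps it at forcegcperiod/2.
func sleepFor(next, now, limit int64) int64 {
	sleep := limit
	if next-now < sleep {
		sleep = next - now
	}
	return sleep
}

func main() {
	next, idx := earliest([]int64{0, 500, 300, 0}, 1<<63-1)
	fmt.Println(next, idx)                 // 300 2
	fmt.Println(sleepFor(next, 100, 1000)) // 200
}
```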

Core API

NewTimer

Let's first look at how NewTimer creates a Timer:

// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
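	c := make(chan Time, 1) // channel with a buffer of one Time value
	t := &Timer{
		C: c,
		r: runtimeTimer{ // the runtime timer
			when: when(d),  // when the timer fires
			f:    sendTime, // callback that writes the time to the channel
			arg:  c,        // the channel to write to
		},
	}
	startTimer(&t.r) // hand the timer over to the runtime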
	return t
}

// when is a helper function for setting the 'when' field of a runtimeTimer.
// It returns what the time will be, in nanoseconds, Duration d in the future.
// If d is negative, it is ignored. If the returned value would be less than
// zero because of an overflow, MaxInt64 is returned.
func when(d Duration) int64 {
	if d <= 0 {
		return runtimeNano()
	}
	t := runtimeNano() + int64(d)
	if t < 0 {
		t = 1<<63 - 1 // math.MaxInt64
	}
	return t
}
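  • when is the time at which the current time will be sent on Timer.C. It is expressed in nanoseconds and is normally set to runtimeNano() + int64(d).
  • f is set to sendTime. When the timer expires, f is called with arg and seq.
  • Because a Timer is one-shot, period keeps its default value 0.
  • The i field of runtimeTimer is the timer's index in the heap.

For timers created through the time package, the function f takes one of three forms:

  • sendTime: sends the current time on a channel, or drops it if the send would block. Used by time.Timer and time.Ticker.
  • goFunc: runs a function in a new goroutine. Used by time.AfterFunc.
  • goroutineReady: wakes up a specific goroutine. Used by runtime.timeSleep.

NewTimer mainly initializes a Timer, calls startTimer, and returns the Timer. The real logic of startTimer does not live in the time package; stepping through the assembly with dlv shows: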

sleep.go:94     0xd8ea09        e872c7faff              call $time.startTimer

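This shows that time.startTimer is actually implemented by runtime.startTimer, which is linked under the name time.startTimer through go:linkname. In other words, time.Timer is only a thin wrapper around the runtime's timer. The core thing this layer adds is turning the underlying expiry callback into a channel send.

startTimer receives the runtimeTimer as a runtime timer and then calls addtimer.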

// startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
	if raceenabled {
		racerelease(unsafe.Pointer(t))
	}
	addtimer(t)
}

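When a Timer expires, it runs a function built into the standard library: sendTime. sendTime sends the current time on the timer's channel.

NewTimer creates a buffered channel of size 1. Under normal circumstances, when the timer expires, sendTime can send the current time on t.C without blocking, whether or not a goroutine is reading from it. sendTime also adds a second safeguard: a select with a default case returns immediately when the buffer is already full, so the send still does not block. This can happen when an active timer is reused.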

// When the timer expires, the runtime calls sendTime, which tries to write the time to channel c.
func sendTime(c interface{}, seq uintptr) {
	// Non-blocking send of time on c.
	// Used in NewTimer, it cannot block anyway (buffer).
	// Used in NewTicker, dropping sends on the floor is
	// the desired behavior when the reader gets behind,
	// because the sends are periodic.
	select {
	case c.(chan Time) <- Now():
	default:
	}
}
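
The select/default idiom in sendTime is ordinary Go and easy to try in isolation. In this sketch, trySend is a hypothetical helper that uses a chan int instead of chan Time so that the result is easy to check.

```go
package main

import "fmt"

// trySend shows the select/default pattern that sendTime relies on: it
// reports whether v was placed on c without blocking.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(trySend(c, 1)) // true: the buffer had room
	fmt.Println(trySend(c, 2)) // false: the buffer is full, the value is dropped
	fmt.Println(<-c)           // 1
}
```

For a Ticker this is exactly the behavior described in the comment: a slow reader sees at most one buffered tick, and the rest are dropped.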

time.After

// After waits for the duration to elapse and then sends the current time
// on the returned channel.
// It is equivalent to NewTimer(d).C.
// The underlying Timer is not recovered by the garbage collector
// until the timer fires. If efficiency is a concern, use NewTimer
// instead and call Timer.Stop if the timer is no longer needed.
func After(d Duration) <-chan Time {
	return NewTimer(d).C
}
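
The doc comment above recommends NewTimer plus Stop over After when efficiency matters. A typical use is a receive with a timeout; the sketch below shows one way to write it. recvWithTimeout is a hypothetical helper, not part of the time package.

```go
package main

import (
	"fmt"
	"time"
)

// recvWithTimeout waits for a value on ch for at most d and reports
// whether one arrived.
func recvWithTimeout(ch <-chan int, d time.Duration) (int, bool) {
	t := time.NewTimer(d)
	defer t.Stop() // release the timer early if the value arrives first
	select {
	case v := <-ch:
		return v, true
	case <-t.C:
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	ch <- 42
	fmt.Println(recvWithTimeout(ch, time.Second))         // 42 true
	fmt.Println(recvWithTimeout(ch, 10*time.Millisecond)) // 0 false
}
```

With time.After in the select instead, the timer created for the first call would stay in the runtime's heap for the full second even though the value arrived immediately.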

time.AfterFunc

// AfterFunc waits for the duration to elapse and then calls f
// in its own goroutine. It returns a Timer that can
// be used to cancel the call using its Stop method.
func AfterFunc(d Duration, f func()) *Timer {
	t := &Timer{
		r: runtimeTimer{
			when: when(d),
			f:    goFunc,
			arg:  f,
		},
	}
	startTimer(&t.r)
	return t
}

func goFunc(arg interface{}, seq uintptr) {
	go arg.(func())()
}

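Note: as the AfterFunc source shows, the f passed in by the caller is not assigned directly to the timer's f field; it is passed as arg to the wrapper function goFunc, and goFunc starts a new goroutine to run the caller's f. In versions before Go 1.14 the handler of an expired timer ran in timerproc, the runtime's timer maintenance goroutine; in the per-P design it is called from runOneTimer on the system stack. In both cases the handler must not block, so a new goroutine is started.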
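
From the caller's side, the behavior of AfterFunc and Stop can be observed with a short program. fireOnce and cancelBeforeFire are hypothetical helpers written for this illustration.

```go
package main

import (
	"fmt"
	"time"
)

// fireOnce schedules a callback after d, blocks until the callback has run,
// and returns what Stop reports afterwards.
func fireOnce(d time.Duration) bool {
	done := make(chan struct{})
	t := time.AfterFunc(d, func() { close(done) })
	<-done
	return t.Stop() // false: the timer has already fired
}

// cancelBeforeFire schedules a callback far in the future and cancels it.
func cancelBeforeFire() bool {
	t := time.AfterFunc(time.Hour, func() { panic("should never run") })
	return t.Stop() // true: the call was cancelled before it ran
}

func main() {
	fmt.Println(fireOnce(5 * time.Millisecond)) // false
	fmt.Println(cancelBeforeFire())             // true
}
```

Because the callback runs in its own goroutine, Stop returning false says only that the callback was started, not that it has finished; fireOnce uses the done channel to coordinate explicitly.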

time.Tick

func Tick(d Duration) <-chan Time {
    if d <= 0 {
        return nil
    }
    return NewTicker(d).C
}
// NewTicker returns a new Ticker containing a channel that will send
// the time on the channel after each tick. The period of the ticks is
// specified by the duration argument. The ticker will adjust the time
// interval or drop ticks to make up for slow receivers.
// The duration d must be greater than zero; if not, NewTicker will
// panic. Stop the ticker to release associated resources.
func NewTicker(d Duration) *Ticker {
	if d <= 0 {
		panic(errors.New("non-positive interval for NewTicker"))
	}
	// Give the channel a 1-element time buffer.
	// If the client falls behind while reading, we drop ticks
	// on the floor until the client catches up.
	c := make(chan Time, 1)
	t := &Ticker{
		C: c,
		r: runtimeTimer{
			when:   when(d),
			period: int64(d),
			f:      sendTime,
			arg:    c,
		},
	}
	startTimer(&t.r)
	return t
}

As you can see, the r fields of Ticker and Timer differ only in period: a Ticker sends a value on its channel every period, while a Timer fires once and is done.
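
A short usage sketch makes the difference visible: a Ticker keeps delivering values until it is stopped. countTicks is a hypothetical helper; the durations are arbitrary.

```go
package main

import (
	"fmt"
	"time"
)

// countTicks receives n ticks from a Ticker with interval d and then stops it.
func countTicks(d time.Duration, n int) int {
	ticker := time.NewTicker(d)
	defer ticker.Stop() // release the underlying runtime timer
	count := 0
	for range ticker.C {
		count++
		if count == n {
			break
		}
	}
	return count
}

func main() {
	fmt.Println(countTicks(5*time.Millisecond, 3)) // 3
}
```

The deferred Stop matters for the same reason as in the runtime code above: a periodic timer is put back on the heap every time it fires, so it stays there until it is stopped.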

Timer.Reset

Timer.Reset() maps to resetTimer in the runtime, which ends up in the same modtimer() function that implements timer modification.

modtimer is generally called by the network poller, time.Ticker.Reset, or time.Timer.Reset.

// Reset changes the timer to expire after duration d.
// It returns true if the timer had been active, false if the timer had
// expired or been stopped.
//
// For a Timer created with NewTimer, Reset should be invoked only on
// stopped or expired timers with drained channels.
//
// If a program has already received a value from t.C, the timer is known
// to have expired and the channel drained, so t.Reset can be used directly.
// If a program has not yet received a value from t.C, however,
// the timer must be stopped and—if Stop reports that the timer expired
// before being stopped—the channel explicitly drained:
//
// 	if !t.Stop() {
// 		<-t.C
// 	}
// 	t.Reset(d)
//
// This should not be done concurrent to other receives from the Timer's
// channel.
//
// Note that it is not possible to use Reset's return value correctly, as there
// is a race condition between draining the channel and the new timer expiring.
// Reset should always be invoked on stopped or expired channels, as described above.
// The return value exists to preserve compatibility with existing programs.
//
// For a Timer created with AfterFunc(d, f), Reset either reschedules
// when f will run, in which case Reset returns true, or schedules f
// to run again, in which case it returns false.
// When Reset returns false, Reset neither waits for the prior f to
// complete before returning nor does it guarantee that the subsequent
// goroutine running f does not run concurrently with the prior
// one. If the caller needs to know whether the prior execution of
// f is completed, it must coordinate with f explicitly.
func (t *Timer) Reset(d Duration) bool {
	if t.r.f == nil {
		panic("time: Reset called on uninitialized Timer")
	}
	w := when(d)
	return resetTimer(&t.r, w)
}
// resetTimer resets an inactive timer, adding it to the heap.
//go:linkname resetTimer time.resetTimer
// Reports whether the timer was modified before it was run.
func resetTimer(t *timer, when int64) bool {
	if raceenabled {
		racerelease(unsafe.Pointer(t))
	}
	return resettimer(t, when)
}
// resettimer resets the time when a timer should fire.
// If used for an inactive timer, the timer will become active.
// This should be called instead of addtimer if the timer value has been,
// or may have been, used previously.
// Reports whether the timer was modified before it was run.
func resettimer(t *timer, when int64) bool {
	return modtimer(t, when, t.period, t.f, t.arg, t.seq)
}
// modtimer modifies an existing timer.
// This is called by the netpoll code or time.Ticker.Reset or time.Timer.Reset.
// Reports whether the timer was modified before it was run.
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
	if when <= 0 {
		throw("timer when must be positive")
	}
	if period < 0 {
		throw("timer period must be non-negative")
	}

	status := uint32(timerNoStatus)
	wasRemoved := false
	var pending bool
	var mp *m
loop:
	for {
		// Spin in a for loop until the timer's status can be changed.
		switch status = atomic.Load(&t.status); status {
		case timerWaiting, timerModifiedEarlier, timerModifiedLater:
			// Prevent preemption while the timer is in timerModifying.
			// This could lead to a self-deadlock. See #38070.
			mp = acquirem()
			if atomic.Cas(&t.status, status, timerModifying) {
				pending = true // timer not yet run
				break loop
			}
			releasem(mp)
		case timerNoStatus, timerRemoved:
			// Prevent preemption while the timer is in timerModifying.
			// This could lead to a self-deadlock. See #38070.
			mp = acquirem()

			// Timer was already run and t is no longer in a heap.
			// Act like addtimer.
			if atomic.Cas(&t.status, status, timerModifying) {
				wasRemoved = true
				pending = false // timer already run or stopped
				break loop
			}
			releasem(mp)
		case timerDeleted:
			// Prevent preemption while the timer is in timerModifying.
			// This could lead to a self-deadlock. See #38070.
			mp = acquirem()
			if atomic.Cas(&t.status, status, timerModifying) {
				atomic.Xadd(&t.pp.ptr().deletedTimers, -1)
				pending = false // timer already stopped
				break loop
			}
			releasem(mp)
		case timerRunning, timerRemoving, timerMoving:
			// The timer is being run or moved, by a different P.
			// Wait for it to complete.
			osyield()
		case timerModifying:
			// Multiple simultaneous calls to modtimer.
			// Wait for the other call to complete.
			osyield()
		default:
			badTimer()
		}
	}

	t.period = period
	t.f = f
	t.arg = arg
	t.seq = seq
	// If the timer was removed from its P, add it to the current P: timerModifying -> timerWaiting.
	if wasRemoved {
		t.when = when
		pp := getg().m.p.ptr()
		lock(&pp.timersLock)
		doaddtimer(pp, t)
		unlock(&pp.timersLock)
		if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
			badTimer()
		}
		releasem(mp)
		wakeNetPoller(when)
	} else {
		// The timer is in some other P's heap, so we can't change
		// the when field. If we did, the other P's heap would
		// be out of order. So we put the new when value in the
		// nextwhen field, and let the other P set the when field
		// when it is prepared to resort the heap.
		t.nextwhen = when
		// If the new time is earlier than the old one, the status becomes timerModifiedEarlier.
		newStatus := uint32(timerModifiedLater)
		if when < t.when {
			newStatus = timerModifiedEarlier
		}

		tpp := t.pp.ptr()

		// Update the adjustTimers field.  Subtract one if we
		// are removing a timerModifiedEarlier, add one if we
		// are adding a timerModifiedEarlier.
		adjust := int32(0)
		if status == timerModifiedEarlier {
			adjust--
		}
		if newStatus == timerModifiedEarlier {
			adjust++
			updateTimerModifiedEarliest(tpp, when)
		}
		if adjust != 0 {
			atomic.Xadd(&tpp.adjustTimers, adjust)
		}

		// Set the new status of the timer.
		if !atomic.Cas(&t.status, timerModifying, newStatus) {
			badTimer()
		}
		releasem(mp)

		// If the new status is earlier, wake up the poller.
		// With timerModifiedEarlier, wakeNetPoller checks whether when falls before the time the
		// current poll is expected to run until (pollerPollUntil) and, if so, wakes the thread
		// sleeping in the network poller.
		if newStatus == timerModifiedEarlier {
			wakeNetPoller(when)
		}
	}

	return pending
}

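If the timer has already been removed from its P's heap, it is added to the heap of the current P.

If the new time is earlier, the status becomes timerModifiedEarlier and the thread sleeping in the netpoller is woken up.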
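
On the caller's side, the doc comment of Reset asks for the timer to be stopped and its channel drained first. The sketch below wraps that sequence in a hypothetical helper, safeReset. As an assumption of this sketch, it drains with a non-blocking receive so that it does not hang when the value has already been consumed; like the pattern in the doc comment, it must not run concurrently with other receives from t.C.

```go
package main

import (
	"fmt"
	"time"
)

// safeReset stops t, discards a value that may still be sitting in t.C,
// and then re-arms the timer for d.
func safeReset(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		// Non-blocking drain: the value may already have been received.
		select {
		case <-t.C:
		default:
		}
	}
	t.Reset(d)
}

// expiredTimer returns a timer that has fired but whose value has not
// been received yet.
func expiredTimer() *time.Timer {
	t := time.NewTimer(time.Millisecond)
	time.Sleep(20 * time.Millisecond)
	return t
}

// pending reports, without blocking, whether a value is waiting in t.C.
// It consumes the value if there is one.
func pending(t *time.Timer) bool {
	select {
	case <-t.C:
		return true
	default:
		return false
	}
}

func main() {
	t := expiredTimer()
	safeReset(t, 50*time.Millisecond)
	fmt.Println(pending(t)) // false: the stale value was discarded
	<-t.C                   // blocks until the re-armed timer fires
	fmt.Println("timer fired again")
}
```

Without the drain, the stale value from the first expiry could be received right after Reset and be mistaken for the new deadline.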

Timer.Stop & Ticker.Stop

// Stop turns off a ticker. After Stop, no more ticks will be sent.
// Stop does not close the channel, to prevent a concurrent goroutine
// reading from the channel from seeing an erroneous "tick".
func (t *Ticker) Stop() {
	stopTimer(&t.r)
}
// Stop prevents the Timer from firing.
// It returns true if the call stops the timer, false if the timer has already
// expired or been stopped.
// Stop does not close the channel, to prevent a read from the channel succeeding
// incorrectly.
//
// To ensure the channel is empty after a call to Stop, check the
// return value and drain the channel.
// For example, assuming the program has not received from t.C already:
//
// 	if !t.Stop() {
// 		<-t.C
// 	}
//
// This cannot be done concurrent to other receives from the Timer's
// channel or other calls to the Timer's Stop method.
//
// For a timer created with AfterFunc(d, f), if t.Stop returns false, then the timer
// has already expired and the function f has been started in its own goroutine;
// Stop does not wait for f to complete before returning.
// If the caller needs to know whether f is completed, it must coordinate
// with f explicitly.
func (t *Timer) Stop() bool {
	if t.r.f == nil {
		panic("time: Stop called on uninitialized Timer")
	}
	return stopTimer(&t.r)
}

Both Timer and Ticker call stopTimer.

// stopTimer stops a timer.
// It reports whether t was stopped before being run.
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
	return deltimer(t)
}

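deltimer was covered above.

Summary

Timers play an important role in concurrent programming in Go. They provide reasonably accurate relative time, and on top of them the standard library offers interfaces such as timers and sleep that help Go programs handle expiry and timeouts.

The standard library timers work correctly and efficiently in most cases, but they may fall short in extreme or performance-sensitive scenarios. At a granularity of 10ms, the author did not find a usable timer implementation in the community, and some open-source libraries based on the timing-wheel algorithm do not handle this well either.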
