信号与软中断
信号机制是 UNIX、类 UNIX 甚至其他 POSIX 兼容系统上规定的一种进程异步通信的限制形式。 用于提醒某个事件的发生状态。 信号被定义为整数,产生信号的条件包括用户使用某些按键组合(比如 Control + C)、 硬件异常、kill 信号等等。
这些信号通常有三种不同的处理方式:忽略、捕获或者执行系统的默认行为。 忽略与捕获处理无法处理 SIGKILL 和 SIGSTOP,默认处理通常为停止进程。 而对于捕获处理而言,当信号发生时,操作系统将中断用户代码,并保存其执行的上下文,切换到内核空间 并重新切换到用户空间来执行预先设置好的信号处理回调。当回调执行完毕之后,会重新切换回内核 空间,并从中断的位置进行恢复,如图所示。

系统调用 sigaltstack 可以用于定义一个备用的信号栈来获取一个存在的额外信号栈的状态。 一个额外的信号栈会在信号处理执行中进行使用。
每个进程都包含一个信号屏蔽字(signal mask),规定了当前要阻塞递送到该进程的信号集。 对于每种可能的信号,屏蔽字中都有一位与之对应。 对于某种信号,若其对应位置已设置,则它当前是被阻塞的。 如果要检测和修改当前信号屏蔽字,则需要调用 sigprocmask 系统调用来进行。通过 _SIG_SETMASK 可以直接设置想要的屏蔽字,并获得原先的屏蔽字。
每个线程都有自己独立的signal mask,但所有线程共享进程的signal action。这意味着,你可以在线程中调用pthread_sigmask(不是sigmask)来决定本线程阻塞哪些信号。但你不能调用sigaction来指定单个线程的信号处理方式。如果在某个线程中调用了sigaction处理某个信号,那么这个进程中的未阻塞这个信号的线程在收到这个信号都会按同一种方式处理这个信号。所以每个线程不能按自己的方式处理信号。
在Linux中的posix线程模型中,线程拥有独立的进程号,可以通过getpid()得到线程的进程号,而线程号保存在pthread_t的值中。而主线程的进程号就是整个进程的进程号,因此向主进程发送信号只会将信号发送到主线程中去。如果主线程设置了信号屏蔽,则信号会投递到一个可以处理的线程中去。
-
如果是异常产生的信号(比如程序错误,像SIGPIPE、SIGEGV这些),则只有产生异常的线程收到并处理。
-
如果是用pthread_kill产生的内部信号,则只有pthread_kill参数中指定的目标线程收到并处理。
-
如果是外部使用kill命令产生的信号,通常是SIGINT、SIGHUP等job control信号,则会遍历所有线程,直到找到一个不阻塞该信号的线程,然后调用它来处理。(一般从主线程找起),注意只有一个线程能收到。
Go 程序对信号的默认行为
Go 语言实现了自己的运行时,因此,对信号的默认处理方式和普通的 C 程序不太一样。
- SIGBUS(总线错误), SIGFPE(算术错误)和 SIGSEGV(段错误)称为同步信号,它们在程序执行错误时触发,而不是通过 os.Process.Kill 之类的触发。通常,Go 程序会将这类信号转为 run-time panic。
- SIGHUP(挂起), SIGINT(中断)或 SIGTERM(终止)默认会使得程序退出。
- SIGQUIT, SIGILL, SIGTRAP, SIGABRT, SIGSTKFLT, SIGEMT 或 SIGSYS 默认会使得程序退出,同时生成 stack dump。
- SIGTSTP, SIGTTIN 或 SIGTTOU,这是 shell 使用的,作业控制的信号,执行系统默认的行为。
- SIGPROF(性能分析定时器,记录 CPU 时间,包括用户态和内核态), Go 运行时使用该信号实现 runtime.CPUProfile。
- 其他信号,Go 捕获了,但没有做任何处理。
信号可以被忽略或通过掩码阻塞(屏蔽字 mask)。忽略信号通过 signal.Ignore,没有导出 API 可以直接修改阻塞掩码,虽然 Go 内部有实现 sigprocmask 等。Go 中的信号被 runtime 控制,在使用时和 C 是不太一样的。
改变信号的默认行为
这就是 os/signal 包的功能。
Notify 改变信号处理,可以改变信号的默认行为;Ignore 可以忽略信号;Reset 重置信号为默认行为;Stop 则停止接收信号,但并没有重置为默认行为。
SIGPIPE
文档中对这个信号单独进行了说明。如果 Go 程序往一个 broken pipe 写数据,内核会产生一个 SIGPIPE 信号。
如果 Go 程序没有为 SIGPIPE 信号调用 Notify,对于标准输出或标准错误(文件描述符 1 或 2),该信号会使得程序退出;但其他文件描述符对该信号是啥也不做,当然 write 会返回错误 EPIPE。
如果 Go 程序为 SIGPIPE 调用了 Notify,不论什么文件描述符,SIGPIPE 信号都会传递给 Notify channel,当然 write 依然会返回 EPIPE。
也就是说,默认情况下,Go 的命令行程序跟传统的 Unix 命令行程序行为一致;但当往一个关闭的网络连接写数据时,传统 Unix 程序会 crash,但 Go 程序不会。
cgo 注意事项
如果非 Go 代码使用信号相关功能,需要仔细阅读掌握 os/signal 包中相关文档:Go programs that use cgo or SWIG 和 Non-Go programs that call Go code
处理函数的初始化
调度循环中讨论过了 M 的生命周期,M 可以在两种情况下被创建:
-
程序运行之初的 M0,无需创建已经存在的系统线程,只需对其进行初始化即可。其函数调用链如下所示:
1
2
3
4
5
6
|
schedinit
↳ mcommoninit
↳ mpreinit
↳ msigsave
↳ initSigmask
↳ mstart
|
``
-
需要时创建的 M,某些特殊情况下一定会创建一个新的 M 并进行初始化,而后创建系统线程。这些情况包括:
- startm 时没有空闲 m
- startTemplateThread 时
- startTheWorldWithSema 时 p 如果没有 m
- main 时创建系统监控
- oneNewExtraM 时
其调用链为:
1
2
3
4
5
6
7
|
newm
↳ allocm
↳ mcommoninit
↳ mpreinit
↳ newm1
↳ newosproc
↳ mstart
|
``
在 mcommoninit 里,会在一个父线程(或引导时的主线程)上调用 mpreinit,并最终会为一个 M 创建 gsignal,是一个在 M 上用于处理信号的 Goroutine。因此,除了 g0 外,其实第一个创建的 g 应该是它, 但是它并没有设置 Goid (Goroutine ID):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
func mcommoninit(mp *m) {
...
// 初始化 gsignal,用于处理 m 上的信号。
mpreinit(mp)
// gsignal 的运行栈边界处理
if mp.gsignal != nil {
mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
}
...
}
// 从一个父线程上进行调用(引导时为主线程),可以分配内存
func mpreinit(mp *m) {
mp.gsignal = malg(32 * 1024) // OS X 需要 >= 8K,此处创建处理 singnal 的 g
mp.gsignal.m = mp // 指定 gsignal 拥有的 m
}
|
获取原始信号屏蔽字
在调度器的初始化的阶段,initSigmask 目标旨在记录主线程 M0 创建之初的屏蔽字 sigmask:
1
2
3
4
5
6
7
8
9
|
func schedinit() {
_g_ := getg()
...
mcommoninit(_g_.m)
...
msigsave(_g_.m)
initSigmask = _g_.m.sigmask
...
}
|
其中 msigsave 通过 sigprocmask 这个系统调用将当前 m0 的屏蔽字保存到 mp.sigmask 上:
1
2
3
4
5
6
7
8
|
const _SIG_SETMASK = 3
// msigsave 将当前线程的信号屏蔽字保存到 mp.sigmask。
//go:nosplit
//go:nowritebarrierrec
func msigsave(mp *m) {
sigprocmask(_SIG_SETMASK, nil, &mp.sigmask)
}
|
sigprocmask 的本质为系统调用,其返回值通过 old 交付给调用者:
1
2
3
4
5
6
7
8
|
type sigset uint32
//go:nosplit
//go:nowritebarrierrec
func sigprocmask(how int32, new, old *sigset) {
rtsigprocmask(how, new, old, int32(unsafe.Sizeof(*new)))
}
//go:noescape
func rtsigprocmask(how int32, new, old *sigset, size int32)
|
rtsigprocmask 在 Linux 上由汇编直接包装 rt_sigprocmask 调用:
1
2
3
4
5
6
7
8
9
10
11
|
TEXT runtime·rtsigprocmask(SB),NOSPLIT,$0-28
MOVL how+0(FP), DI
MOVQ new+8(FP), SI
MOVQ old+16(FP), DX
MOVL size+24(FP), R10
MOVL $SYS_rt_sigprocmask, AX
SYSCALL
CMPQ AX, $0xfffffffffffff001
JLS 2(PC)
MOVL $0xf1, 0xf1 // crash
RET
|
注意,rt_sigprocmask 只适用于单个线程的调用,多线程上的调用时未定义行为, 不过初始化阶段的此时还未创建其他线程,因此此调用时安全的。
在 Darwin 系统中,所有的信号处理函数均通过 pthread_sigmask 来完成:
1
2
3
4
5
6
|
//go:nosplit
//go:cgo_unsafe_args
func sigprocmask(how uint32, new *sigset, old*sigset) {
libcCall(unsafe.Pointer(funcPC(sigprocmask_trampoline)), unsafe.Pointer(&how))
}
func sigprocmask_trampoline()
|
1
2
3
4
5
6
7
8
9
10
11
12
|
TEXT runtime·sigprocmask_trampoline(SB),NOSPLIT,$0
PUSHQ BP
MOVQ SP, BP
MOVQ 8(DI), SI // arg 2 new
MOVQ 16(DI), DX // arg 3 old
MOVL 0(DI), DI // arg 1 how
CALL libc_pthread_sigmask(SB)
TESTL AX, AX
JEQ 2(PC)
MOVL $0xf1, 0xf1 // crash
POPQ BP
RET
|
msigsave 执行完毕后,sigmask 最后保存到 initSigmask 这一全局变量中, 用于初始化新创建的 M 的信号屏蔽字:
1
2
3
4
5
6
7
8
|
// 用于新创建的 M 的信号掩码 signal mask 的值。
var initSigmask sigset
func schedinit() {
...
initSigmask = _g_.m.sigmask
...
}
|
用于当新创建 M 时(newm),将 M 的 sigmask 进行设置。
初始化信号栈
在进入 mstart 后,调用链关系就变成了:
1
2
3
4
5
6
7
8
|
mstart
↳ mstart1
↳ minit
↳ mstartm0 (仅当 m0 调用)
↳ schedule
↳ mexit
↳ sigblock
↳ unminit
|
mstart1 会调用 minit 进行初始化:
1
2
3
4
5
6
7
8
|
func minit() {
minitSignals()
...
}
func minitSignals() {
minitSignalStack()
minitSignalMask()
}
|
M 在初始化过程中,会判定当前线程是否设置了备用信号栈, 正常情况下一个新创建的 M 是没有备用信号栈的。 如果没有,则会将 m.gsignal 的执行栈设置为备用信号栈,用于处理产生的信号。
另一种情况是,当使用 cgo 时,非 Go 线程可能调用 Go 代码, 而这时用户态的 C 代码可能已经为非 Go 线程设置了信号栈,这时的替换必须小心。 因此如果 M 已经存在了备用信号栈,则会将现有的信号栈保存到 m.goSigStack 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
type stackt struct { // 信号栈
ss_sp *byte
ss_flags int32
pad_cgo_0 [4]byte
ss_size uintptr
}
// 如果没有为线程设置备用信号栈(正常情况),则将备用信号栈设置为 gsignal 栈。
// 如果为线程设置了备用信号栈(非 Go 线程设置备用信号栈然后调用 Go 函数的情况),
// 则将 gsignal 栈设置为备用信号栈。
// 如果没有使用 cgo 我们还设置了额外的 gsignal 信号栈(无论其是否已经被设置)
// 记录在 newSigstack 中做出的选择,
// 以便可以在 unminit 中撤消。
func minitSignalStack() {
_g_ := getg()
// 获取原有的信号栈
var st stackt
sigaltstack(nil, &st)
if st.ss_flags&_SS_DISABLE != 0 {
// 如果禁用了当前的信号栈
// 则将 gsignal 的执行栈设置为备用信号栈
signalstack(&_g_.m.gsignal.stack)
_g_.m.newSigstack = true
} else {
// 否则将 m 的 gsignal 栈设置为从 sigaltstack 返回的备用信号栈
setGsignalStack(&st, &_g_.m.goSigStack)
_g_.m.newSigstack = false
}
}
// 将 s 设置为备用信号栈,此方法仅在信号栈被禁用时调用
//go:nosplit
func signalstack(s *stack) {
st := stackt{ss_size: s.hi - s.lo}
setSignalstackSP(&st, s.lo)
sigaltstack(&st, nil)
}
//go:nosplit
func setSignalstackSP(s*stackt, sp uintptr) {
*(*uintptr)(unsafe.Pointer(&s.ss_sp)) = sp
}
// setGsignalStack 将当前 m 的 gsignal 栈设置为从 sigaltstack 系统调用返回的备用信号堆栈。
// 它将旧值保存在 *old 中以供 restoreGsignalStack 使用。
// 如果非 Go 代码设置了,则在处理信号时使用备用栈。
//go:nosplit
//go:nowritebarrierrec
func setGsignalStack(st *stackt, old *gsignalStack) {
g := getg()
if old != nil {
old.stack = g.m.gsignal.stack
old.stackguard0 = g.m.gsignal.stackguard0
old.stackguard1 = g.m.gsignal.stackguard1
old.stktopsp = g.m.gsignal.stktopsp
}
stsp := uintptr(unsafe.Pointer(st.ss_sp))
g.m.gsignal.stack.lo = stsp
g.m.gsignal.stack.hi = stsp + st.ss_size
g.m.gsignal.stackguard0 = stsp +_StackGuard
g.m.gsignal.stackguard1 = stsp + _StackGuard
}
|
初始化信号屏蔽字
当设置好信号栈后,会开始对 M 设置信号的屏蔽字,通过 sigmask 来获得当前 M 的屏蔽字,而后通过遍历所有运行时信号表来对屏蔽字进行初始化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
func minitSignalMask() {
nmask := getg().m.sigmask
// 遍历整个信号表
for i := range sigtable {
// 判断某个信号是否为不可阻止的信号,
// 如果是不可阻止的信号,则删除对应的屏蔽字所在位
if !blockableSig(uint32(i)) {
sigdelset(&nmask, i)
}
}
// 重新设置屏蔽字
sigprocmask(_SIG_SETMASK, &nmask, nil)
}
// 判断某个信号是否为不可阻止的信号
// 1. 当信号是非阻塞信号,则不可阻止
// 2. 当改程序为模块时,则可阻止
// 3. 当信号为 Kill 或 Throw 时,可阻止,否则不可阻止
func blockableSig(sig uint32) bool {
flags := sigtable[sig].flags
if flags&_SigUnblock != 0 {
return false
}
if isarchive || islibrary {
return true
}
return flags&(_SigKill|_SigThrow) == 0
}
func sigdelset(mask *sigset, i int) {
*mask &^= 1 << (uint32(i) - 1)
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
type sigTabT struct {
flags int32
name string
}
var sigtable = [...]sigTabT{
/*0*/ {0, "SIGNONE: no trap"},
/*1*/ {_SigNotify +_SigKill, "SIGHUP: terminal line hangup"},
...
/*63*/ {_SigNotify, "signal 63"},
/* 64 */ {_SigNotify, "signal 64"},
}
const (
_SigNotify = 1 << iota // let signal.Notify have signal, even if from kernel
_SigKill // if signal.Notify doesn't take it, exit quietly
_SigThrow // if signal.Notify doesn't take it, exit loudly
_SigPanic // if the signal is from the kernel, panic
_SigDefault // if the signal isn't explicitly requested, don't monitor it
_SigGoExit // cause all runtime procs to exit (only used on Plan 9).
_SigSetStack // add SA_ONSTACK to libc handler
_SigUnblock // always unblock; see blockableSig
_SigIgn // _SIG_DFL action is to ignore the signal
)
|
信号处理
万事俱备,只欠东风。信号处理相关的初始化已经完成,包括了信号的屏蔽字、信号栈等。 正式进入调度循环之前,在 M0 上将调用 mstartm0,进而调用 initsig 初始化信号,针对每个信号进行单独处理:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
//go:yeswritebarrierrec
func mstartm0() {
...
initsig(false)
}
const (
_NSIG = 32
_SI_USER = 0 /* empirically true, but not what headers say */
_SIG_BLOCK = 1
_SIG_UNBLOCK = 2
_SIG_SETMASK = 3
_SS_DISABLE = 4
)
// Initialize signals.
// Called by libpreinit so runtime may not be initialized.
//go:nosplit
//go:nowritebarrierrec
// 循环注册信号处理程序
func initsig(preinit bool) {
if !preinit {
// It's now OK for signal handlers to run.
signalsOK = true
}
// For c-archive/c-shared this is called by libpreinit with
// preinit == true.
if (isarchive || islibrary) && !preinit {
return
}
for i := uint32(0); i < _NSIG; i++ {
t := &sigtable[i]
if t.flags == 0 || t.flags&_SigDefault != 0 {
continue
}
// We don't need to use atomic operations here because
// there shouldn't be any other goroutines running yet.
// 此时不需要原子操作,因为此时没有其他运行的 Goroutine
fwdSig[i] = getsig(i)
// 检查该信号是否需要设置 signal handler
if !sigInstallGoHandler(i) {
// Even if we are not installing a signal handler,
// set SA_ONSTACK if necessary.
// 即使不设置 signal handler,在必要时设置 SA_ONSTACK
if fwdSig[i] != _SIG_DFL && fwdSig[i] != _SIG_IGN {
setsigstack(i)
} else if fwdSig[i] == _SIG_IGN {
sigInitIgnored(i)
}
continue
}
handlingSig[i] = 1
// 注册信号对应的回调方法
setsig(i, funcPC(sighandler))
}
}
|
对于一个需要设置 sighandler 的信号,会通过 setsig 来设置信号对应的动作(action):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
//go:nosplit
//go:nowritebarrierrec
// sigtramp注册为处理程序
func setsig(i uint32, fn uintptr) {
var sa usigactiont
sa.sa_flags =_SA_SIGINFO |_SA_ONSTACK |_SA_RESTART
sa.sa_mask = ^uint32(0)
if fn == funcPC(sighandler) {
if iscgo {
fn = funcPC(cgoSigtramp)
} else {
fn = funcPC(sigtramp)
}
}
*(*uintptr)(unsafe.Pointer(&sa.__sigaction_u)) = fn
sigaction(i, &sa, nil)
}
//go:nosplit
//go:cgo_unsafe_args
func sigaction(sig uint32, new *usigactiont, old *usigactiont) {
libcCall(unsafe.Pointer(funcPC(sigaction_trampoline)), unsafe.Pointer(&sig))
}
|
1
2
3
4
5
6
7
8
9
10
11
|
// sigaction->sysSigaction->rt_sigaction
// 调用rt_sigaction系统调用,注册处理程序
TEXT runtime·rt_sigaction(SB),NOSPLIT,$0-36
MOVQ sig+0(FP), DI
MOVQ new+8(FP), SI
MOVQ old+16(FP), DX
MOVQ size+24(FP), R10
MOVL $SYS_rt_sigaction, AX
SYSCALL
MOVL AX, ret+32(FP)
RET
|
以上逻辑主要作用就是循环注册 _NSIG(32) 个信号处理程序,其实都是 sigtramp 函数。操作系统内核在收到信号后会调用此函数。
值得注意的是这里有一个特殊处理,当 fn 为 sighandler 时候, 产生信号后的动作并非直接调用 sighandler,而是被替换为了 sigtramp:
1
2
3
4
5
6
7
8
9
|
TEXT runtime·sigtramp(SB),NOSPLIT,$72
...
MOVQ DX, ctx-56(SP)
MOVQ SI, info-64(SP)
MOVQ DI, signum-72(SP)
MOVQ $runtime·sigtrampgo(SB), AX
CALL AX
...
RET
|
进而调用 sigtrampgo。这样的处理方式是因为, sighandler 会将产生的信号交给对应的 g ,此时还无法决定究竟谁来进行处理。 因此,当信号发生时,而 sigtrampgo 会被调用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
// sigtrampgo is called from the signal handler function, sigtramp,
// written in assembly code.
// This is called by the signal handler, and the world may be stopped.
//
// It must be nosplit because getg() is still the G that was running
// (if any) when the signal was delivered, but it's (usually) called
// on the gsignal stack. Until this switches the G to gsignal, the
// stack bounds check won't work.
//
//go:nosplit
//go:nowritebarrierrec
func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) {
//sigfwdgo 用于约定该信号是否应该由 Go 进行处理, 如果不由 Go 进行处理(例如 cgo)则将其转发到 Go 代码之前设置的 handler 上。
if sigfwdgo(sig, info, ctx) {
return
}
c := &sigctxt{info, ctx}
g := sigFetchG(c)
setg(g)
if g == nil {
if sig == _SIGPROF {
sigprofNonGoPC(c.sigpc())
return
}
if sig == sigPreempt && preemptMSupported && debug.asyncpreemptoff == 0 {
// This is probably a signal from preemptM sent
// while executing Go code but received while
// executing non-Go code.
// We got past sigfwdgo, so we know that there is
// no non-Go signal handler for sigPreempt.
// The default behavior for sigPreempt is to ignore
// the signal, so badsignal will be a no-op anyway.
if GOOS == "darwin" {
atomic.Xadd(&pendingPreemptSignals, -1)
}
return
}
c.fixsigcode(sig)
badsignal(uintptr(sig), c)
return
}
setg(g.m.gsignal)
// If some non-Go code called sigaltstack, adjust.
var gsignalStack gsignalStack
setStack := adjustSignalStack(sig, g.m, &gsignalStack)
if setStack {
g.m.gsignal.stktopsp = getcallersp()
}
if g.stackguard0 == stackFork {
signalDuringFork(sig)
}
c.fixsigcode(sig)
sighandler(sig, info, ctx, g)
setg(g)
if setStack {
restoreGsignalStack(&gsignalStack)
}
}
|
sigtramp是入口,sighandler根据不同信号调用处理程序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
//go:nowritebarrierrec
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp*g) {
_g_ := getg()
c := &sigctxt{info, ctxt}
// profile 时钟超时
if sig == _SIGPROF {
sigprof(c.sigpc(), c.sigsp(), c.siglr(), gp, _g_.m)
return
}
if sig == _SIGTRAP && testSigtrap != nil && testSigtrap(info, (*sigctxt)(noescape(unsafe.Pointer(c))), gp) {
return
}
// 用户信号
if sig == _SIGUSR1 && testSigusr1 != nil && testSigusr1(gp) {
return
}
if sig == sigPreempt {
// 可能是一个抢占信号
doSigPreempt(gp, c)
// 即便这是一个抢占信号,它也可能与其他信号进行混合,因此我们
// 继续进行处理。
}
flags := int32(_SigThrow)
if sig < uint32(len(sigtable)) {
flags = sigtable[sig].flags
}
if flags&_SigPanic != 0 && gp.throwsplit {
// 我们无法安全的 sigpanic 因为它可能造成栈的增长,因此忽略它
flags = (flags &^ _SigPanic) | _SigThrow
}
...
if c.sigcode() != _SI_USER && flags&_SigPanic != 0 {
// 产生 panic 的信号
...
c.preparePanic(sig, gp)
return
}
// 对用户注册的信号进行转发
if c.sigcode() == _SI_USER || flags&_SigNotify != 0 {
if sigsend(sig) {
return
}
}
// 设置为可忽略的用户信号
if c.sigcode() == _SI_USER && signal_ignored(sig) {
return
}
// 处理 KILL 信号
if flags&_SigKill != 0 {
dieFromSignal(sig)
}
// 非 THROW,返回
if flags&_SigThrow == 0 {
return
}
// 处理一些直接 panic 的情况
...
}
|
注意,在信号处理中,当信号为 sigPreempt 时,将触发运行时的异步抢占机制.
函数 sigsend 会将用户信号发送到信号队列 sig 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
var sig struct {
note note
mask [(_NSIG + 31) / 32]uint32
wanted [(_NSIG + 31) / 32]uint32
ignored [(_NSIG + 31) / 32]uint32
recv [(_NSIG + 31) / 32]uint32
state uint32
delivering uint32
inuse bool
}
func sigsend(s uint32) bool {
bit := uint32(1) << uint(s&31)
if !sig.inuse || s >= uint32(32*len(sig.wanted)) {
return false
}
atomic.Xadd(&sig.delivering, 1)
// We are running in the signal handler; defer is not available.
if w := atomic.Load(&sig.wanted[s/32]); w&bit == 0 {
atomic.Xadd(&sig.delivering, -1)
return false
}
// Add signal to outgoing queue.
for {
mask := sig.mask[s/32]
if mask&bit != 0 {
atomic.Xadd(&sig.delivering, -1)
return true // signal already in queue
}
if atomic.Cas(&sig.mask[s/32], mask, mask|bit) {
break
}
}
// Notify receiver that queue has new bit.
Send:
for {
switch atomic.Load(&sig.state) {
default:
throw("sigsend: inconsistent state")
case sigIdle:
if atomic.Cas(&sig.state, sigIdle, sigSending) {
break Send
}
case sigSending:
// notification already pending
break Send
case sigReceiving:
if atomic.Cas(&sig.state, sigReceiving, sigIdle) {
notewakeup(&sig.note)
break Send
}
}
}
atomic.Xadd(&sig.delivering, -1)
return true
}
|
用户信号的接收方是通过 os/signal 完成的,我们随后讨论。
辅 M 线程
辅 M 是一个用于服务非 Go 线程(cgo 产生的线程)回调的 M。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
//go:yeswritebarrierrec
func mstartm0() {
// 创建一个额外的 M 服务 non-Go 线程(cgo 调用中产生的线程)的回调,并且只创建一个
// windows 上也需要额外 M 来服务 syscall.NewCallback 产生的回调,见 issue #6751
if (iscgo || GOOS == "windows") && !cgoHasExtraM {
cgoHasExtraM = true
newextram()
}
initsig(false)
}
// newextram 分配一个 m 并将其放入 extra 列表中
// 它会被工作中的本地 m 调用,因此它能够做一些调用 schedlock 和 allocate 类似的事情。
func newextram() {
c := atomic.Xchg(&extraMWaiters, 0)
if c > 0 {
for i := uint32(0); i < c; i++ {
oneNewExtraM()
}
} else {
// 确保至少有一个额外的 M
mp := lockextra(true)
unlockextra(mp)
if mp == nil {
oneNewExtraM()
}
}
}
// onNewExtraM 分配一个 m 并将其放入 extra list 中
func oneNewExtraM() {
mp := allocm(nil, nil)
gp := malg(4096)
gp.sched.pc = funcPC(goexit) + sys.PCQuantum
gp.sched.sp = gp.stack.hi
gp.sched.sp -= 4 * sys.RegSize
gp.sched.lr = 0
gp.sched.g = guintptr(unsafe.Pointer(gp))
gp.syscallpc = gp.sched.pc
gp.syscallsp = gp.sched.sp
gp.stktopsp = gp.sched.sp
gp.gcscanvalid = true
gp.gcscandone = true
casgstatus(gp, _Gidle, _Gdead)
gp.m = mp
mp.curg = gp
mp.lockedInt++
mp.lockedg.set(gp)
gp.lockedm.set(mp)
gp.goid = int64(atomic.Xadd64(&sched.goidgen, 1))
...
// 给垃圾回收器使用
allgadd(gp)
atomic.Xadd(&sched.ngsys, +1)
// 将 m 添加到 extra m 链表中
mnext := lockextra(true)
mp.schedlink.set(mnext)
extraMCount++
unlockextra(mp)
}
|
对 os/signal 包的支持
原理
在初始化阶段,signal产生一个goroutine,该 goroutine 循环运行并充当处理信号的使用者。此循环将一直休眠直到得到通知。这是第一步:

然后,当信号到达程序时,信号处理程序将其委托给称为gsignal的特殊 goroutine。此 goroutine 是使用固定的且无法增长的较大堆栈(32k,以满足不同操作系统的要求)创建的。每个线程(用 M 表示)都有一个内部gsignal goroutine 来处理信号。这是更新的图:

gsignal 分析信号以检查其是否可处理,并唤醒睡眠的goroutine并将信号发送到队列:

SIGBUS或SIGFPE之类的同步信号无法管理,将转换为 panic
然后,此循环 goroutine 可以对其进行处理。它首先查找已预订此事件的 channel,并将信号推送给他们:

锁定或阻塞gsignal会使信号处理陷入困境。由于其固定大小,它也无法分配内存。这就是为什么在信号处理链中具有两个独立的 goroutine 的重要性:一个在信号到达时立即将它们排队,另一个在同一队列中循环处理它们。
现在,我们可以使用新组件来更新第一部分的插图:

源码
我们已经看到了用户注册的信号会通过 sigsend 进行发送,这就是我们使用 os/signal 包的核心。
在使用 os/signal 后,会调用 signal.init 函数,懒惰的注册一个用户端的信号处理循环(当调用 Notify 时启动):
1
2
3
4
5
6
7
8
9
10
11
12
13
|
var (
watchSignalLoopOnce sync.Once
watchSignalLoop func()
)
func init() {
signal_enable(0) // 首次调用,进行初始化
watchSignalLoop = loop
}
func loop() {
for {
process(syscall.Signal(signal_recv()))
}
}
|
这个 signal_enable 和 signal_recv 用于激活运行时的信号队列,并从中接受信号:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
// 启用运行时信号队列
//go:linkname signal_enable os/signal.signal_enable
func signal_enable(s uint32) {
if !sig.inuse {
// The first call to signal_enable is for us
// to use for initialization. It does not pass
// signal information in m.
sig.inuse = true // enable reception of signals; cannot disable
noteclear(&sig.note)
return
}
if s >= uint32(len(sig.wanted)*32) {
return
}
w := sig.wanted[s/32]
w |= 1 << (s & 31)
atomic.Store(&sig.wanted[s/32], w)
i := sig.ignored[s/32]
i &^= 1 << (s & 31)
atomic.Store(&sig.ignored[s/32], i)
sigenable(s)
}
// 从信号队列中接受信号
//go:linkname signal_recv os/signal.signal_recv
func signal_recv() uint32 {
for {
// Serve any signals from local copy.
for i := uint32(0); i <_NSIG; i++ {
if sig.recv[i/32]&(1<<(i&31)) != 0 {
sig.recv[i/32] &^= 1 << (i & 31)
return i
}
}
// Wait for updates to be available from signal sender.
Receive:
for {
switch atomic.Load(&sig.state) {
default:
throw("signal_recv: inconsistent state")
case sigIdle:
if atomic.Cas(&sig.state, sigIdle, sigReceiving) {
notetsleepg(&sig.note, -1)
noteclear(&sig.note)
break Receive
}
case sigSending:
if atomic.Cas(&sig.state, sigSending, sigIdle) {
break Receive
}
}
}
// Incorporate updates from sender into local copy.
for i := range sig.mask {
sig.recv[i] = atomic.Xchg(&sig.mask[i], 0)
}
}
}
|
当接受到信号后,信号 sig 会被发送到用户在 Ignore/Notify/Stop 上所注册的 channel 上:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
func process(sig os.Signal) {
n := signum(sig)
if n < 0 {
return
}
handlers.Lock()
defer handlers.Unlock()
for c, h := range handlers.m {
if h.want(n) {
// 发送
// send but do not block for it
select {
case c <- sig:
default:
}
}
}
// Stop 的处理
// Avoid the race mentioned in Stop.
for _, d := range handlers.stopping {
if d.h.want(n) {
select {
case d.c <- sig:
default:
}
}
}
}
|
例如 signal.Notify,将信号 channel 注册到 handler 全局变量中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
var handlers struct {
sync.Mutex
m map[chan<- os.Signal]*handler
ref [numSig]int64
stopping []stopping
}
func Notify(c chan<- os.Signal, sig ...os.Signal) {
if c == nil {
panic("os/signal: Notify using nil channel")
}
watchSignalLoopOnce.Do(func() {
if watchSignalLoop != nil {
go watchSignalLoop()
}
})
handlers.Lock()
defer handlers.Unlock()
h := handlers.m[c]
if h == nil {
if handlers.m == nil {
handlers.m = make(map[chan<- os.Signal]*handler)
}
h = new(handler)
handlers.m[c] = h // 保存到 handler 中
}
add := func(n int) {
if n < 0 {
return
}
if !h.want(n) {
h.set(n)
if handlers.ref[n] == 0 {
enableSignal(n)
}
handlers.ref[n]++
}
}
if len(sig) == 0 {
for n := 0; n < numSig; n++ {
add(n)
}
} else {
for _, s := range sig {
add(signum(s))
}
}
}
|
signal API
Ignore 函数
1
|
func Ignore(sig ...os.Signal)
|
忽略一个、多个或全部(不提供任何信号)信号。如果程序接收到了被忽略的信号,则什么也不做。对一个信号,如果先调用 Notify,再调用 Ignore,Notify 的效果会被取消;如果先调用 Ignore,在调用 Notify,接着调用 Reset/Stop 的话,会回到 Ingore 的效果。注意,如果 Notify 作用于多个 chan,则 Stop 需要对每个 chan 都调用才能起到该作用。
Notify 函数
1
|
func Notify(c chan<- os.Signal, sig ...os.Signal)
|
类似于绑定信号处理程序。将输入信号转发到 chan c。如果没有列出要传递的信号,会将所有输入信号传递到 c;否则只传递列出的输入信号。
channel c 缓存如何决定?因为 signal 包不会为了向 c 发送信息而阻塞(就是说如果发送时 c 阻塞了,signal 包会直接放弃):调用者应该保证 c 有足够的缓存空间可以跟上期望的信号频率。对使用单一信号用于通知的 channel,缓存为 1 就足够了。
相关源码:
1
2
3
4
5
6
7
8
9
10
|
// src/os/signal/signal.go process 函数
for c, h := range handlers.m {
if h.want(n) {
// send but do not block for it
select {
case c <- sig:
default: // 保证不会阻塞,直接丢弃
}
}
}
|
可以使用同一 channel 多次调用 Notify:每一次都会扩展该 channel 接收的信号集。唯一从信号集去除信号的方法是调用 Stop。可以使用同一信号和不同 channel 多次调用 Notify:每一个 channel 都会独立接收到该信号的一个拷贝。
Stop 函数
1
|
func Stop(c chan<- os.Signal)
|
让 signal 包停止向 c 转发信号。它会取消之前使用 c 调用的所有 Notify 的效果。当 Stop 返回后,会保证 c 不再接收到任何信号。
Reset 函数
1
|
func Reset(sig ...os.Signal)
|
取消之前使用 Notify 对信号产生的效果;如果没有参数,则所有信号处理都被重置。
使用示例
注:syscall 包中定义了所有的信号常量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
)
var firstSigusr1 = true
func main() {
// 忽略 Control-C (SIGINT)
// os.Interrupt 和 syscall.SIGINT 是同义词
signal.Ignore(os.Interrupt)
c1 := make(chan os.Signal, 2)
// Notify SIGHUP
signal.Notify(c1, syscall.SIGHUP)
// Notify SIGUSR1
signal.Notify(c1, syscall.SIGUSR1)
go func() {
for {
switch <-c1 {
case syscall.SIGHUP:
fmt.Println("sighup, reset sighup")
signal.Reset(syscall.SIGHUP)
case syscall.SIGUSR1:
if firstSigusr1 {
fmt.Println("first usr1, notify interrupt which had ignore!")
c2 := make(chan os.Signal, 1)
// Notify Interrupt
signal.Notify(c2, os.Interrupt)
go handlerInterrupt(c2)
}
}
}
}()
select {}
}
func handlerInterrupt(c <-chan os.Signal) {
for {
switch <-c {
case os.Interrupt:
fmt.Println("signal interrupt")
}
}
}
|
编译后运行,先后给该进程发送如下信号:SIGINT、SIGUSR1、SIGINT、SIGHUP、SIGHUP,看输出是不是和你预期的一样。
小结
由于调度器在 Go 程序运行时的特殊地位,以及在进行跨语言调用时需要 cgo 的支持, 运行时信号处理相对而言还是较为复杂的,需要一套完整的机制来对各种情况进行处理, 甚至对用户态代码的 os/signal 进行支持。
转载: 6.6 信号处理机制