Cron表达式

基本cron格式:

1
2
3
4
5
6
7
8
# cron格式說明
# ┌──分鐘(0 - 59)
# │ ┌──小時(0 - 23)
# | │ ┌──日(1 - 31)
# | | | ┌─月(1 - 12)
# | | | | ┌─星期(0 - 7,星期日=0或7)
# | | | | |
# * * * * * 被執行的命令

注:

  1. 在“星期域”(第五个域),0和7都被视为星期日。
  2. 不很直观的用法:如果日期和星期同时被设置,那么其中的一个条件被满足时,指令便会被运行。
  3. 前5个域称之分时日月周,可方便个人记忆。

从第六个域起,指明要运行的命令。

特殊符号说明:

  • 星号(*) 表示 cron 表达式能匹配该字段的所有值。如在第5个字段使用星号(month),表示每个月

  • 斜线(/) 表示增长间隔,如第1个字段(minutes) 值是 3-59/15,表示每小时的第3分钟开始执行一次,之后每隔 15 分钟执行一次(即 3、18、33、48 这些时间点执行),这里也可以表示为:3/15

  • 逗号(,) 用于枚举值,如第6个字段值是 MON,WED,FRI,表示 星期一、三、五 执行

  • 连字号(-) 表示一个范围,如第3个字段的值为 9-17 表示 9am 到 5pm 直接每个小时(包括9和17)

  • 问号(?) 只用于 日(Day of month) 和 星期(Day of week),表示不指定值,可以用于代替 *

总结:

预定义模式:

举例:

每分钟执行一次命令:

1
* * * * * yourCommand

每小时的第2和第10分钟执行:

1
2,10 * * * * yourCommand

每天上午9点到12点的第2和第10分钟执行:

1
2,10 9-12 * * * yourCommand

每隔两天的上午9点到12点的第2和第10分钟执行:

1
2,10 9-12 */2 * * yourCommand

每周一上午9点到12点的第2和第10分钟执行:

1
2,10 9-12 * * 1 yourCommand

代码使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
    "time"
    "log"

    "github.com/robfig/cron"

    "github.com/EDDYCJY/go-gin-example/models"
)

func main() {
    log.Println("Starting...")

    c := cron.New()
    c.AddFunc("* * * * * *", func() {
        log.Println("Run models.CleanAllTag...")
        models.CleanAllTag()
    })
    c.AddFunc("* * * * * *", func() {
        log.Println("Run models.CleanAllArticle...")
        models.CleanAllArticle()
    })

    c.Start()

    t1 := time.NewTimer(time.Second * 10)
    for {
        select {
        case <-t1.C:
            t1.Reset(time.Second * 10)
        }
    }
}

在这段程序中,我们做了如下的事情

  1. cron.New() 会根据本地时间创建一个新(空白)的 Cron job runner

  2. c.AddFunc()

    AddFunc 会向 Cron job runner 添加一个 func ,以按给定的时间表运行.首先解析时间表,如果填写有问题会直接 err,无误则将 func 添加到 Schedule 队列中等待执行

  3. c.Start()

    在当前执行的程序中启动 Cron 调度程序。其实这里的主体是 goroutine + for + select + timer 的调度控制

设计思路

基本类型

Cron

cron:包含一系列要执行的实体;支持暂停【stop】;添加实体等

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
// Cron保持任意数量的条目的轨道,调用相关的func时间表指定。它可以被启动,停止和条目,可运行的同时进行检查。
type Cron struct {
	entries   []*Entry
	chain     Chain
	stop      chan struct{}// 控制 Cron 实例暂停
	add       chan *Entry // 当 Cron 已经运行了,增加新的 Entity 是通过 add 这个 channel 实现的
	remove    chan EntryID
	snapshot  chan chan []Entry // 获取当前所有 entity 的快照
	running   bool// 当已经运行时为true;否则为false
	logger    Logger //日志
	runningMu sync.Mutex
	location  *time.Location// 所在地区(新增属性)
	parser    Parser
	nextID    EntryID
	jobWaiter sync.WaitGroup
}

注意:

  1. Cron 结构没有导出任何成员。
  2. 有一个成员 stop,类型是 struct{},即空结构体。

Entry

Entry:调度实体

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
type Entry struct {
    // The schedule on which this job should be run.
    // 负责调度当前 Entity 中的 Job 执行
    Schedule Schedule

    // The next time the job will run. This is the zero time if Cron has not been
    // started or this entry's schedule is unsatisfiable
    // Job 下一次执行的时间
    Next time.Time

    // The last time this job was run. This is the zero time if the job has never
    // been run.
    // 上一次执行时间
    Prev time.Time

    // The Job to run.
    // 要执行的 Job
    Job Job
}

// Entry consists of a schedule and the func to execute on that schedule.
type Entry struct {
	// ID is the cron-assigned ID of this entry, which may be used to look up a
	// snapshot or remove it.
	ID EntryID
    //负责调度当前 Entity 中的 Job 执行
	// Schedule on which this job should be run.
	Schedule Schedule
    // Job 下一次执行的时间
	// Next time the job will run, or the zero time if Cron has not been
	// started or this entry's schedule is unsatisfiable
	Next time.Time
    // 上一次执行时间
	// Prev is the last time this job was run, or the zero time if never.
	Prev time.Time

	// WrappedJob is the thing to run when the Schedule is activated.
	WrappedJob Job
    // 要执行的 Job
	// Job is the thing that was submitted to cron.
	// It is kept around so that user code that needs to get at the job later,
	// e.g. via Entries() can do so.
	Job Job
}

Job

Job:每一个实体包含一个需要运行的Job

这是一个接口,只有一个方法:run

1
2
3
type Job interface {
    Run()
}

由于 Entity 中需要 Job 类型,因此,我们希望定期运行的任务,就需要实现 Job 接口。同时,由于 Job接口只有一个无参数无返回值的方法,为了使用方便,作者提供了一个类型:

1
2
3
4
// A wrapper that turns a func() into a cron.Job
type FuncJob func()

func (f FuncJob) Run() { f() }

它通过简单的实现 Run() 方法来实现 Job 接口,这样,任何无参数无返回值的函数,通过强制类型转换为 FuncJob,就可以当作 Job 来使用了,AddFunc 方法 就是这么做的。所以需要修改带参数功能的job时从此处下手

Schedule

Schedule:每个实体包含一个调度器(Schedule)

负责调度 Job 的执行。它也是一个接口。

1
2
3
4
5
6
7
// The Schedule describes a job's duty cycle.
type Schedule interface {
	// Return the next activation time, later than the given time.
    // Next is invoked initially, and then each time the job is run.
    // 返回同一 Entity 中的 Job 下一次执行的时间
	Next(time.Time) time.Time
}

Schedule 的具体实现通过解析 Cron 表达式得到。

库中提供了 Schedule 的两个具体实现,分别是 SpecSchedule 和 ConstantDelaySchedule。

SpecSchedule

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// SpecSchedule specifies a duty cycle (to the second granularity), based on a
// traditional crontab specification. It is computed initially and stored as bit sets.
type SpecSchedule struct {
    // 表达式中锁表明的,秒,分,时,日,月,周,每个都是uint64
    // Dom:Day of Month,Dow:Day of week
    Second, Minute, Hour, Dom, Month, Dow uint64
}

// bounds provides a range of acceptable values (plus a map of name to value).
// 定义了表达式的结构体
type bounds struct {
    min, max uint
    names    map[string]uint
}


// The bounds for each field.
// 这样就能看出各个表达式的范围
var (
       seconds = bounds{0, 59, nil}
       minutes = bounds{0, 59, nil}
       hours   = bounds{0, 23, nil}
       dom     = bounds{1, 31, nil}
       months  = bounds{1, 12, map[string]uint{
              "jan": 1,
              "feb": 2,
              "mar": 3,
              "apr": 4,
              "may": 5,
              "jun": 6,
              "jul": 7,
              "aug": 8,
              "sep": 9,
              "oct": 10,
              "nov": 11,
              "dec": 12,
       }}
       dow = bounds{0, 6, map[string]uint{
              "sun": 0,
              "mon": 1,
              "tue": 2,
              "wed": 3,
              "thu": 4,
              "fri": 5,
              "sat": 6,
       }}
)

const (
       // Set the top bit if a star was included in the expression.
       starBit = 1 << 63
)

从开始介绍的 Cron 表达式可以容易得知各个字段的意思,同时,对各种表达式的解析也会最终得到一个 SpecSchedule 的实例。库中的 Parse 返回的其实就是 SpecSchedule 的实例(当然也就实现了 Schedule 接口)。

看了上面的东西肯定有人疑惑为什么秒分时这些都是定义了unit64,以及定义了一个常量starBit = 1 « 63这种写法,这是逻辑运算符。表示二进制1向左移动63位。原因如下:

cron表达式是用来表示一系列时间的,而时间是无法逃脱自己的区间的:

  • 分,秒 0 - 59
  • 时 0 - 23
  • 天/月 0 - 31
  • 天/周 0 - 6
  • 月0 - 11

这些本质上都是一个点集合,或者说是一个整数区间。 那么对于任意的整数区间 ,可以描述cron的如下部分规则。

  • * | ? 任意 , 对应区间上的所有点。 ( 额外注意 日/周 , 日 / 月 的相互干扰。)
  • 纯数字 , 对应一个具体的点。
  • / 分割的两个数字 a , b, 区间上符合 a + n * b 的所有点 ( n >= 0 )。
  • - 分割的两个数字, 对应这两个数字决定的区间内的所有点。
  • L | W 需要对于特定的时间特殊判断, 无法通用的对应到区间上的点。

至此, robfig/cron为什么不支持 L | W的原因已经明了了。去除这两条规则后, 其余的规则其实完全可以使用点的穷举来通用表示。 考虑到最大的区间也不过是60个点,那么使用一个uint64的整数的每一位来表示一个点便很合适了。所以定义unit64不为过

下面是go中cron表达式的方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/*
   ------------------------------------------------------------
   第64位标记任意 , 用于 日/周 , 日 / 月 的相互干扰。
   63 - 0 为 表示区间 [63 , 0] 的 每一个点。
   ------------------------------------------------------------

   假设区间是 0 - 63 , 则有如下的例子 :

   比如  0/3 的表示如下 : (表示每隔两位为1)
   * / ?
   +---+--------------------------------------------------------+
   | 0 | 1 0 0 1 0 0 1  ~~  ~~                    1 0 0 1 0 0 1 |
   +---+--------------------------------------------------------+
        63 ~ ~                                           ~~ 0

   比如  2-5 的表示如下 : (表示从右往左2-5位上都是1)
   * / ?
   +---+--------------------------------------------------------+
   | 0 | 0 0 0 0 ~  ~      ~~            ~    0 0 0 1 1 1 1 0 0 |
   +---+--------------------------------------------------------+
        63 ~ ~                                           ~~ 0

  比如  * 的表示如下 : (表示所有位置上都为1)
   * / ?
   +---+--------------------------------------------------------+
   | 1 | 1 1 1 1 1 ~  ~                  ~    1 1 1 1 1 1 1 1 1 |
   +---+--------------------------------------------------------+
        63 ~ ~                                           ~~ 0
*/

ConstantDelaySchedule

1
2
3
4
5
// ConstantDelaySchedule represents a simple recurring duty cycle, e.g. "Every 5 minutes".
// It does not support jobs more frequent than once a second.
type ConstantDelaySchedule struct {
	Delay time.Duration
}

这是一个简单的循环调度器,如:每 5 分钟。注意,最小单位是秒,不能比秒还小,比如 毫秒。

通过 Every 函数可以获取该类型的实例,如:

1
constDelaySchedule := Every(5e9)

得到的是一个每 5 秒执行一次的调度器。

Cron

实例化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func New(opts ...Option) *Cron {
	c := &Cron{
		entries:   nil,
		chain:     NewChain(),
		add:       make(chan *Entry),
		stop:      make(chan struct{}),
		snapshot:  make(chan chan []Entry),
		remove:    make(chan EntryID),
		running:   false,
		runningMu: sync.Mutex{},
		logger:    DefaultLogger,
		location:  time.Local,
		parser:    standardParser,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

// Option represents a modification to the default behavior of a Cron.
type Option func(*Cron)

可见实例化时,成员使用的基本是默认值;

成员方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 将 job 加入 Cron 中
// 如上所述,该方法只是简单的通过 FuncJob 类型强制转换 cmd,然后调用 AddJob 方法
func (c *Cron) AddFunc(spec string, cmd func()) error

// 将 job 加入 Cron 中
// 通过 Parse 函数解析 cron 表达式 spec 的到调度器实例(Schedule),之后调用 c.Schedule 方法
func (c *Cron) AddJob(spec string, cmd Job) error

// 获取当前 Cron 总所有 Entities 的快照
func (c *Cron) Entries() []*Entry

// 通过两个参数实例化一个 Entity,然后加入当前 Cron 中
// 注意:如果当前 Cron 未运行,则直接将该 entity 加入 Cron 中;
// 否则,通过 add 这个成员 channel 将 entity 加入正在运行的 Cron 中
func (c *Cron) Schedule(schedule Schedule, cmd Job)

// 新启动一个 goroutine 运行当前 Cron
func (c *Cron) Start()

// 通过给 stop 成员发送一个 struct{}{} 来停止当前 Cron,同时将 running 置为 false
// 从这里知道,stop 只是通知 Cron 停止,因此往 channel 发一个值即可,而不关心值是多少
// 所以,成员 stop 定义为空 struct
func (c *Cron) Stop()

AddFunc

从AddFunc函数说起带参数任务的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// AddFunc adds a func to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
	return c.AddJob(spec, FuncJob(cmd))
}

// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
	schedule, err := c.parser.Parse(spec)
	if err != nil {
		return 0, err
	}
	return c.Schedule(schedule, cmd), nil
}

AddFunc 含有两个参数,第一个是 cron表达式,这个不解释,第二个是func()类型参数cmd 即无参数无返回类型函数,下一步中直接将此参数强制转换为FuncJob类型,并调用AddJob函数

FuncJob类型:

1
2
3
4
// FuncJob is a wrapper that turns a func() into a cron.Job
type FuncJob func()

func (f FuncJob) Run() { f() }

由上述代码可知FuncJob为自定义类型,真实类型为 func(),此类型实现了一个Run()方法

AddJob

首先 AddJob 函数的传入参数为一个string类型的cron表达式和一个Job类型的cmd参数,但在AddFunc函数中,我们传入的第二个参数为FuncJob类型,所以Job类型应该是一个接口,在解析了cron表达式无错误以后,调用Schedule方法将cmd添加进了调度器

Job 类型:

1
2
3
4
// Job is an interface for submitted cron jobs.
type Job interface {
    Run()
}

由此可知,Job是带有一个Run方法的接口类型,经过代码分析可以指定,cron定时调度时间到达时,将调用此方法,也就是意味着,任何实现了Run方法的实例,都可以作为AddJob函数的cmd参数,而Run方法所实现的内容就是你定时调度所需执行的任务(AddFunc函数只能添加无参数无返回的任务,太鸡肋了),接下来我们就来实现一个带参数的任务添加

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//定义一个类型 包含一个int类型参数和函数体
type funcIntJob struct {
    num   int
    function func(int)
}

//实现这个类型的Run()方法 使得可以传入Job接口
func (this *funcIntJob) Run() {
    if nil != this.function {
        this.function(this.num)
    }
}

//非必须  返回一个urlServeJob指针
func newfuncIntJob(num int, function funcInt) *urlServeJob {
    instance := &funcIntJob{
        num:   num,
        function: function,
    }
    return instance
}

//示例任务
func shownum(num int){
  fmt.Println(num)
}

func main(){
  var c = cron.New()
  job := newfuncIntJob(3, shownum)
  spec := "*/5 * * * * ?"
    c.AddJob(spec, job)
  c.Start()
  defer c.Stop()
  select{}
}

Parse

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
// 将字符串解析成为SpecSchedule 。 SpecSchedule符合Schedule接口
// Parse returns a new crontab schedule representing the given spec.
// It returns a descriptive error if the spec is not valid.
// It accepts crontab specs and features configured by NewParser.
func (p Parser) Parse(spec string) (Schedule, error) {

	if len(spec) == 0 {
		return nil, fmt.Errorf("empty spec string")
	}

	// Extract timezone if present
	var loc = time.Local
	if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") {
		var err error
		i := strings.Index(spec, " ")
		eq := strings.Index(spec, "=")
		if loc, err = time.LoadLocation(spec[eq+1 : i]); err != nil {
			return nil, fmt.Errorf("provided bad location %s: %v", spec[eq+1:i], err)
		}
		spec = strings.TrimSpace(spec[i:])
	}

    // Handle named schedules (descriptors), if configured
    // 直接处理特殊的特殊的字符串
	if strings.HasPrefix(spec, "@") {
		if p.options&Descriptor == 0 {
			return nil, fmt.Errorf("parser does not accept descriptors: %v", spec)
		}
		return parseDescriptor(spec, loc)
	}

    // Split on whitespace.
    // cron利用空白拆解出独立的items。
	fields := strings.Fields(spec)

	// Validate & fill in any omitted or optional fields
	var err error
	fields, err = normalizeFields(fields, p.options)
	if err != nil {
		return nil, err
	}
    // 抽象出filed函数,方便下面调用
	field := func(field string, r bounds) uint64 {
		if err != nil {
			return 0
		}
		var bits uint64
		bits, err = getField(field, r)
		return bits
	}

	var (
		second     = field(fields[0], seconds)
		minute     = field(fields[1], minutes)
		hour       = field(fields[2], hours)
		dayofmonth = field(fields[3], dom)
		month      = field(fields[4], months)
		dayofweek  = field(fields[5], dow)
	)
	if err != nil {
		return nil, err
	}
    // 返回所需要的SpecSchedule
	return &SpecSchedule{
		Second:   second,
		Minute:   minute,
		Hour:     hour,
		Dom:      dayofmonth,
		Month:    month,
		Dow:      dayofweek,
		Location: loc,
	}, nil
}

// 解析items 。
// getField returns an Int with the bits set representing all of the times that
// the field represents or error parsing field value.  A "field" is a comma-separated
// list of "ranges".
func getField(field string, r bounds) (uint64, error) {
	var bits uint64
     // items利用 "," 拆解出 item 。
    ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' })
	for _, expr := range ranges {
        // 利用穷举法一一检测
		bit, err := getRange(expr, r)
		if err != nil {
			return bits, err
		}
		bits |= bit
	}
	return bits, nil
}

// 利用穷举法一一检测
// getRange returns the bits indicated by the given expression:
//   number | number "-" number [ "/" number ]
// or error parsing range.
func getRange(expr string, r bounds) (uint64, error) {
	var (
		start, end, step uint
		rangeAndStep     = strings.Split(expr, "/")
		lowAndHigh       = strings.Split(rangeAndStep[0], "-")
		singleDigit      = len(lowAndHigh) == 1
		err              error
	)

    var extra uint64
    //是否仅有一个字符是 * 或者 ?。
	if lowAndHigh[0] == "*" || lowAndHigh[0] == "?" {
		start = r.min
		end = r.max
		extra = starBit
	} else {
        //是否可以"-"分解为俩数字
		start, err = parseIntOrName(lowAndHigh[0], r.names)
		if err != nil {
			return 0, err
		}
		switch len(lowAndHigh) {
		case 1:
			end = start
		case 2:
			end, err = parseIntOrName(lowAndHigh[1], r.names)
			if err != nil {
				return 0, err
			}
		default:
			return 0, fmt.Errorf("too many hyphens: %s", expr)
		}
	}
    //是否可以"/"分解为俩数字
	switch len(rangeAndStep) {
	case 1:
		step = 1
	case 2:
		step, err = mustParseInt(rangeAndStep[1])
		if err != nil {
			return 0, err
		}

		// Special handling: "N/step" means "N-max/step".
		if singleDigit {
			end = r.max
		}
		if step > 1 {
			extra = 0
		}
	default:
		return 0, fmt.Errorf("too many slashes: %s", expr)
	}
    //转化为点 。
	if start < r.min {
		return 0, fmt.Errorf("beginning of range (%d) below minimum (%d): %s", start, r.min, expr)
	}
	if end > r.max {
		return 0, fmt.Errorf("end of range (%d) above maximum (%d): %s", end, r.max, expr)
	}
	if start > end {
		return 0, fmt.Errorf("beginning of range (%d) beyond end of range (%d): %s", start, end, expr)
	}
	if step == 0 {
		return 0, fmt.Errorf("step of range should be a positive number: %s", expr)
	}

	return getBits(start, end, step) | extra, nil
}

// 辅助函数 。 解析预定义的名字或者数字
// parseIntOrName returns the (possibly-named) integer contained in expr.
func parseIntOrName(expr string, names map[string]uint) (uint, error) {
	if names != nil {
		if namedInt, ok := names[strings.ToLower(expr)]; ok {
			return namedInt, nil
		}
	}
	return mustParseInt(expr)
}

// 辅助函数 。 解析预定义的名字或者数字
// mustParseInt parses the given expression as an int or returns an error.
func mustParseInt(expr string) (uint, error) {
	num, err := strconv.Atoi(expr)
	if err != nil {
		return 0, fmt.Errorf("failed to parse int from %s: %s", expr, err)
	}
	if num < 0 {
		return 0, fmt.Errorf("negative number (%d) not allowed: %s", num, expr)
	}

	return uint(num), nil
}

// 辅助函数 具体的将每个点设置好
// getBits sets all bits in the range [min, max], modulo the given step size.
func getBits(min, max, step uint) uint64 {
	var bits uint64

	// If step is 1, use shifts.
	if step == 1 {
		return ^(math.MaxUint64 << (max + 1)) & (math.MaxUint64 << min)
	}

	// Else, use a simple loop.
	for i := min; i <= max; i += step {
		bits |= 1 << i
	}
	return bits
}
// 辅助函数 。 设置区间的点 + 任意标志
// all returns all bits within the given bounds.  (plus the star bit)
func all(r bounds) uint64 {
	return getBits(r.min, r.max, 1) | starBit
}
// 解析预定义的名字
// parseDescriptor returns a predefined schedule for the expression, or error if none matches.
func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) {
	switch descriptor {
	case "@yearly", "@annually":
		return &SpecSchedule{
			Second:   1 << seconds.min,
			Minute:   1 << minutes.min,
			Hour:     1 << hours.min,
			Dom:      1 << dom.min,
			Month:    1 << months.min,
			Dow:      all(dow),
			Location: loc,
		}, nil

	case "@monthly":
		return &SpecSchedule{
			Second:   1 << seconds.min,
			Minute:   1 << minutes.min,
			Hour:     1 << hours.min,
			Dom:      1 << dom.min,
			Month:    all(months),
			Dow:      all(dow),
			Location: loc,
		}, nil

	case "@weekly":
		return &SpecSchedule{
			Second:   1 << seconds.min,
			Minute:   1 << minutes.min,
			Hour:     1 << hours.min,
			Dom:      all(dom),
			Month:    all(months),
			Dow:      1 << dow.min,
			Location: loc,
		}, nil

	case "@daily", "@midnight":
		return &SpecSchedule{
			Second:   1 << seconds.min,
			Minute:   1 << minutes.min,
			Hour:     1 << hours.min,
			Dom:      all(dom),
			Month:    all(months),
			Dow:      all(dow),
			Location: loc,
		}, nil

	case "@hourly":
		return &SpecSchedule{
			Second:   1 << seconds.min,
			Minute:   1 << minutes.min,
			Hour:     all(hours),
			Dom:      all(dom),
			Month:    all(months),
			Dow:      all(dow),
			Location: loc,
		}, nil

	}

	const every = "@every "
	if strings.HasPrefix(descriptor, every) {
		duration, err := time.ParseDuration(descriptor[len(every):])
		if err != nil {
			return nil, fmt.Errorf("failed to parse duration %s: %s", descriptor, err)
		}
		return Every(duration), nil
	}

	return nil, fmt.Errorf("unrecognized descriptor: %s", descriptor)
}

该函数主要是将cron表达式映射为“Second, Minute, Hour, Dom, Month, Dow”6个时间维度的结构体SpecSchedule。

Schedule

接下来看Cron类型的Schedule方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// Schedule adds a Job to the Cron to be run on the given schedule.
// The job is wrapped with the configured Chain.
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	c.nextID++
	entry := &Entry{
		ID:         c.nextID,
		Schedule:   schedule,
		WrappedJob: c.chain.Then(cmd),
		Job:        cmd,
	}
	if !c.running {
		c.entries = append(c.entries, entry)
	} else {
		c.add <- entry
	}
	return entry.ID
}

这个比较好理解,根据schedule和cmd参数构建了一个Entry变量,并且将这个变量添加进Cron的entries中

只不过在没有运行的时候直接添加,运行的时候通过chan添加.

Start

调度的开始实施是从Cron.Start()函数开始的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// Start the cron scheduler in its own go-routine, or no-op if already started.
func (c *Cron) Start() {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		return
	}
	c.running = true
	go c.run()
}

东西很少,就是开了一个routine执行任务,这里cron还提供了一个使用当前routine执行的方法Run(),

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// Run the cron scheduler, or no-op if already running.
func (c *Cron) Run() {
	c.runningMu.Lock()
	if c.running {
		c.runningMu.Unlock()
		return
	}
	c.running = true
	c.runningMu.Unlock()
	c.run()
}

先不管这些,接下来重点到run()方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// run the scheduler.. this is private just due to the need to synchronize
// access to the 'running' state variable.
func (c *Cron) run() {
	c.logger.Info("start")

	// Figure out the next activation times for each entry.
	now := c.now()
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)//得到entries中的每一个entry更新下一次执行时间
		c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
	}
    // 无限循环
	for {
        //通过对下一个执行时间进行排序,判断那些任务是下一次被执行的,防在队列的前面.sort是用来做排序的
		// Determine the next entry to run.
		sort.Sort(byTime(c.entries))//排序  得到最先要执行的entry

        var timer *time.Timer
        // 如果没有要执行的任务或者第一个任务的待执行时间为空,则睡眠
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
			// If there are no entries yet, just sleep - it still handles new entries
			// and stop requests.
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			timer = time.NewTimer(c.entries[0].Next.Sub(now))//时间最近要执行Entry到现在的时间差 下面唤醒select
		}

		for {
			select {
			case now = <-timer.C://时间到  执行任务
				now = now.In(c.location)//更新时间
				c.logger.Info("wake", "now", now)

				// Run every entry whose next time was less than now
				for _, e := range c.entries {
					if e.Next.After(now) || e.Next.IsZero() {
						break
					}
					c.startJob(e.WrappedJob)
					e.Prev = e.Next
					e.Next = e.Schedule.Next(now)
					c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)//下一个要执行的时间
				}

			case newEntry := <-c.add://运行中添加Entry
				timer.Stop()
				now = c.now()
				newEntry.Next = newEntry.Schedule.Next(now)
				c.entries = append(c.entries, newEntry)
				c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

			case replyChan := <-c.snapshot:// 快照
				replyChan <- c.entrySnapshot()
				continue

			case <-c.stop://停止信号
				timer.Stop()
				c.logger.Info("stop")
				return

			case id := <-c.remove:
				timer.Stop()
				now = c.now()
				c.removeEntry(id)
				c.logger.Info("removed", "entry", id)
			}

			break
		}
	}
}

进入该函数,首先遍历所有任务,找到所有任务下一个要执行的时间。然后进入外层for循环,对于各个任务按照执行时间进行排序,保证离当前时间最近的先执行。再对任务列表进行判定,是否有任务如果没有,则休眠,否则初始化一个timer。

里层的for循环才是重头戏,下面主要分析这个for循环里面的任务加入和执行。

在此之前,需要了解下go标准库的timer:

  • timer用于指定在某个时间间隔后,调用函数或者表达式。
  • 使用NewTimer就可以创建一个Timer,在指定时间间隔到达后,可以通过<-timer.C接收值。

有了这个背景之后,我们再来看run函数的里层for循环。

接收到c.add信道

1
2
3
4
5
case newEntry := <-c.add:	// 添加任务
	timer.Stop()
	now = c.now()
	newEntry.Next = newEntry.Schedule.Next(now)
	c.entries = append(c.entries, newEntry)

将timer停掉,清除设置的定时功能,并以当前时间点为起点,设置添加任务的下一次执行时间,并添加到entries任务队列中。

接收到timer.C信道

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
case now = <-timer.C:
	now = now.In(c.location)
	c.logger.Info("wake", "now", no
	// Run every entry whose next time was less than now
	for _, e := range c.entries {
		if e.Next.After(now) || e.Next.IsZero() {
			break
		}
		c.startJob(e.WrappedJob)
		e.Prev = e.Next
		e.Next = e.Schedule.Next(now)
		c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
	}

当定时任务到点后,time.C就会接收到值,并新开协程执行真正需要执行的Job,之后再更新下一个要执行的任务列表。

我们进入startJob函数,该函数从函数名就可以看出,即使出现panic也可以重新recovery,保证其他任务不受影响。

1
2
3
4
5
6
7
8
// startJob runs the given job in a new goroutine.
func (c *Cron) startJob(j Job) {
	c.jobWaiter.Add(1)
	go func() {
		defer c.jobWaiter.Done()
		j.Run()
	}()
}

追根溯源,我们发现真正执行Job的是j.Run()的执行。进入这个Run函数的实现,我们看到

1
2
3
4
// Job is an interface for submitted cron jobs.
type Job interface {
	Run()
}

没错,我们要执行的任务一直从AddFunc一直往下传递,直到这里,我们通过调用Run函数,将包装的FuncJob类型的函数通过f()的形式进行执行。

Stop

1
2
3
4
5
6
7
8
9
// 结束任务
// Stop stops the cron scheduler if it is running; otherwise it does nothing.
func (c *Cron) Stop() {
    if !c.running {
        return
    }
    c.stop <- struct{}{}
    c.running = false
}

entry

Schedule.Next

这个函数主要调用了Schedule的Next方法,Schedule是一个接口,在前面我们知道,实际上在解析spec的时 候返回的变量是SpecSchedule类型,所以此处应该调用SpecSchedule的Next方法,这个方法就是上面说的 那个复杂不贴代码的方法,在网上找了个带注释的版本,反正就是得到这个entry下次执行的时间吧

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Next returns the next time this schedule is activated, greater than the given
// time.  If no time can be found to satisfy the schedule, return the zero time.
func (s *SpecSchedule) Next(t time.Time) time.Time {
	// General approach
	//
	// For Month, Day, Hour, Minute, Second:
	// Check if the time value matches.  If yes, continue to the next field.
	// If the field doesn't match the schedule, then increment the field until it matches.
	// While incrementing the field, a wrap-around brings it back to the beginning
	// of the field list (since it is necessary to re-verify previous field
	// values)

	// Convert the given time into the schedule's timezone, if one is specified.
	// Save the original timezone so we can convert back after we find a time.
	// Note that schedules without a time zone specified (time.Local) are treated
	// as local to the time provided.
	origLocation := t.Location()
	loc := s.Location
	if loc == time.Local {
		loc = t.Location()
	}
	if s.Location != time.Local {
		t = t.In(s.Location)
	}

    // Start at the earliest possible time (the upcoming second).
    // 秒级别的取整
	t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond)
    // 判断一个字段是否被累加,如果是, 那么它的下一级别的字段需要归 0 。
	// This flag indicates whether a field has been incremented.
	added := false

	// If no time is found within five years, return zero.
	yearLimit := t.Year() + 5
    // 下一级别的字段累加到重置,需要重新累加上一级别的字段的时候的goto点
    // 比如要找每个月的31号的时候, 4月是符合月份字段的规定的,但是4月的没有31号。 遍历尽4月的每一天后,只能请求重新累加月份。
WRAP:
	if t.Year() > yearLimit {
		return time.Time{}
	}
    // 月
	// Find the first applicable month.
	// If it's this month, then do nothing.
	for 1<<uint(t.Month())&s.Month == 0 {
		// If we have to add a month, reset the other parts to 0.
		if !added {
			added = true
			// Otherwise, set the date at the beginning (since the current time is irrelevant).
			t = time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, loc)
		}
		t = t.AddDate(0, 1, 0)

		// Wrapped around.
		if t.Month() == time.January {
			goto WRAP
		}
	}
    // 天 , 一次处理 天/月 和 天/周
	// Now get a day in that month.
	//
	// NOTE: This causes issues for daylight savings regimes where midnight does
	// not exist.  For example: Sao Paulo has DST that transforms midnight on
	// 11/3 into 1am. Handle that by noticing when the Hour ends up != 0.
	for !dayMatches(s, t) {
		if !added {
			added = true
			t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, loc)
		}
		t = t.AddDate(0, 0, 1)
		// Notice if the hour is no longer midnight due to DST.
		// Add an hour if it's 23, subtract an hour if it's 1.
		if t.Hour() != 0 {
			if t.Hour() > 12 {
				t = t.Add(time.Duration(24-t.Hour()) * time.Hour)
			} else {
				t = t.Add(time.Duration(-t.Hour()) * time.Hour)
			}
		}

		if t.Day() == 1 {
			goto WRAP
		}
	}
    // 时
	for 1<<uint(t.Hour())&s.Hour == 0 {
		if !added {
			added = true
			t = time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, loc)
		}
		t = t.Add(1 * time.Hour)

		if t.Hour() == 0 {
			goto WRAP
		}
	}
    // 分
	for 1<<uint(t.Minute())&s.Minute == 0 {
		if !added {
			added = true
			t = t.Truncate(time.Minute)
		}
		t = t.Add(1 * time.Minute)

		if t.Minute() == 0 {
			goto WRAP
		}
	}
    // 秒
	for 1<<uint(t.Second())&s.Second == 0 {
		if !added {
			added = true
			t = t.Truncate(time.Second)
		}
		t = t.Add(1 * time.Second)

		if t.Second() == 0 {
			goto WRAP
		}
	}

	return t.In(origLocation)
}

//一次处理 天/月 和 天/周 。 如果两者中有任意, 那么必须同时符合另一个才算是匹配
// dayMatches returns true if the schedule's day-of-week and day-of-month
// restrictions are satisfied by the given time.
func dayMatches(s *SpecSchedule, t time.Time) bool {
	var (
		domMatch bool = 1<<uint(t.Day())&s.Dom > 0
		dowMatch bool = 1<<uint(t.Weekday())&s.Dow > 0
	)
	if s.Dom&starBit > 0 || s.Dow&starBit > 0 {
		return domMatch && dowMatch
	}
	return domMatch || dowMatch
}

参考: https://segmentfault.com/a/1190000014666453 http://chuquanl.com/golang-cron%E7%AE%80%E4%BB%8B%E5%8F%8A%E4%BD%BFcron%E6%94%AF%E6%8C%81%E5%B8%A6%E5%8F%82%E6%95%B0%E4%BB%BB%E5%8A%A1%E8%B0%83%E7%94%A8/ https://juejin.im/post/5d3d79b9518825378f6cc6df