/

Go语言开源库 - Cron库解析

该篇文章首发于boyn.top,转载请声明

什么是Cron?

顾名思义,这是一个使用Go语言编写的定时任务调度库,他有以下特性:

  1. 支持Cron表达式或者Quartz表达式进行任务调度

  2. 运行时动态添加,删除和获取任务信息

  3. 使用包装模式,增加了拦截器的功能,可以实现类似于AOP的功能(默认有recover和log)

这个库的代码量并不算多,并且写的很好,结构优美,数据结构定义合理.

下面,我们会从3个方面来解析这个库:

  1. 一个任务是怎么被执行的

  2. 动态添加,删除是怎么实现的

  3. 包装模式是怎么实现的

①. 一个任务是怎么被执行的

我们先来看官网的基本调度例子:

1
2
3
4
c := cron.New()
c.AddFunc("30 * * * *", func() { fmt.Println("Every hour on the half hour") })
c.AddFunc("30 3-6,20-23 * * *", func() { fmt.Println(".. in the range 3-6am, 8-11pm") })
c.Start()

我们只需要几个语句,就可以提交定时任务并执行了.语法上十分简洁易懂,而背后的流程则是我们需要重点关注的.

在这个例子中,用到了New(),AddFunc(),Start()这三个函数,这已经概括了这个库的绝大多数执行流程了.

我们来看一下他们背后的数据结构.

首先是Cron

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Cron keeps track of any number of entries, invoking the associated func as
// specified by the schedule. It may be started, stopped, and the entries may
// be inspected while running.
type Cron struct {
entries []*Entry // 任务实体slice
chain Chain // 执行链(函数包装)
stop chan struct{} // 负责running时发送停止消息的chan
add chan *Entry // 负责running时动态增加任务的chan
remove chan EntryID // 负责running时动态删除任务的chan
snapshot chan chan []Entry // 快照chan
running bool // 是否running
logger Logger // 日志
runningMu sync.Mutex // 锁,执行任务添加,删除等操作时保证并发安全
location *time.Location // 时区
parser ScheduleParser // cron或Quartz表达式的解析器
nextID EntryID // id
jobWaiter sync.WaitGroup // 在退出时,确保所有任务都执行完成后才退出
}

在①中,我们暂时focus on entries and parser.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
type Entry struct {
// ID is the cron-assigned ID of this entry, which may be used to look up a
// snapshot or remove it.
ID EntryID
// Schedule on which this job should be run.
Schedule Schedule
// Next time the job will run, or the zero time if Cron has not been
// started or this entry's schedule is unsatisfiable
Next time.Time
// Prev is the last time this job was run, or the zero time if never.
Prev time.Time
// WrappedJob is the thing to run when the Schedule is activated.
WrappedJob Job
// Job is the thing that was submitted to cron.
// It is kept around so that user code that needs to get at the job later,
// e.g. via Entries() can do so.
Job Job
}

每一个Entry都有唯一的ID和调度器,WrappedJob是被包装了Chain后的Job结构,Job是我们传入的函数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
return c.AddJob(spec, FuncJob(cmd))
}
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
schedule, err := c.parser.Parse(spec)
if err != nil {
return 0, err
}
return c.Schedule(schedule, cmd), nil
}
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
c.runningMu.Lock()
defer c.runningMu.Unlock()
c.nextID++
entry := &Entry{
ID: c.nextID,
Schedule: schedule,
WrappedJob: c.chain.Then(cmd),
Job: cmd,
}
if !c.running {
c.entries = append(c.entries, entry)
} else {
c.add <- entry
}
return entry.ID
}

这一段是添加定时任务时发生的事情.

AddFunc只是做了一些封装,AddJob中负责解析表达式并将产生的调度器与Job一起传给Schedule函数

Schedule函数将调度器,Id,包装链(c.chanin.Then(cmd))这个语句我们可以暂时地把他当做将Job包装几层后产生的New Job,将这些结构合并成为一个entry,当cron不在运行状态时,就直接将entry添加到c的entry数组中,如果在运行状态中,就动态地添加(②中会说明).添加完成后,返回对应的id

这样,一个任务就被添加到entry数组中了.接下来我们看看是怎么执行的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (c *Cron) run() {
// Figure out the next activation times for each entry.
now := c.now()
for _, entry := range c.entries {
entry.Next = entry.Schedule.Next(now)
}
for {
// Determine the next entry to run.
sort.Sort(byTime(c.entries))
// 为了看着方便删去了一些边界条件
timer := time.NewTimer(c.entries[0].Next.Sub(now))
for {
select {
case now = <-timer.C:
// Run every entry whose next time was less than now
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
}
// 动态添加部分暂时删除,后面会给出完整函数
break
}
}
}

我们调用Start()方法,实质上会创建一个新的协程来执行包私有的run方法.

首先遍历entries,计算出他们下一次被调度的时间

随后进入一个while true的循环中,先根据被调度的时间顺序进行排序,然后设定一个timer指示第一个被调度程序的时间,随后进入select中,当timer到期后,遍历entries列表,如果e.Next(下一次被调度时间)小于等于当前时间的,那么就会再用一个协程来启动这个任务,并更新这个任务的下一次调度时间.注意在这里为了保证同一时间多个任务到期,会遍历整个数组,但是当任一任务的下一次调度时间大于当前时间,就会停止遍历,这是由于任务数组是有序的.

在一次调度过程完成后,会再次将任务排序,然后设定timer进入下一次调度循环中.这样,一个任务被执行的流程就完成了.

②. 动态添加,删除任务等功能是怎么实现的.

这个的实现,就涉及到Cron结构中的几个channel了

1
2
3
4
stop    chan struct{} // 负责running时发送停止消息的chan
add chan *Entry // 负责running时动态增加任务的chan
remove chan EntryID // 负责running时动态删除任务的chan
snapshot chan chan []Entry // 快照chan

这部分的结构同样在我们上面说到的run方法中被cue到.

我们上面的run方法是不完整的,在while true的循环中省略了一部分关于这里要说到的动态添加 删除的代码.所以我们来看看完整版是怎么样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
for {
select {
case now = <-timer.C:
now = now.In(c.location)
c.logger.Info("wake", "now", now)
// Run every entry whose next time was less than now
for _, e := range c.entries {
if e.Next.After(now) || e.Next.IsZero() {
break
}
c.startJob(e.WrappedJob)
e.Prev = e.Next
e.Next = e.Schedule.Next(now)
c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
}
case newEntry := <-c.add:
timer.Stop()
now = c.now()
newEntry.Next = newEntry.Schedule.Next(now)
c.entries = append(c.entries, newEntry)
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
case replyChan := <-c.snapshot:
replyChan <- c.entrySnapshot()
continue
case <-c.stop:
timer.Stop()
c.logger.Info("stop")
return
case id := <-c.remove:
timer.Stop()
now = c.now()
c.removeEntry(id)
c.logger.Info("removed", "entry", id)
}
break
}

这里的逻辑还是十分清晰的,在select中,除了timer,还会接受add,snapshot,stop,remove这4个channel的消息

  • Add: 停止该次循环的timer,获取当前的时间后,计算新节点的调度时间并加入到entries中

  • Snapshot: 获取快照,这里接受的是一个channel,用于接受回复的消息,我们取到快照数组后,将数组通过这个channel传回调用的函数中.

  • Stop: 停止timer并返回

  • Remove: 停止该次循环的timer,并遍历entries数组将对应id的job移除

③. 包装模式是怎么实现的

在①的添加任务中,我们有这样的一个语句

1
c.chain.Then(cmd)

这个语句就是我们实现包装模式的关键

我们来看一下Then是怎么做的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// Then decorates the given job with all JobWrappers in the chain.
//
// This:
// NewChain(m1, m2, m3).Then(job)
// is equivalent to:
// m1(m2(m3(job)))
func (c Chain) Then(j Job) Job {
for i := range c.wrappers {
j = c.wrappers[len(c.wrappers)-i-1](j)
}
return j
}
// Recover panics in wrapped jobs and log them with the provided logger.
func Recover(logger Logger) JobWrapper {
return func(j Job) Job {
return FuncJob(func() {
defer func() {
if r := recover(); r != nil {
const *size* = 64 << 10
buf := make([]byte, *size*)
buf = buf[:runtime.Stack(buf, false)]
err, ok := r.(error)
if !ok {
err = fmt.Errorf("%v", r)
}
logger.Error(err, "panic", "stack", "...\n"+string(buf))
}
}()
j.Run()
})
}
}

Then函数会倒序地调用每一个包装器,每个包装器返回一个参数和返回值都是Job的函数,返回值就是包装后的Job,我们以Recover这个包装为例,它增加了defer func()中进行recover的功能.

关于图片和转载

知识共享许可协议
本作品采用知识共享署名 4.0 国际许可协议进行许可。 转载时请注明原文链接,图片在使用时请保留图片中的全部内容,可适当缩放并在引用处附上图片所在的文章链接,图片使用 Sketch 进行绘制,你可以在 技术文章配图指南 一文中找到画图的方法和素材。