导语

异步任务,是每一位开发者都遇到过的技术名词,在任何一个稍微复杂的后台系统中,异步任务总是无法避免的,而任务队列由于其松耦合、易扩展的特性,成为了实现异步任务的可靠保证。

背景

当用户的一次请求事件发生,可能是某种数据的重复数查询,亦或是某批人群的覆盖率统计,展现到用户的是几行数字,但在透视到后端逻辑中,简单的这可能是一次mysql的联表查询或者elasticsearch的聚合,但更多情况下,是附带了一系列复杂的数据交互或者耗时的逻辑计算。当后端这种发生多次数据交互任务的情况一旦存在,为了实现每一次任务的可靠执行以及前端响应速度,任务队列的存在意义就凸显了。

场景与功能

任务队列有着广泛的适应场景:

  • 大批量的计算任务。如大量数据插入,通过拆分并分批插入任务队列,从而实现串行链式任务处理或者实现分组并行任务处理,提高系统鲁棒性,提高系统并发度;
  • 数据预处理。定期的从后端存储将数据同步到到缓存系统,从而在查询请求发生时,直接去缓存系统中查询,提高查询请求的响应速度;
  • 错误重试功能。为了提高系统的可用性,当函数处理出现错误时,我们希望可以给予其重试的机会,增强系统的可用性。

适用于任务队列的场景还有很多,同样,不同语言也有着自己著名的任务队列系统,众所周知的如python下的celery,PHP中laraval框架的Queues,都是使用度十分广泛的任务队列系统。

我们项目的技术栈为golang,因此,在我们go为基础的微服务框架中,需要存在一个类型于celery或者laraval中的任务队列系统,在经过了一系列筛选后,我们采用了machinery作为我们的任务队列系统。machinery,一个第三方开源的基于分布式消息分发的异步任务队列,有着以下这些特性:

  • 任务重试机制
  • 延迟任务支持
  • 任务回调机制
  • 任务结果记录
  • 支持Workflow模式:Chain,Group,Chord
  • 多Brokers支持:Redis, AMQP, AWS SQS
  • 多Backends支持:Redis, Memcache, AMQP, MongoDB

当前machinery在v1 stable版本,可以通过go get github.com/RichardKnop/machinery/v1获取。

架构设计

任务队列,简而言之就是一个放大的生产者消费者模型,用户请求会生成任务,任务生产者不断的向队列中插入任务,同时,队列的处理器程序充当消费者不断的消费任务。基于这种框架设计思想,我们来看下machinery的简单设计结构图例:

其中:

  • Server:业务模块,生成具体任务,可根据业务逻辑中,按交互进行拆分;
  • Broker:存储具体序列化后的任务,machinery中目前支持到Redis, AMQP,和SQS;
  • Worker:工作进程,负责消费者功能,处理具体的任务;
  • Backend:后端存储,用于存储任务执行状态的数据;

在本篇文章中,我们将对上述几个模块进行详细讲解。

Broker

machinery的broker支持多种存储介质:Redis,AMQP和SQS,本篇文章中,我们将以redis来详细介绍,其他类型的存储介质,在实现细节上由于介质的API支持不一可能略有不同,但machinery具体暴露接口类似,有兴趣的读者可以详细再阅读相关源码。

machinery的Broker实现了以下这几种接口,我们将重点介绍起着关键作用的接口:

1
2
3
4
5
6
7
GetConfig() *config.Config
SetRegisteredTaskNames(names []string)
IsTaskRegistered(name string) bool
StartConsuming(consumerTag string, concurrency int, p TaskProcessor) (bool, error)
StopConsuming()
Publish(task *tasks.Signature) error
GetPendingTasks(queue string) ([]*tasks.Signature, error)

启动和停止

当我们使用machinery时,在启动服务之后,StartConsuming()函数将以阻塞轮询的方式去Broker中获取任务并消费处理。而当服务停止之后,StopConsuming()函数将会等待一系列go程结束,以实现gracefully stop。

详细来看StartConsumin()函数,具体源码如下(不相关的代码细节已经省略)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func (b *RedisBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor TaskProcessor) (bool, error) {
	...
  // 获取任务go程
	go func() {
                ...
		for {
			select {
			case <-b.stopReceivingChan:
				return
			case <-timer.C:
				if concurrencyAvailable() {
					task, err := b.nextTask(b.cnf.DefaultQueue)
					if err != nil {
						timer.Reset(timerDuration)
						continue
					}
					deliveries <- task
				}
         //并发控制逻辑
				if concurrencyAvailable() {
					timer.Reset(0)//设置timer为0,立即继续消费任务
				} else {
					timer.Reset(timerDuration)//重置timer,等待duration后再尝试消费
				}
			}
		}
	}()

  // 获取延时任务go程
	go func() {
                ...
		for {
			select {
			case <-b.stopDelayedChan:
				return
			default:
				task, err := b.nextDelayedTask(redisDelayedTasksKey)
				if err != nil {
					continue
				}

				signature := new(tasks.Signature)
				decoder := json.NewDecoder(bytes.NewReader(task))
				decoder.UseNumber()
				if err := decoder.Decode(signature); err != nil {
					log.ERROR.Print(NewErrCouldNotUnmarshaTaskSignature(task, err))
				}

				if err := b.Publish(signature); err != nil {
					log.ERROR.Print(err)
				}
			}
		}
	}()
  //执行任务消费
	if err := b.consume(deliveries, pool, concurrency, taskProcessor); err != nil {
		return b.retry, err
	}
        ...
}

其中,

参数consumerTag在AMQP作为Broker时有意义;

参数concurrency用来实现任务并发调度的控制。

任务获取

在StartComsuming()中,分别启动了两个go程来并行处理任务,因为针对延时任务和普通任务,machinery将任务存放于两个不同的rediskey中。

  • 对于普通任务,使用nextTask()函数用来从broker中获取任务,在redis作为broker时,machinery使用了LIST类型来存储任务,而nextTask()中使用了BLPOP来阻塞式的读取任务.
  • 对于延时任务,使用nextDelayTask()函数从redis中的ZSET中,根据score来优先获取最近的任务(score为ETA的对应的unixnano值)。

具体来看nextTask()函数和nextDelayTask()函数,如下列出:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// BLPOP出LIST中的数据
func (b *RedisBroker) nextTask(queue string) (result []byte, err error) {
	conn := b.open()
	defer conn.Close()

	items, err := redis.ByteSlices(conn.Do("BLPOP", queue, 1))
	if err != nil {
		return []byte{}, err
	}

	if len(items) != 2 {
		return []byte{}, redis.ErrNil
	}

	result = items[1]

	return result, nil
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 结合WATCH,从ZSET中获取score最小的
func (b *RedisBroker) nextDelayedTask(key string) (result []byte, err error) {
   ...
   for {
      time.Sleep(time.Duration(pollPeriod) * time.Millisecond)
      if _, err = conn.Do("WATCH", key); err != nil {
         return
      }

      now := time.Now().UTC().UnixNano()

      items, err = redis.ByteSlices(conn.Do(
         "ZRANGEBYSCORE",
         key,
         0,
         now,
         "LIMIT",
         0,
         1,
      ))
      if err != nil {
         return
      }
      if len(items) != 1 {
         err = redis.ErrNil
         return
      }

      conn.Send("MULTI")
      conn.Send("ZREM", key, items[0])
      reply, err = conn.Do("EXEC")
      if err != nil {
         return
      }

      if reply != nil {
         result = items[0]
         break
      }
   }

   return
}

特别需要注意的是,由于云服务的盛行,当下的云服务基本上都涵盖了redis服务,且提供了主备方案和集群方案等,但是不论时云服务或者时公司内部的redis服务,对BLPOP的支持可能会受限,这时候我们需要更改nextTask()函数中的BLPOP为LPOP来适应:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
/*
* modified at 20180717
* use LPOP instead of BLPOP, cause L5 redis does not support BLPOP
**/
// nextTask pops next available task from the default queue
func (b *Broker) nextTask(queue string) (result []byte, err error) {
   conn := b.open()
   defer conn.Close()

   item, err := redis.Bytes(conn.Do("LPOP", queue))
   if err != nil {
      return []byte{}, err
   }
   result = item

   return result, nil
}

任务查看

在redis作为Broker时,machinery还提供了一个额外的接口实现(其他接口Broker存储介质未对该接口进行实现)GetPendingTasks()。顾名思义,GetPendingTasks()可以用来查看当前任务队列中处理pending状态,在等待被处理的任务的详细信息。

GetPendingTasks()函数,更多的可以理解为,是作者提供的“接口糖”,方便离线的对任务队列中的任务进行查看,当然,machinery中使用的几种第三方队列作为Broker,基本上都是支持这类数据的单独查看的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (b *RedisBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error) {
	...
	dataBytes, err := conn.Do("LRANGE", queue, 0, 10)
	if err != nil {
		return nil, err
	}
	results, err := redis.ByteSlices(dataBytes, err)
	if err != nil {
		return nil, err
	}

	taskSignatures := make([]*tasks.Signature, len(results))
	for i, result := range results {
		signature := new(tasks.Signature)
		decoder := json.NewDecoder(bytes.NewReader(result))
		decoder.UseNumber()
		if err := decoder.Decode(signature); err != nil {
			return nil, err
		}
		taskSignatures[i] = signature
	}
	return taskSignatures, nil
}

任务发布

Publish()接口是实现任务发布的函数,将在后续篇幅在对任务做介绍时再单独详细介绍。

Backend

Backend,同样是任务队列不可或缺的一部分,其作用主要是用来存储任务的执行结果的,machinery中支持Redis, Memcache, AMQP, MongoDB四种类型的存储介质来实现Backend。

machinery的Backend,根据其自身的功能特性,实现了以下这几种接口,与Broker类似,我们将重点介绍几个关键的接口(同样,以下接口是不同类型的Backend的实现的接口超集,并不是Redis作为介质时都有的):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// Workflow相关接口

InitGroup(groupUUID string, taskUUIDs []string) error
GroupCompleted(groupUUID string, groupTaskCount int) (bool, error)
GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error)
TriggerChord(groupUUID string) (bool, error)

// 任务状态设置接口
SetStatePending(signature *tasks.Signature) error
SetStateReceived(signature *tasks.Signature) error
SetStateStarted(signature *tasks.Signature) error
SetStateRetry(signature *tasks.Signature) error
SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error
SetStateFailure(signature *tasks.Signature, err string) error
GetState(taskUUID string) (*tasks.TaskState, error)

// Purging stored stored tasks states and group meta data
PurgeState(taskUUID string) error
PurgeGroupMeta(groupUUID string) error

Workflow

我们可以看到,第一批接口有Group和Chord相关的字眼,这就是我们在一开始提到的machinery中Workflow机制。Workflow极大的使能了任务队列的功能,使得machinery更加得心应手。关于Workflow的知识,我们将在下面的篇幅中详细介绍,这儿仅仅简单的介绍这几个接口的功能。

  • InitGroup(),顾名思义,在创建一个Group任务;

  • GroupCompleted(),检查一个Group中所有的任务是否都执行完毕;

  • GroupTaskStates(),返回一个Group中,所有任务的状态

  • TriggerChord(),当Group中任务全部执行完毕后,触发Chrod任务

State

machinery中将任务的状态进行了很详细的划分,通过接口我们就可以看到,machinery支持了以下几种任务中间态:

  • Pending,任务到达Broker
  • Received,任务从Broker中读取成功
  • Started,任务开始执行
  • Retry,任务需要重试
  • Success,任务执行成功
  • Failure,任务执行失败

下面简单列出源码中设置状态接口的使用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// SetStatePending updates task state to PENDING
func (b *RedisBackend) SetStatePending(signature *tasks.Signature) error {
   taskState := tasks.NewPendingTaskState(signature)
   return b.updateState(taskState)
}

// SetStateReceived updates task state to RECEIVED
func (b *RedisBackend) SetStateReceived(signature *tasks.Signature) error {
   taskState := tasks.NewReceivedTaskState(signature)
   return b.updateState(taskState)
}
...

Worker

Worker负责了任务队列的执行单元,是任务队列中处理任务的关键元素,也是因此,Worker的接口很少,很直接:

1
2
3
4
Launch()
LaunchAsync(errorsChan chan<- error)
Quit()
Process(signature *tasks.Signature)

启动和停止

Worker启动是通过Launch()启动了一个进程,去订阅默认的任务队列,并且处理收到的任务。LaunchAsync()是Launch()的非阻塞版本,而通过Launch()中的代码,我们发现,其实就是调用了LaunchAsync()。

在LaunchAsync()中,通过开启一个go程,实现了非阻塞式的调用了Broker的StartConsuming()函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (worker *Worker) Launch() error {
   errorsChan := make(chan error)
   worker.LaunchAsync(errorsChan)
   return <-errorsChan
}

// Launch()的非阻塞调用
func (worker *Worker) LaunchAsync(errorsChan chan<- error) {
   ...
   // broker消费者go程,同时负责与broker的断开重连等
   go func() {
      for {
         retry, err := broker.StartConsuming(worker.ConsumerTag, worker.Concurrency, worker)

         if retry {
            if worker.errorHandler != nil {
               worker.errorHandler(err)
            } else {
               log.WARNING.Printf("Broker failed with error: %s", err)
            }
         } else {
            errorsChan <- err // stop the goroutine
            return
         }
      }
   }()
   ...
   }
}

Worker停止是通过Quit()函数来实现,其调用了Broker的StopConsuming()函数,以实现gracefully stop。

1
2
3
4
// Quit tears down the running worker process
func (worker *Worker) Quit() {
   worker.server.GetBroker().StopConsuming()
}

处理

Worker中的Process()函数,将会处理在Broker中的待处理任务,并且负责了任务回调的触发功能。Process()函数的任务流程主要是:

任务检测->任务获取->任务预处理->Tracing处理->任务执行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func (worker *Worker) Process(signature *tasks.Signature) error {
   ...
   //根据任务名,获取注册任务
   taskFunc, err := worker.server.GetRegisteredTask(signature.Name)
   if err != nil {
      return nil
   }

   // 更新任务状态 Received
   if err = worker.server.GetBackend().SetStateReceived(signature); err != nil {
      return fmt.Errorf("Set state received error: %s", err)
   }

   // 任务预处理,预防任务出错,导致后面影响worker的运行
   task, err := tasks.New(taskFunc, signature.Args)
   if err != nil {
      worker.taskFailed(signature, err)
      return err
   }

   // tracing处理
   taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name)
   tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature)
   task.Context = opentracing.ContextWithSpan(task.Context, taskSpan)

   // 更新任务状态 Started
   if err = worker.server.GetBackend().SetStateStarted(signature); err != nil {
      return fmt.Errorf("Set state started error: %s", err)
   }

   // 任务执行
   results, err := task.Call()
   if err != nil {
      // If a tasks.ErrRetryTaskLater was returned from the task,
      // retry the task after specified duration
      retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
      if ok {
         return worker.retryTaskIn(signature, retriableErr.RetryIn())
      }

      // Otherwise, execute default retry logic based on signature.RetryCount
      // and signature.RetryTimeout values
      if signature.RetryCount > 0 {
         return worker.taskRetry(signature)
      }

      return worker.taskFailed(signature, err)
   }

   return worker.taskSucceeded(signature, results)
}

machinery中,主要是通过反射实现了任务执行,具体的执行方式,在获取了函数之后与普通的反射无异,详细的介绍在后续篇幅介绍。关于任务执行之后的处理,有可能三种处理:

任务执行成功:

taskSucceeded(),是在一个任务被成功执行后调用,主要负责更新任务状态、触发回调函数或者chord任务中的回调函数(前提是该task是chrod的分组任务中的最后一个任务),关于chord任务,在后面关于Workflow模式中将会详细介绍。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*tasks.TaskResult) error {
	// 更新任务状态
	if err := worker.server.GetBackend().SetStateSuccess(signature, taskResults); err != nil {
		return fmt.Errorf("Set state success error: %s", err)
	}
        ...
	// 回调任务
	for _, successTask := range signature.OnSuccess {
        // 当immutable为false时,传递参数
		if signature.Immutable == false {
			// Pass results of the task to success callbacks
			for _, taskResult := range taskResults {
				successTask.Args = append(successTask.Args, tasks.Arg{
					Type:  taskResult.Type,
					Value: taskResult.Value,
				})
			}
		}
		worker.server.SendTask(successTask)
	}
        ...
	// 触发chord任务的回掉函数
	shouldTrigger, err := worker.server.GetBackend().TriggerChord(signature.GroupUUID)
	if err != nil {
		return fmt.Errorf("Trigger chord error: %s", err)
	}
        ...
	// 针对group任务的返回值做参数传递
	for _, taskState := range taskStates {
		if !taskState.IsSuccess() {
			return nil
		}

		if signature.ChordCallback.Immutable == false {
			for _, taskResult := range taskState.Results {
				signature.ChordCallback.Args = append(signature.ChordCallback.Args, tasks.Arg{
					Type:  taskResult.Type,
					Value: taskResult.Value,
				})
			}
		}
	}
	// 发送chord任务
	_, err = worker.server.SendTask(signature.ChordCallback)
	if err != nil {
		return err
	}

	return nil
}

任务执行失败:

taskFailed(),是在一个任务执行失败(完全失败,即重试也失败)后调用。需要负责更新任务状态,并触发OnError回调函数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
func (worker *Worker) taskFailed(signature *tasks.Signature, taskErr error) error {
   // 任务状态更新 Failure
   if err := worker.server.GetBackend().SetStateFailure(signature, taskErr.Error()); err != nil {
      return fmt.Errorf("Set state failure error: %s", err)
   }

   ...
   // Trigger error callbacks
   for _, errorTask := range signature.OnError {
      // Pass error as a first argument to error callbacks
      args := append([]tasks.Arg{{
         Type:  "string",
         Value: taskErr.Error(),
      }}, errorTask.Args...)
      errorTask.Args = args
      worker.server.SendTask(errorTask)
   }

   return nil
}

任务重试:

关于任务重试,machinery中提供了两种方式来实现。

  1. machinery中通过设置任务的RetryCount和RetryTimeout参数来实现。

  2. 通过返回一个ErrRetryTaskLater类型的值来制定。

由于任务重试,需要依赖于对machinery中任务数据结构的了解,我们将在之后详细介绍。

配置方案

为了掌握如何使用machinery,本文将会同时从调用代码和machinery源码来进行详细介绍,同时将会在每份代码段的初始部分分别标识出。在我们对使用方法进行介绍之前,首先通过machinery的启动配置文件来一探是如何衔接起各个工作模块的。在machinery中,支持两种配置方式,分别是:

  • 基于yaml文件的配置
  • 基于环境变量的配置

基于配置文件

对于配置文件,machinery中支持的格式为yaml,下面是一个基本的machinery的配置文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
# machinery.yaml
broker: 'redis://123456@localhost:6379'
#broker: 'amqp://guest:guest@localhost:5672/'
#broker: 'https://sqs.us-west-2.amazonaws.com/123456789012'

default_queue: 'machinery_tasks'

result_backend: 'redis://123456@localhost:6379'
#result_backend: 'memcache://localhost:11211'
#result_backend: 'mongodb://localhost:27017'

results_expire_in: 36000

amqp:
  binding_key: machinery_task
  exchange: machinery_exchange
  exchange_type: direct
  prefetch_count: 3

其中,

  • broker:broker的地址,可以根据实际使用的存储介质,分别指定Redis、AMQP或AWS SQS;
  • default_queue:broker默认存放任务的队列名称;
  • result_backend:backend配置,用来指定存放结果的介质的配置。可以根据需求,分别制定Redis、memchache或mongodb等;
  • results_expire_in:任务执行结果记录留存于backend保留时间,单位为秒;
  • amqp:为当我们使用AMQP是的详细配置信息;

上面展示的,是一个基本版的配置文件,而machinery中的所有配置,我们可以通过下方的数据结构来了解:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 源码
// machinery完整的配置结构
type Config struct {
	Broker          string       `yaml:"broker" envconfig:"BROKER"`
	DefaultQueue    string       `yaml:"default_queue" envconfig:"DEFAULT_QUEUE"`
	ResultBackend   string       `yaml:"result_backend" envconfig:"RESULT_BACKEND"`
	ResultsExpireIn int          `yaml:"results_expire_in" envconfig:"RESULTS_EXPIRE_IN"`
	AMQP            *AMQPConfig  `yaml:"amqp"`
	SQS             *SQSConfig   `yaml:"sqs"`
	Redis           *RedisConfig `yaml:"redis"`
	TLSConfig       *tls.Config
	// NoUnixSignals - when set disables signal handling in machinery
	NoUnixSignals bool            `yaml:"no_unix_signals" envconfig:"NO_UNIX_SIGNALS"`
	DynamoDB      *DynamoDBConfig `yaml:"dynamodb"`
}

调用yaml文件的配置方式,通过NewFromYaml()接口来完成,NewFromYaml()的基本逻辑中,主要是实现了一个加载配置的函数和reload配置的逻辑,而默认reload间隔reloadDelay为10s:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 源码
func NewFromYaml(cnfPath string, keepReloading bool) (*Config, error) {
   cnf, err := fromFile(cnfPath)
   if err != nil {
      return nil, err
   }
   if keepReloading {
      // 通过go程,实现定时reload配置
      go func() {
         for {
            // Delay after each request
            time.Sleep(reloadDelay)
            // Attempt to reload the config
            newCnf, newErr := fromFile(cnfPath)
            if newErr != nil {
               log.WARNING.Printf("Failed to reload config from file %s: %v", cnfPath, newErr)
               continue
            }
            *cnf = *newCnf
         }
      }()
   }
   return cnf, nil
}

基于环境变量

上面提到,machinery的配置文件方式只支持yaml,然而并不是所有项目都是采用了yaml文件,例如我们目前项目中配置文件普遍使用了toml文件。这样,为了使用machinery就必须在一个项目出现了两个配置文件(项目的toml文件和machinery的yaml文件)。然而,做技术的一般都是有强迫症的,两个配置文件存在于一个项目的确是一个十分ugly的现象,为了解决这一问题,machinery的环境变量配置模式的作用就体现出来了。

基于环境变量的源码与基于配置的结构类似,不在此列出。由于支持了基于环境变量的配置初始化,那么,我们可以将所有的配置均放在同一个配置文件中(对于我们的项目来说,即为toml文件),并在项目init阶段,将machinery的配置从toml文件中全部加载到临时环境变量并读取,从而曲线救国,解决了多配置文件的问题,简单示例如下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// 调用代码
//首先将配置文件加载到内存appConfig结构中
// (具体加载到内存中的方法很多,每个项目都有自己的方式,略去)

//然后将相关配置加载到环境变量
os.Setenv("BROKER", brokerValue)
os.Setenv("REDIS_MAX_IDLE", strconv.Itoa(appConfig.GetInt("redis.maxidle"))) //从toml中读取参数
os.Setenv("REDIS_MAX_ACTIVE", strconv.Itoa(appConfig.GetInt("redis.maxactive")))
os.Setenv("REDIS_IDLE_TIMEOUT", strconv.Itoa(appConfig.GetInt("redis.idletimeout")))
os.Setenv("REDIS_READ_TIMEOUT", strconv.Itoa(appConfig.GetInt("redis.readtimeout")))
os.Setenv("REDIS_WRITE_TIMEOUT", strconv.Itoa(appConfig.GetInt("redis.writetimeout")))
os.Setenv("REDIS_CONNECT_TIMEOUT", strconv.Itoa(appConfig.GetInt("redis.connecttimeout")))

//通过NewFromEnvironment()函数加载环境变量
config.NewFromEnvironment(false)

任务

从框架设计到逻辑原理,说了这么久的任务队列,任务,才是所有任务队列中的最基础的元素。从代码层面来看,一个任务就是一个执行函数。结合上篇文章中介绍过的machinery架构,我们可以知道在 machinery中,一个典型的处理流程,即为:

  • 任务创建
  • 任务注册
  • 任务发布
  • 任务执行
  • 结果获取

在下面的篇幅中,我们分别就上述步骤,从使用方法和原理同时解析,来详细介绍在machinery中如何操作和实现这几个过程。

结构

在详细介绍machinery如何处理一次任务流程之前,我们需要首先知道,究竟在machinery中一个任务的数据结构是什么样子的。在machinery的源码世界里,我们可以理解将一个任务称作为Signature,其数据结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// 源码
type Signature struct {
        UUID           string
        Name           string
        RoutingKey     string
        ETA            *time.Time
        GroupUUID      string
        GroupTaskCount int
        Args           []Arg
        Headers        Headers
        Immutable      bool
        RetryCount     int
        RetryTimeout   int
        OnSuccess      []*Signature
        OnError        []*Signature
        ChordCallback  *Signature
}

我们介绍几个有意义的参数:

  • UUID,任务的unique ID,可以主动设定也可以由系统自行设定;
  • Name,任务的名称,用于识别任务;
  • RoutingKey,根据这个key,用于将任务扔到一个正确的队列中;
  • ETA,专用于延时任务,若该参数为nil,说明需要立即将该任务扔给worker,否则,在参数数值到来之前,该任务将一直delay;
  • GroupUUID和GroupTaskCount,用于workflow中的Group分组任务创建
  • Args,任务传递给worker时的参数列表
  • Headers,用于tracing
  • RetryCount和RetryTimeout,用于实现任务的重试机制
  • Immutable,该参数可以控制任务之间是否需要参数传递
  • OnSuccess和OnError,实现回调,workflow中的链式任务(chain),就是通过OnSuccess来实现链式调度
  • ChordCallback,workflow中的chord模式,在group内所有任务全部执行完成后,进行callback。

创建任务

任务的创建在machinery中十分简单,其实就是生成一个Signature的实例,我们看下面的示例代码中,我们创建了一个名为 “audience.DownloadFromCos”的任务,指定了该任务的重试次数为2,重试超时时间为3s,同时设定了两个参数到Args中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 业务代码
const(
    TASK_RETRY_COUNT = 2
    TASK_RETRY_TIMEOUT = 3
)

...

func buildDownloadFromCos(appId, bucketName string) tasks.Signature {
	return tasks.Signature{
		Name: "audience.DownloadFromCos",
		RetryCount: TASK_RETRY_COUNT,
		RetryTimeout: TASK_RETRY_TIMEOUT,
		Args: []tasks.Arg{
			{
				Type: "string",
				Value: appId,
			},
			{
				Type: "string",
				Value: bucketName,
			},
		},
	}
}

注册任务

当任务创建完毕后,我们需要将任务注册到broker中才可供worker识别以调用。machinery中提供了一个RegisterTasks()函数来接受任务的注册,下面的代码中,我们将上述的DownloadFromCos()和另一个ParseCosFile()任务分别以名称为“audience.DownloadFromCos”和“audience.ParseCosFile”注册到machinery中:

1
2
3
4
5
6
7
// 调用代码
// 注册任务
tasks := map[string]interface{}{
   "audience.DownloadFromCos": DownloadFromCos,
   "audience.ParseCosFile": ParseCosFile,
 }
server.RegisterTasks(tasks)

RegisterTasks()会将所有的tasks加载到成员变量registeredTasks(同样为一个map类型)中,同时将通过调用broker接口SetRegisteredTaskNames(),将tasks名称注册到broker中,从而在broker之后接收到待处理任务之后可以判断是否为合法已注册任务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// 源码
func (server *Server) RegisterTasks(namedTaskFuncs map[string]interface{}) error {
	for _, task := range namedTaskFuncs {
		if err := tasks.ValidateTask(task); err != nil {
			return err
		}
	}
	server.registeredTasks = namedTaskFuncs

	server.broker.SetRegisteredTaskNames(server.GetRegisteredTaskNames())
	return nil
}

...

// SetRegisteredTaskNames函数
func (b *Broker) SetRegisteredTaskNames(names []string) {
	b.registeredTaskNames = names
}

发布任务

通过发布任务,broker中才会收到具体的任务内容。machinery中的任务提供了多种任务类型(主要是与Workflow相关,下文会详述),不同的任务有着类似但不相同的发布方法。在当前章节中,我们将首先介绍最基本的普通任务的发布:

1
2
3
4
5
6
7
// 业务代码
// 发送一个任务,taskObj由buildDownloadFromCos()返回
asyncResult, err := machinery.SendTask(&taskObj)
if err != nil {
   log.LoggerFromContextWithCaller(ctx).Errorf(err.Error())
   return
}

SendTask()中会调用Publish()函数去发布任务到broker,我们看到Publish()函数接受一个Signature类型的变量,首先将会根据任务参数ETA去判断任务类型是实时任务还是延时任务,从而扔到不同的任务队列中。一旦任务被扔到Broker中,worker就可以去获取并执行任务了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 源码
var redisDelayedTasksKey = "delayed_tasks"

func (b *RedisBroker) Publish(signature *tasks.Signature) error {
        ...
	// 根据ETA判断普通任务还是延时任务
  // ETA不为nil,延时任务,将ETA作为score到ZSET
	if signature.ETA != nil {
		now := time.Now().UTC()

		if signature.ETA.After(now) {
			score := signature.ETA.UnixNano()

			_, err = conn.Do("ZADD", redisDelayedTasksKey, score, msg)
			return err
		}
	}

  // ETA为nil,普通任务,到LIST
	_, err = conn.Do("RPUSH", signature.RoutingKey, msg)
	return err
}

获取结果

1
2
3
4
5
6
7
// 业务代码
// asyncResult由上述发布任务函数machinery.SendTask()所返回
results, err := asyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
        log.LoggerWrapperWithCaller().Errorf(err.Error())
        return
}

其中,asyncResult.Get()函数为一个异步阻塞函数,根据设定的参数轮询从backends中获取任务执行结果。以redis作为backends时为例,每一个任务的执行结果会存储于一条redis的key-value中,所以Get()函数最终会调用GET方法,根据任务的UUID,去redis中获取结果:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// 源码
// 根据uuid,获取该任务最近的状态
func (b *RedisBackend) GetState(taskUUID string) (*tasks.TaskState, error) {
	conn := b.open()
	defer conn.Close()

	item, err := redis.Bytes(conn.Do("GET", taskUUID))
	if err != nil {
		return nil, err
	}

	state := new(tasks.TaskState)
	decoder := json.NewDecoder(bytes.NewReader(item))
	decoder.UseNumber()
	if err := decoder.Decode(state); err != nil {
		return nil, err
	}

	return state, nil
}

通过上面的介绍,我们基本知道了在machinery中如何走一个最简单的任务操作及其原理。但是,如同我们在系列一中说到的,任务队列仅有这些功能是远远不够的,接下来我们将继续介绍machinery中的额外的任务队列功能。

重试机制

任务重试,算是任务队列的一个除了基本功能外的一个重要的基础功能,在上文中也已经顺带提到了。之所以会单独拎出来说,machinery中提供了两种方式来实现任务重试。

对于第一种,在上文中已经提到过,machinery中通过设置任务的RetryCount和RetryTimeout参数来实现,当任务执行出错后,会通过这两个参数来更新任务的ETA参数。RetryCount提供了重试次数,RetryTimeout提供了一个基于斐波那契数列的回退超时机制。

关于第二种,通过返回一个ErrRetryTaskLater类型的值来制定。

1
2
3
4
5
6
// 源码
// ErrRetryTaskLater ...
type ErrRetryTaskLater struct {
	name, msg string
	retryIn   time.Duration
}

同时需要注意的是,第二种方式的优先级高于第一种,也就是说,如果在新建任务的时候指定了RetryCount和RetryTimeout参数,但是在执行任务失败后返回了ErrRetryTaskLater类型的数值,依然按照返回值里的参数来制定重试方式。

在系列之一的最后部分,我们提到了Worker处理中的Process()接口的实现,下面让我们再来看一下Process()实现中的任务重试部分的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 源码
// worker执行任务的函数
func (worker *Worker) Process(signature *tasks.Signature) error {

	...

	// 任务执行失败后,判定重试的逻辑
	results, err := task.Call()
	if err != nil {
		// If a tasks.ErrRetryTaskLater was returned from the task,
		// retry the task after specified duration
		retriableErr, ok := interface{}(err).(tasks.ErrRetryTaskLater)
		if ok {
			return worker.retryTaskIn(signature, retriableErr.RetryIn())
		}
		// Otherwise, execute default retry logic based on signature.RetryCount
		// and signature.RetryTimeout values
		if signature.RetryCount > 0 {
			return worker.taskRetry(signature)
		}
		return worker.taskFailed(signature, err)
	}

	...

}

我们看到,当任务执行失败后,将会首先判断tasks.ErrRetryTaskLater是否有被实现,根据是否实现了该接口,分别调用retryTaskIn或者taskRetry函数。其中,

  • retryTaskIn()函数将任务参数ETA刷新,并重新发布该任务;
  • retryTask()函数将任务参数RetryCount减1,同时根据RetryTimeout参数更新ETA,并重新发布该任务;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 源码
func (worker *Worker) retryTaskIn(signature *tasks.Signature, retryIn time.Duration) error {
   // 更新任务状态为Retry
   if err := worker.server.GetBackend().SetStateRetry(signature); err != nil {
      return fmt.Errorf("Set state retry error: %s", err)
   }

   // 将参数ETA更新为当前时间+retryIn.Seconds()
   eta := time.Now().UTC().Add(retryIn)
   signature.ETA = &eta

   log.WARNING.Printf("Task %s failed. Going to retry in %.0f seconds.", signature.UUID, retryIn.Seconds())

   // 重新发布任务
   _, err := worker.server.SendTask(signature)
   return err
}
// 源码
func (worker *Worker) taskRetry(signature *tasks.Signature) error {
   // Update task state to RETRY
   if err := worker.server.GetBackend().SetStateRetry(signature); err != nil {
      return fmt.Errorf("Set state retry error: %s", err)
   }

   // RetryCount - 1
   signature.RetryCount--

   // 更新Retrytimeout时间
   signature.RetryTimeout = retry.FibonacciNext(signature.RetryTimeout)

   // 更新ETA参数
   eta := time.Now().UTC().Add(time.Second * time.Duration(signature.RetryTimeout))
   signature.ETA = &eta

   log.WARNING.Printf("Task %s failed. Going to retry in %d seconds.", signature.UUID, signature.RetryTimeout)

   // 重新发布任务
   _, err := worker.server.SendTask(signature)
   return err
}

WorkFlow

运行一个任务,支持实时执行、或者延时执行,同时在出错后支持重试等功能,这些在部分场合已经十分适用了,但是,在更多场景下,我们需要执行的任务之间有上下依赖需要串行执行、或者任务之间完全并行不相关、又或者根据结果成功失败来执行回调的需求。为了满足这类需求,machinery中Workflow模式的作用体现出来了。

Chain链式任务

所谓任务链式调度,即一系列任务之间采用one by one的串行调度,只有前一个任务执行完毕,才会执行后一个任务。我们依然根据上文介绍普通任务的流程来介绍在machinery中对于链式任务的处理流程:

任务创建

machinery中提供了NewChain()接口来实现链式任务的创建,如下所示,我们分别创建了两个任务实例task0和task1,然后通过NewChain()生成了链式任务chain实例:

1
2
3
4
5
6
7
8
// 调用代码
task0 := buildDownloadFromCos("audience.DownloadFromCos",appId, bucketName)
task1 := buildDoCompare("audience.DoCompare",appId, upstreamId)
chain, err := tasks.NewChain(&task0,&task1)
if err != nil {
    log.LoggerWrapperWithCaller().Errorf(err.Error())
    return
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 源码
func NewChain(signatures ...*Signature) (*Chain, error) {
   // uuid的生成逻辑
   for _, signature := range signatures {
      if signature.UUID == "" {

         signatureID, err := uuid.NewV4()

         if err != nil {
            return nil, fmt.Errorf("Error generating signature id: %s", err.Error())
         }

         signature.UUID = fmt.Sprintf("task_%v", signatureID)
      }
   }

   // 将所有任务拼装,并传递给Chain数据结构中的Tasks变量
   for i := len(signatures) - 1; i > 0; i-- {
      if i > 0 {
         signatures[i-1].OnSuccess = []*Signature{signatures[i]}
      }
   }

   chain := &Chain{Tasks: signatures}

   return chain, nil
}

...

// 链式任务数据结构
type Chain struct {
	Tasks []*Signature
}

我们重点看一下上述的任务拼装部分,通过链式任务的拼装,最终我们通过chain任务中的第一个任务,就可以看到chain中的所有任务,下方是一个chain任务存储于Broker中的示例,该chain任务中,一共有2个基本任务task,分别通过OnSuccess来实现任务的连接调用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Chain任务在Broker中的示例
{
    "UUID": "task_14a0f22c-b456-493c-9c9c-7cd2216d4339",
    "Name": "audience.ExtractEsToFile",
    "RoutingKey": "machinery_tasks",
    "ETA": null,
    "GroupUUID": "",
    "GroupTaskCount": 0,
    "Args": [
        {
            "Name": "",
            "Type": "string",
            "Value": "123456789"
        },
        {
            "Name": "",
            "Type": "string",
            "Value": "audience/2018-06-07/4703668260587260.data"
        },
        {
            "Name": "",
            "Type": "string",
            "Value": "[{\"field\":\"gender.keyword\",\"value\":[\"女\"],\"operation\":\"and\"},{\"field\":\"job_title.keyword\",\"value\":[\"快?~R佑~X\"],\"operation\":\"and\"}} ,{\"field\":\"school_role.keyword\",\"value\":[\"维修工\"],\"operation\":\"and\"}]"
        }
    ],
    "Headers": null,
    "Immutable": false,
    "RetryCount": 3,
    "RetryTimmeout": 5,
    "OnSuccess": [
        {
            "UUID": "task_cab90b2c-2dc5-4035-91f6-4803df67c8c0",
            "Name": "audience.EncryptExtractFile",
            "RoutingKey": "",
            "ETA": null,
            "GroupUUID": "",
            "GroupTaskCount": 0,
            "Args": [
                {
                    "Name": "",
                    "Type": "string",
                    "Value": "123456789"
                },
                {
                    "Name": "",
                    "Type": "int64",
                    "Value": 29
                }
            ],
            "Headers": null,
            "Immutable": false,
            "RetryCount": 3,
            "RetryTimeout": 5,
            "OnSuccess": [
                {
                    "UUID": "task_82f4f831-af24-4f71-950b-5d11abac3dea",
                    "Name": "audience.UploadExtractFileToCos",
                    "RoutingKey": "",
                    "ETA": null,
                    "GroupUUID": "",
                    "GroupTaskCount": 0,
                    "Args": [
                        {
                            "Name": "",
                            "Type": "string",
                            "Value": "123456789"
                        },
                        {
                            "Name": "",
                            "Type": "string",
                            "Value": "beta-abc"
                        },
                        {
                            "Name": "",
                            "Type": "string",
                            "Value": "ap-shanghai"
                        },
                        {
                            "Name": "",
                            "Type": "string",
                            "Value": "audience/2018-06-07/4703668260587260.data"
                        }
                    ],
                    "Headers": null,
                    "Immutable": false,
                    "RetryCount": 3,
                    "RetryTimeout": 5,
                    "OnSuccess": null,
                    "OnError": null,
                    "ChordCallback": null
                }
            ],
            "OnError": null,
            "ChordCallback": null
        }
    ],
    "OnError": null,
    "ChordCallback": null
}

任务发布

链式任务发布的方法与普通任务接口设计基本一致,直接通过调用SendChain()接口即可:

1
2
3
4
5
6
// 调用代码
asyncResult, err := mc.SendChain(&chain)
if err != nil {
 log.LoggerFromContextWithCaller(ctx).Errorf(err.Error())
 return
}

结合上述对chain任务创建的介绍,我们来看下SendChain()代码实现。如上文描述,由于第一个任务中已经包含了所有的后续任务信息,发布一个chain任务,实际上只需要发布第一个任务到Broker中即可:

1
2
3
4
5
6
7
8
9
// 源码
func (server *Server) SendChain(chain *tasks.Chain) (*backends.ChainAsyncResult, error) {
   _, err := server.SendTask(chain.Tasks[0])
   if err != nil {
      return nil, err
   }

   return backends.NewChainAsyncResult(chain.Tasks, server.backend), nil
}

因此,真正实现的chain中后续的任务调用,是由worker对每一个任务的OnSuccess参数检查来实现调度的,当worker中每一个任务调用成功后,都会触发taskSucceeded()执行,taskSucceeded中则会针对OnSuccess进行判断,如果发现存在OnSuccess,则会发布下一个任务,从而实现了任务的链式调度:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 源码
func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*tasks.TaskResult) error {

   ...

   for _, successTask := range signature.OnSuccess {
      if signature.Immutable == false {
         // Pass results of the task to success callbacks
         for _, taskResult := range taskResults {
            successTask.Args = append(successTask.Args, tasks.Arg{
               Type:  taskResult.Type,
               Value: taskResult.Value,
            })
         }
      }

      worker.server.SendTask(successTask)
   }

   ...

}

结果获取

链式任务的结果获取,获取的是最后一个任务的执行结果,调用方式与普通任务获取无异,方式如下:

1
2
3
4
5
6
// 调用代码
results, err := chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
log.LoggerWrapperWithCaller().Errorf(err.Error())
return
}

参数传递

这时候,细心的同学可能会问了,如果串行执行的任务之间有参数依赖需要传递的话,如何实现呢?这时候我们回头看一下Signature的参数中有一个Immutable,chain任务会根据每一个任务的Immutable参数的数值来决定是否帮我们将上一个任务的返回值通过参数传递给chain中的下一个任务,如果immutable为true,那么参数将不会被传递下去,实现该部分的代码在任务执行成功的taskSucceeded()函数中。我们会看上一小节的taskSucceeded()代码,就可以看到对每一个任务的Immutable检查。

Group分组任务

分组任务,听起来可能不太理解,但如果换成另一个名词:并行任务,就很好理解了。即一个分组group内的所有任务,是相互独立,同时执行的,这在任务队列中也是有着相当大的使用场景的。

任务创建

machinery中创建分组任务的接口是NewGroup(),可以接受多个普通任务参数,并生成一个group实例:

1
2
3
4
5
// 调用代码
group, err := tasks.NewGroup(&task0, &task1, &task2)
if err != nil {
    log.LoggerWrapperWithCaller().Errorf(err.Error())
}

在Group中的所有任务,没有执行顺序的要求,所有的任务将会由所有worker去竞争获取并执行。也是因此,创建分组任务的实现也十分简单,只需要进行简单的复制即可:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 源码
func NewGroup(signatures ...*Signature) (*Group, error) {
   // 创建Group的uuid
   groupUUID, err := uuid.NewV4()

   ...

   for _, signature := range signatures {
      if signature.UUID == "" {

         signatureID, err := uuid.NewV4()

         if err != nil {
            return nil, fmt.Errorf("Error generating signature id: %s", err.Error())
         }

         signature.UUID = fmt.Sprintf("task_%v", signatureID)
      }
      signature.GroupUUID = groupID
      signature.GroupTaskCount = len(signatures)
   }

   return &Group{
      GroupUUID: groupID,
      Tasks:     signatures,
   }, nil
}

...

// 分组任务数据结构
type Group struct {
	GroupUUID string
	Tasks     []*Signature
}

任务发布

machinery中发布分组任务接口是SendGroup(),支持将任务并行发布到Broker中,同时还支持了一个发送任务的并发控制,以防止同一时刻发布太多任务到任务队列,照顾到Broker的性能。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
asyncResults, err := server.SendGroup(group, /*并发任务数量*/5)
if err != nil {
   return fmt.Errorf("Could not send group: %s", err.Error())
}
SendGroup()接口的代码如下不必要的地方已经省略同时添加了部分注释

// 源码
func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*backends.AsyncResult, error) {

   ...

   wg.Add(len(group.Tasks))
   errorsChan := make(chan error, len(group.Tasks)*2)

   // 初始化group,主要负责在backend中根据group uuid创建一条记录,以存储该group任务的状态
   server.backend.InitGroup(group.GroupUUID, group.GetUUIDs())

   // 任务状态设置
   for _, signature := range group.Tasks {
      if err := server.backend.SetStatePending(signature); err != nil {
         errorsChan <- err
         continue
      }
   }

   // 并发控制
   pool := make(chan struct{}, sendConcurrency)
   go func() {
      for i := 0; i < sendConcurrency; i++ {
         pool <- struct{}{}
      }
   }()

   for i, signature := range group.Tasks {
      if sendConcurrency > 0 {
         <-pool
      }

      go func(s *tasks.Signature, index int) {
         defer wg.Done()
         // 发布任务
         err := server.broker.Publish(s)

         if sendConcurrency > 0 {
            pool <- struct{}{}
         }

         if err != nil {
            errorsChan <- fmt.Errorf("Publish message error: %s", err)
            return
         }

         asyncResults[index] = backends.NewAsyncResult(s, server.backend)
      }(signature, i)
   }

   ...

}

结果获取

分组任务中每个任务的执行结果,全部存储于asyncResult 中,可通过遍历获取:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
	// 调用代码
	for _, asyncResult := range asyncResults {
		results, err = asyncResult.Get(time.Duration(time.Millisecond * 5))
		if err != nil {
			return fmt.Errorf("Getting task result failed with error: %s", err.Error())
		}
		log.INFO.Printf(
			"%v + %v = %v\n",
			asyncResult.Signature.Args[0].Value,
			asyncResult.Signature.Args[1].Value,
			tasks.HumanReadableResults(results),
		)
    }

Chord任务

chord任务,其功能是分组任务+回调任务。即,chord任务允许我们在并行执行完毕所有一组group任务之后,回调一个callback任务,这也是一个有着非常多的应用场景。

任务创建

machinery通过NewChord()接口实现chord任务创建,接受两个参数,分别是group任务实例和回调函数:

1
2
3
4
5
// 调用代码
chord, err := tasks.NewChord(group, &cbTask)
if err != nil {
   return fmt.Errorf("Error creating chord: %s", err)
}

而NewChord()函数主要负责给group中每个任务的ChordCallback赋值为需要回调的函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 源码
func NewChord(group *Group, callback *Signature) (*Chord, error) {

   ...

   for _, signature := range group.Tasks {
      signature.ChordCallback = callback
   }
   return &Chord{Group: group, Callback: callback}, nil
}

...

// Chord任务数据结构
type Chord struct {
	Group    *Group
	Callback *Signature
}

任务发布

chord任务发布比较也比较简单,接口SendChord()与SendGroup()类似,而SendChord()中的代码逻辑也比较简单,即简单的warp了一下SendGroup任务。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 调用代码
chordAsyncResult, err := server.SendChord(chord, /*并发任务数量*/5)
if err != nil {
   return fmt.Errorf("Could not send chord: %s", err.Error())
}
// 源码
func (server *Server) SendChord(chord *tasks.Chord, sendConcurrency int) (*backends.ChordAsyncResult, error) {
   _, err := server.SendGroup(chord.Group, sendConcurrency)
   if err != nil {
      return nil, err
   }

   return backends.NewChordAsyncResult(
      chord.Group.Tasks,
      chord.Callback,
      server.backend,
   ), nil
}

而主要对chord的回调函数的调用,则同样是通过任务执行成功后的执行函数taskSucceeded()来实现,其相关代码如下,具体的逻辑可以从添加的注释中了解:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 源码
func (worker *Worker) taskSucceeded(signature *tasks.Signature, taskResults []*tasks.TaskResult) error {

   ...

   // 若该任务不是groupe任务中的, return
	if signature.GroupUUID == "" {
		return nil
	}

	// 检查是否group中的所有任务都执行完毕
	groupCompleted, err := worker.server.GetBackend().GroupCompleted(
		signature.GroupUUID,
		signature.GroupTaskCount,
	)
	if err != nil {
		return fmt.Errorf("Group completed error: %s", err)
	}
	if !groupCompleted {
		return nil
	}

   ...

   // 若无chord callback, return
   if signature.ChordCallback == nil {
      return nil
   }

   // 触发chord callback, 确保只触发一次
   shouldTrigger, err := worker.server.GetBackend().TriggerChord(signature.GroupUUID)
   if err != nil {
      return fmt.Errorf("Trigger chord error: %s", err)
   }
   if !shouldTrigger {
      return nil
   }

   ...

   // 发送chord任务
   _, err = worker.server.SendTask(signature.ChordCallback)
   if err != nil {
      return err
   }

   return nil
}

结果获取

chord任务的结果获取与其他的操作无异,如下所示:

1
2
3
4
5
// 调用代码
results, err = chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
   return fmt.Errorf("Getting chord result failed with error: %s", err.Error())
}

总结

我们详细的介绍了golang中任务队列machinery的使用和原理,从而为众多golang使用者们推荐了一款好用的任务队列,并从源码层解析了其详细实现。

同时,本文主要是基于Redis作为存储介质来进行详细介绍,而Redis中缺乏如Ack之类的机制,尽管可以通过LUA脚本简介实现,但是在云时代的Redis集群对EVAL接口支持的能力还有不足,使得使用起来还是略有欠缺。因此,对于任务可靠性要求更高的,可以使用基于AMQP的方案来使用,更多关于AMQP的实现接口,基本与Redis下一致,大家可以进一步阅读源码。

转载: https://cloud.tencent.com/developer/article/1169675 https://cloud.tencent.com/developer/article/1177831