Config

machinery的配置结构体是config包中的Config类型,具体结构如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Config holds all configuration for our program
type Config struct {
	Broker          string           `yaml:"broker" envconfig:"BROKER"`
	DefaultQueue    string           `yaml:"default_queue" envconfig:"DEFAULT_QUEUE"`
	ResultBackend   string           `yaml:"result_backend" envconfig:"RESULT_BACKEND"`
	ResultsExpireIn int              `yaml:"results_expire_in" envconfig:"RESULTS_EXPIRE_IN"`
	AMQP            *AMQPConfig      `yaml:"amqp"`
	SQS             *SQSConfig       `yaml:"sqs"`
	Redis           *RedisConfig     `yaml:"redis"`
	GCPPubSub       *GCPPubSubConfig `yaml:"-" ignored:"true"`
	MongoDB         *MongoDBConfig   `yaml:"-" ignored:"true"`
	TLSConfig       *tls.Config
	// NoUnixSignals - when set disables signal handling in machinery
	NoUnixSignals bool            `yaml:"no_unix_signals" envconfig:"NO_UNIX_SIGNALS"`
	DynamoDB      *DynamoDBConfig `yaml:"dynamodb"`
}

machinery具有用于从环境变量或YAML文件加载构造方便的方法。,例如,从环境变量加载配置:

1
cnf, err := config.NewFromEnvironment(true)

或从YAML文件加载:

1
cnf, err := config.NewFromYaml("config.yml", true)

第二个布尔值标志允许每10秒实时重新加载配置。使用false禁用重新加载。

machinery的配置由Config结构封装,并作为对需要它的对象的依赖项注入。

举例:config.yaml配置如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
---
broker: 'redis://localhost:6379'
#broker: 'redis://localhost:6379'
#broker: 'https://sqs.us-west-2.amazonaws.com/123456789012'
default_queue: machinery_tasks

result_backend: 'redis://localhost:6379'
#result_backend: 'memcache://localhost:11211'
#result_backend: 'mongodb://localhost:27017'
results_expire_in: 3600000

broker

消息代理。

AMQP

使用以下格式的AMQP URL:

1
amqp://[username:password@]@host[:port]

例如:

1
amqp://guest:guest@localhost:5672

Redis

使用以下格式之一的Redis URL:

1
2
redis://[password@]host[port][/db_num]
redis+socket://[password@]/path/to/file.sock[:/db_num]

例如:

1
2
redis://localhost:6379, or with password redis://password@localhost:6379
redis+socket://password@/path/to/file.sock:/0

default_queue

默认队列名称,例如machinery_tasks。

result_backend

后端,用于保留任务状态和结果。

Redis

使用以下格式之一的Redis URL:

1
2
redis://[password@]host[port][/db_num]
redis+socket://[password@]/path/to/file.sock[:/db_num]

例如:

1
2
3
redis://localhost:6379, or with password redis://password@localhost:6379
redis+socket://password@/path/to/file.sock:/0
cluster redis://host1:port1,host2:port2,host3:port3

Memcache

使用以下格式的Memcache URL:

1
memcache://host1[:port1][,host2[:port2],...[,hostN[:portN]]]

例如:

1
2
memcache://localhost:11211 for a single instance, or
memcache://10.0.0.1:11211,10.0.0.2:11211 for a cluster

AMQP

使用以下格式的AMQP URL:

1
amqp://[username:password@]@host[:port]

例如:

1
amqp://guest:guest@localhost:5672

请记住,不建议将AMQP作为后端。原因见下文.

MongoDB

使用以下格式的Mongodb URL:

1
mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]

例如:

1
mongodb://localhost:27017/taskresults

有关更多信息,请参见MongoDB文档。

results_expire_in

任务结果存储多长时间(以秒为单位)。默认为3600(1小时)。

AMQP

RabbitMQ相关的配置。如果您正在使用其他代理/后端,则没有必要。

  • Exchange:交换名称,例如 machinery_exchange
  • ExchangeType:交换类型,例如 direct
  • QueueBindingArguments:绑定到AMQP队列时使用的附加参数的可选映射
  • BindingKey:使用此密钥将队列绑定到交换,例如 machinery_task
  • PrefetchCount:要预取多少个任务(1如果您有长期运行的任务,则设置为)

Server

Machinery必须在使用前实例化。完成此操作的方法是创建一个Server实例。Server是存储Machinery配置和已注册任务的基础对象。

server基本结构如下:

1
2
3
4
5
6
7
type Server struct {
	config            *config.Config
	registeredTasks   map[string]interface{}
	broker            brokersiface.Broker
	backend           backendsiface.Backend
	prePublishHandler func(*tasks.Signature)
}

存储配置

server例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import (
  "github.com/RichardKnop/machinery/v1/config"
  "github.com/RichardKnop/machinery/v1"
)

var cnf = &config.Config{
  Broker:             "amqp://guest:guest@localhost:5672/",
  DefaultQueue:       "machinery_tasks",
  ResultBackend:      "amqp://guest:guest@localhost:5672/",
  AMQP:               &config.AMQPConfig{
    Exchange:     "machinery_exchange",
    ExchangeType: "direct",
    BindingKey:   "machinery_task",
  },
}

server, err := machinery.NewServer(cnf)
if err != nil {
  // do something with the error
}

注册任务

注册任务名和对应的执行函数,下面有更详细的解释.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Register tasks
tasks := map[string]interface{}{
	"add":               exampletasks.Add,
	"multiply":          exampletasks.Multiply,
	"sum_ints":          exampletasks.SumInts,
	"sum_floats":        exampletasks.SumFloats,
	"concat":            exampletasks.Concat,
	"split":             exampletasks.Split,
	"panic_task":        exampletasks.PanicTask,
	"long_running_task": exampletasks.LongRunningTask,
}

err:=server.RegisterTasks(tasks)
if err != nil {
  // do something with the error
}

Worker

注册worker

为了消费任务,您需要让一个或多个workers运行。

worker结构体信息如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// Worker represents a single worker process
type Worker struct {
	server          *Server
	ConsumerTag     string
	Concurrency     int
	Queue           string
	errorHandler    func(err error)
	preTaskHandler  func(*tasks.Signature)
	postTaskHandler func(*tasks.Signature)
}

你需要利用server创建一个worker实例来运行并注册worker,例如:

1
worker := server.NewWorker("worker_name", 10)

每个worker将只消费注册的任务。对于队列中的每个任务,Worker.Process()方法将在goroutine中运行。

第一个参数是消费者标签,理想情况下,每个worker都应该有一个唯一的标签(worker1,worker2等)

使用的第二个参数server.NewWorker来限制同时运行的Worker.Process()调用的数量(每个worker)。例如:1将序列化任务执行,而0将使并发执行的任务数不受限制(默认)。

添加Handler

可以为任务开始和结束时添加hook,也可以做错误处理.

当任务返回错误时,默认行为是首先尝试重试该任务(如果可重试),否则记录该错误,然后最终调用任何错误回调。

要对此进行自定义,可以在worker上设置一个自定义的错误处理程序,它不仅可以执行重试失败且触发了错误回调后的日志记录,还可以做更多的事情:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
//这里,我们注入了一些用于错误处理的自定义代码,还有任务开始和结束的钩子
errorhandler := func(err error) {
	log.ERROR.Println("I am an error handler:", err)
}
pretaskhandler := func(signature *tasks.Signature) {
	log.INFO.Println("I am a start of task handler for:", signature.Name)
}
posttaskhandler := func(signature *tasks.Signature) {
	log.INFO.Println("I am an end of task handler for:", signature.Name)
}
worker.SetPostTaskHandler(posttaskhandler)
worker.SetErrorHandler(errorhandler)
worker.SetPreTaskHandler(pretaskhandler)

执行worker

1
2
3
4
err := worker.Launch()
if err != nil {
  // do something with the error
}

Task

Task是machinery的基础概念。Tasks的本质是一个传参和返回值有规定的function,它定义了当worker收到消息时会发生什么。

每个task都需要返回一个错误作为最后的返回值。除了错误,现在还可以返回任意数量的参数。

顾泽task规则的示例function如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// Add ...
func Add(args ...int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}

// Multiply ...
func Multiply(args ...int64) (int64, error) {
	sum := int64(1)
	for _, arg := range args {
		sum *= arg
	}
	return sum, nil
}

// SumInts ...
func SumInts(numbers []int64) (int64, error) {
	var sum int64
	for _, num := range numbers {
		sum += num
	}
	return sum, nil
}

// SumFloats ...
func SumFloats(numbers []float64) (float64, error) {
	var sum float64
	for _, num := range numbers {
		sum += num
	}
	return sum, nil
}

// Concat ...
func Concat(strs []string) (string, error) {
	var res string
	for _, s := range strs {
		res += s
	}
	return res, nil
}

// Split ...
func Split(str string) ([]string, error) {
	return strings.Split(str, ""), nil
}

// PanicTask ...
func PanicTask() (string, error) {
	panic(errors.New("oops"))
}

// LongRunningTask ...
func LongRunningTask() error {
	log.INFO.Print("Long running task started")
	for i := 0; i < 10; i++ {
		log.INFO.Print(10 - i)
		time.Sleep(1 * time.Second)
	}
	log.INFO.Print("Long running task finished")
	return nil
}

注册task

在您的worker可以使用任务之前,您需要在服务器上注册它。这可以通过为任务分配唯一的名称来完成:

1
2
3
4
server.RegisterTasks(map[string]interface{}{
  "add":      Add,
  "multiply": Multiply,
})

任务也可以一一注册:

1
2
server.RegisterTask("add", Add)
server.RegisterTask("multiply", Multiply)

简而言之,当一个work收到这样的消息时:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
{
  "UUID": "48760a1a-8576-4536-973b-da09048c2ac5",
  "Name": "add",
  "RoutingKey": "",
  "ETA": null,
  "GroupUUID": "",
  "GroupTaskCount": 0,
  "Args": [
    {
      "Type": "int64",
      "Value": 1,
    },
    {
      "Type": "int64",
      "Value": 1,
    }
  ],
  "Immutable": false,
  "RetryCount": 0,
  "RetryTimeout": 0,
  "OnSuccess": null,
  "OnError": null,
  "ChordCallback": null
}

它将调用Add(1,1)。每个task也应该返回一个错误,以便我们处理失败。

理想情况下,task应该是幂等的,这意味着当使用相同的参数多次调用task时,不会有意外的后果。

Signature

基本类型

Signature 包装了task的调用参数,执行选项(例如immutability)和 success/error 回调,因此可以通过网络将其发送给worker。 Task signature 实现了一个简单的interface:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Arg represents a single argument passed to invocation fo a task
type Arg struct {
  Type  string
  Value interface{}
}

// Headers represents the headers which should be used to direct the task
type Headers map[string]interface{}

// Signature represents a single task invocation
type Signature struct {
	UUID           string
	Name           string
	RoutingKey     string
	ETA            *time.Time
	GroupUUID      string
	GroupTaskCount int
	Args           []Arg
	Headers        Headers
	Priority       uint8
	Immutable      bool
	RetryCount     int
	RetryTimeout   int
	OnSuccess      []*Signature
	OnError        []*Signature
	ChordCallback  *Signature
	//MessageGroupId for Broker, e.g. SQS
	BrokerMessageGroupId string
	//ReceiptHandle of SQS Message
	SQSReceiptHandle string
	// StopTaskDeletionOnError used with sqs when we want to send failed messages to dlq,
  // and don't want machinery to delete from source queue
	StopTaskDeletionOnError bool
	// IgnoreWhenTaskNotRegistered auto removes the request when there is no handeler available
	// When this is true a task with no handler will be ignored and not placed back in the queue
	IgnoreWhenTaskNotRegistered bool
}
  • UUID,任务的unique ID,可以主动设定也可以由系统自行设定;
  • Name,任务的名称,用于识别任务;
  • RoutingKey,根据这个key,用于将任务扔到一个正确的队列中;
  • ETA,专用于延时任务,若该参数为nil,说明需要立即将该任务扔给worker,否则,在参数数值到来之前,该任务将一直delay;
  • GroupUUID和GroupTaskCount,用于workflow中的Group分组任务创建
  • Args,任务传递给worker时的参数列表
  • Headers,用于tracing
  • RetryCount和RetryTimeout,用于实现任务的重试机制
  • Immutable,该参数可以控制任务之间是否需要参数传递
  • OnSuccess和OnError,实现回调,workflow中的链式任务(chain),就是通过OnSuccess来实现链式调度
  • ChordCallback,workflow中的chord模式,在group内所有任务全部执行完成后,进行callback。

在将tasks发送到broker之前,Machinery将task编码为JSON。task结果也作为JSON编码的字符串存储在后端。因此,Signature仅支持具有JSON表示形式的类型。当前支持的类型是:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
bool
int
int8
int16
int32
int64
uint
uint8
uint16
uint32
uint64
float32
float64
string
[]bool
[]int
[]int8
[]int16
[]int32
[]int64
[]uint
[]uint8
[]uint16
[]uint32
[]uint64
[]float32
[]float64
[]string

示例代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
var (
	addTask0, addTask1, addTask2                      tasks.Signature
	multiplyTask0, multiplyTask1                      tasks.Signature
	sumIntsTask, sumFloatsTask, concatTask, splitTask tasks.Signature
	panicTask                                         tasks.Signature
	longRunningTask                                   tasks.Signature
)

addTask0 = tasks.Signature{
	Name: "add",
	Args: []tasks.Arg{
		{
			Type:  "int64",
			Value: 1,
		},
		{
			Type:  "int64",
			Value: 1,
		},
	},
}
addTask1 = tasks.Signature{
	Name: "add",
	Args: []tasks.Arg{
		{
			Type:  "int64",
			Value: 2,
		},
		{
			Type:  "int64",
			Value: 2,
		},
	},
}
addTask2 = tasks.Signature{
	Name: "add",
	Args: []tasks.Arg{
		{
			Type:  "int64",
			Value: 5,
		},
		{
			Type:  "int64",
			Value: 6,
		},
	},
}
multiplyTask0 = tasks.Signature{
	Name: "multiply",
	Args: []tasks.Arg{
		{
			Type:  "int64",
			Value: 4,
		},
	},
}
multiplyTask1 = tasks.Signature{
	Name: "multiply",
}
sumIntsTask = tasks.Signature{
	Name: "sum_ints",
	Args: []tasks.Arg{
		{
			Type:  "[]int64",
			Value: []int64{1, 2},
		},
	},
}
sumFloatsTask = tasks.Signature{
	Name: "sum_floats",
	Args: []tasks.Arg{
		{
			Type:  "[]float64",
			Value: []float64{1.5, 2.7},
		},
	},
}
concatTask = tasks.Signature{
	Name: "concat",
	Args: []tasks.Arg{
		{
			Type:  "[]string",
			Value: []string{"foo", "bar"},
		},
	},
}
splitTask = tasks.Signature{
	Name: "split",
	Args: []tasks.Arg{
		{
			Type:  "string",
			Value: "foo",
		},
	},
}
panicTask = tasks.Signature{
	Name: "panic_task",
}
longRunningTask = tasks.Signature{
	Name: "long_running_task",
}

设置参数

延迟

您可以通过ETA在Signature 上设置时间戳字段来延迟任务。

1
2
3
// Delay the task by 5 seconds
eta := time.Now().UTC().Add(time.Second * 5)
signature.ETA = &eta

重试

您可以设置多个重试尝试,然后再将任务声明为失败。Fibonacci序列将用于随着时间间隔重试请求。

1
2
// If the task fails, retry it up to 3 times
signature.RetryCount = 3

或者,您可以从任务中返回tasks.ErrRetryTaskLater并指定应重试任务的持续时间,例如:

1
return tasks.NewErrRetryTaskLater("some error", 4 * time.Hour)

发送task

Single

task可以通过Server的sendtask实例调用。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
	asyncResult, err := server.SendTaskWithContext(ctx, &addTask0)
	if err != nil {
		return fmt.Errorf("Could not send task: %s", err.Error())
	}

	results, err := asyncResult.Get(time.Duration(time.Millisecond * 5))
	if err != nil {
		return fmt.Errorf("Getting task result failed with error: %s", err.Error())
	}
	log.INFO.Printf("1 + 1 = %v\n", tasks.HumanReadableResults(results))

	/*
	* Try couple of tasks with a slice argument and slice return value
	*/
	asyncResult, err = server.SendTaskWithContext(ctx, &sumIntsTask)
	if err != nil {
		return fmt.Errorf("Could not send task: %s", err.Error())
	}

	results, err = asyncResult.Get(time.Duration(time.Millisecond * 5))
	if err != nil {
		return fmt.Errorf("Getting task result failed with error: %s", err.Error())
	}
	log.INFO.Printf("sum([1, 2]) = %v\n", tasks.HumanReadableResults(results))

	asyncResult, err = server.SendTaskWithContext(ctx, &sumFloatsTask)
	if err != nil {
		return fmt.Errorf("Could not send task: %s", err.Error())
	}

	results, err = asyncResult.Get(time.Duration(time.Millisecond * 5))
	if err != nil {
		return fmt.Errorf("Getting task result failed with error: %s", err.Error())
	}
	log.INFO.Printf("sum([1.5, 2.7]) = %v\n", tasks.HumanReadableResults(results))

	asyncResult, err = server.SendTaskWithContext(ctx, &concatTask)
	if err != nil {
		return fmt.Errorf("Could not send task: %s", err.Error())
	}

	results, err = asyncResult.Get(time.Duration(time.Millisecond * 5))
	if err != nil {
		return fmt.Errorf("Getting task result failed with error: %s", err.Error())
	}
	log.INFO.Printf("concat([\"foo\", \"bar\"]) = %v\n", tasks.HumanReadableResults(results))

	asyncResult, err = server.SendTaskWithContext(ctx, &splitTask)
	if err != nil {
		return fmt.Errorf("Could not send task: %s", err.Error())
	}

	results, err = asyncResult.Get(time.Duration(time.Millisecond * 5))
	if err != nil {
		return fmt.Errorf("Getting task result failed with error: %s", err.Error())
	}
	log.INFO.Printf("split([\"foo\"]) = %v\n", tasks.HumanReadableResults(results))

tasks.HumanReadableResults返回string类型.

Workflows

运行单个异步任务很好,但是通常您会想要设计一种以协调方式执行的任务工作流。有几个有用的功能可以帮助您设计工作流程。

Group

Group 是一组将彼此独立执行的并行任务。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
group, err := tasks.NewGroup(&addTask0, &addTask1, &addTask2)
if err != nil {
	return fmt.Errorf("Error creating group: %s", err.Error())
}
asyncResults, err := server.SendGroupWithContext(ctx, group, 10)//The second parameter specifies the number of concurrent sending tasks. 0 means unlimited.
if err != nil {
	return fmt.Errorf("Could not send group: %s", err.Error())
}
for _, asyncResult := range asyncResults {
	results, err = asyncResult.Get(time.Duration(time.Millisecond * 5))
	if err != nil {
		return fmt.Errorf("Getting task result failed with error: %s", err.Error())
	}
	log.INFO.Printf(
		"%v + %v = %v\n",
		asyncResult.Signature.Args[0].Value,
		asyncResult.Signature.Args[1].Value,
		tasks.HumanReadableResults(results),
	)
}

SendGroup返回AsyncResult对象的切片。因此,您可以进行阻塞调用并等待组任务的结果

Chord

Chord允许您定义要在组中的所有任务完成处理之后执行的回调,例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
group, err = tasks.NewGroup(&addTask0, &addTask1, &addTask2)
if err != nil {
	return fmt.Errorf("Error creating group: %s", err.Error())
}
chord, err := tasks.NewChord(group, &multiplyTask1)
if err != nil {
	return fmt.Errorf("Error creating chord: %s", err)
}
chordAsyncResult, err := server.SendChordWithContext(ctx, chord, 10)
if err != nil {
	return fmt.Errorf("Could not send chord: %s", err.Error())
}
results, err = chordAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
	return fmt.Errorf("Getting chord result failed with error: %s", err.Error())
}
log.INFO.Printf("(1 + 1) * (2 + 2) * (5 + 6) = %v\n", tasks.HumanReadableResults(results))

上面的示例并行执行&addTask0, &addTask1, &addTask2,将其结果汇总并将其传递给&multiplyTask1。因此,最终将发生的是:

1
(1 + 1) * (2 + 2) * (5 + 6)

Chain

Chain只是一组将逐个执行的任务,每个成功的任务都会触发链中的下一个任务。例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
chain, err := tasks.NewChain(&addTask0, &addTask1, &addTask2, &multiplyTask0)
if err != nil {
	return fmt.Errorf("Error creating chain: %s", err)
}
chainAsyncResult, err := server.SendChainWithContext(ctx, chain)
if err != nil {
	return fmt.Errorf("Could not send chain: %s", err.Error())
}
results, err = chainAsyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
	return fmt.Errorf("Getting chain result failed with error: %s", err.Error())
}
log.INFO.Printf("(((1 + 1) + (2 + 2)) + (5 + 6)) * 4 = %v\n", tasks.HumanReadableResult(results))

上面的示例先执行task1,然后执行task2,再执行task3,将每个任务的结果传递给链中的下一个任务。因此,最终将发生的是:

1
(((1 + 1) + (2 + 2)) + (5 + 6)) * 4

获取待处理task

可以检查队列中当前正在等待工作的任务,例如:

1
server.GetBroker().GetPendingTasks("some_queue")

当前仅Redis broker支持。

保存结果

如果您配置result backend,则任务状态和结果将保留下来。可能的状态:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
const (
	// StatePending - initial state of a task
	StatePending = "PENDING"
	// StateReceived - when task is received by a worker
	StateReceived = "RECEIVED"
	// StateStarted - when the worker starts processing the task
	StateStarted = "STARTED"
	// StateRetry - when failed task has been scheduled for retry
	StateRetry = "RETRY"
	// StateSuccess - when the task is processed successfully
	StateSuccess = "SUCCESS"
	// StateFailure - when processing of the task fails
	StateFailure = "FAILURE"
)

当将AMQP用作result backend时,任务状态将针对每个任务保留在单独的队列中。尽管RabbitMQ可以扩展到数千个队列,但强烈建议您在运行大量并行任务时使用更适合的result backend(例如Memcache)。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// TaskResult represents an actual return value of a processed task
type TaskResult struct {
  Type  string      `bson:"type"`
  Value interface{} `bson:"value"`
}

// TaskState represents a state of a task
type TaskState struct {
  TaskUUID  string        `bson:"_id"`
  State     string        `bson:"state"`
  Results   []*TaskResult `bson:"results"`
  Error     string        `bson:"error"`
}

// GroupMeta stores useful metadata about tasks within the same group
// E.g. UUIDs of all tasks which are used in order to check if all tasks
// completed successfully or not and thus whether to trigger chord callback
type GroupMeta struct {
  GroupUUID      string   `bson:"_id"`
  TaskUUIDs      []string `bson:"task_uuids"`
  ChordTriggered bool     `bson:"chord_triggered"`
  Lock           bool     `bson:"lock"`
}
  • TaskResult 表示已处理任务的返回值的一部分。

  • TaskState 每次任务状态更改时,都会对struct进行序列化和存储。

  • GroupMeta存储有关同一组内任务的有用元数据。例如,用于检查所有任务是否成功完成以及是否触发和弦回调的所有任务的UUID。

  • AsyncResult 对象允许您检查任务的状态:

1
2
3
taskState := asyncResult.GetState()
fmt.Printf("Current state of %v task is:\n", taskState.TaskUUID)
fmt.Println(taskState.State)

有几种方便的方法可以检查任务状态:

1
2
3
asyncResult.GetState().IsCompleted()
asyncResult.GetState().IsSuccess()
asyncResult.GetState().IsFailure()

您还可以进行同步阻塞调用以等待任务结果:

1
2
3
4
5
6
7
8
results, err := asyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
  // getting result of a task failed
  // do something with the error
}
for _, result := range results {
  fmt.Println(result.Interface())
}

日志处理

您可以通过实现以下接口来定义Custom Logger:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
type Interface interface {
  Print(...interface{})
  Printf(string, ...interface{})
  Println(...interface{})

  Fatal(...interface{})
  Fatalf(string, ...interface{})
  Fatalln(...interface{})

  Panic(...interface{})
  Panicf(string, ...interface{})
  Panicln(...interface{})
}

然后只需通过调用package中的log包里的set函数设置即可

1
log.Set(myCustomLogger)

Tracing

work:

1
2
3
4
5
cleanup, err := tracers.SetupTracer(consumerTag)
if err != nil {
	log.FATAL.Fatalln("Unable to instantiate a tracer:", err)
}
defer cleanup()

send:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
 * Lets start a span representing this run of the `send` command and
 * set a batch id as baggage so it can travel all the way into
 * the worker functions.
 */
span, ctx := opentracing.StartSpanFromContext(context.Background(), "send")
defer span.Finish()
batchID := uuid.New().String()
span.SetBaggageItem("batch.id", batchID)
span.LogFields(opentracing_log.String("batch.id", batchID))

// Let's try a task which throws panic to make sure stack trace is not lost
asyncResult, err = server.SendTaskWithContext(ctx, &panicTask)
if err != nil {
	return fmt.Errorf("Could not send task: %s", err.Error())
}
_, err = asyncResult.Get(time.Duration(time.Millisecond * 5))
if err == nil {
	return errors.New("Error should not be nil if task panicked")
}
log.INFO.Printf("Task panicked and returned error = %v\n", err.Error())
// Let's try a long running task

asyncResult, err = server.SendTaskWithContext(ctx, &longRunningTask)
if err != nil {
	return fmt.Errorf("Could not send task: %s", err.Error())
}
results, err = asyncResult.Get(time.Duration(time.Millisecond * 5))
if err != nil {
	return fmt.Errorf("Getting long running task result failed with error: %s", err.Error()
}
log.INFO.Printf("Long running task returned = %v\n", tasks.HumanReadableResults(results))
return nil

参考:https://github.com/RichardKnop/machinery