顺序消息

RocketMQ支持局部消息顺序消费,可以确保同一个消息消费队列中的消息被顺序消费,如果需要做到全局顺序消费则可以将主题配置成一一个队列,例如数据库BinLog等要求严格顺序的场景。根据并发消息消费的流程,消息消费包含如下4个步骤:消息队列负载、消息拉取、消息消费、消息消费进度存储。

消息队列负载

RocketMQ首先需要通过RebalanceService线程实现消息队列的负载,集群模式下同一个消费组内的消费者共同承担其订阅主题下消息队列的消费,同一个消息消费队列在同一时刻只会被消费组内一个消费者消费,一个消费者同一时刻可以分配多个消费队列。

如果经过消息队列重新负载(分配)后,分配到新的消息队列时,首先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成功则创建该消息队列的拉取任务,否则将跳过,等待其他消费者释放该消息队列的锁,然后在下一次队列重新负载时再尝试加锁。

顺序消息消费与并发消息消费的第一个关键区别:顺序消息在创建消息队列拉取任务时需要在Broker服务器锁定该消息队列。

消息拉取

如果消息处理队列未被锁定,则延迟3s后再将PullRequest对象放人到拉取任务中,如果该处理队列是第一次拉取任务,则首先计算拉取偏移量,然后向消息服务端拉取消息。

消息消费

如果消费模式为集群模式,启动定时任务,默认每隔20s执行一次锁定分配给自己的消息消费队列。通过一Drocketmq.client.rebalance.lockInterval=20000设置间隔,该值建议与一次消息负载频率设置相同。从上文可知,集群模式下顺序消息消费在创建拉取任务时并未将ProcessQueue的locked状态设置为true,在未锁定消息队列之前无法执行消息拉取任务,ConsumeMessageOrderlyService 以每20s的频率对分配给自己的消息队列进行自动加锁操作,从而消费加锁成功的消息消费队列。

顺序消息的ConsumeRequest消费任务不会直接消费本次拉取的消息,而是在消息消费时从处理队列中拉取消息.

如果是广播模式的话,直接进入消费,无须锁定处理队列,因为相互直接无竞争;如果是集群模式,消息消费的前提条件是proceessQueue被锁定并且锁未超时。思考一下,会不会出现当消息队列重新负载时,原先由自己处理的消息队列被另外一个消费者分配,此时如果还未来得及将ProceeQueue解除锁定,就被另外一个消费者添加进去,此时会存储多个消息消费者同时消费一个消息队列?答案是不会的,因为当一个新的消费队列分配给消费者时,在添加其拉取任务之前必须先向Broker发送对该消息队列加锁请求,只有加锁成功后,才能添加拉取消息,否则等到下一次负载后,只有消费队列被原先占有的消费者释放后,才能开始新的拉取任务。集群模式下,如果未锁定处理队列,则延迟该队列的消息消费。

如果消息重试次数大于或等于允许的最大重试次数,将该消息发送到Broker端,该消息在消息服务端最终会进人到DLQ(死信队列),也就是RocketMQ不会再次消费,需要人工干预。如果消息成功进人到DLQ队列,checkReconsumeTimes返回false,该批消息将直接调用ProcessQueue#commit提交,表示消息消费成功,如果这批消息中有任意一条消息的重试次数小于允许的最大重试次数,将返回true,执行消息重试。

RocketMQ顺序消息消费,如果消息重试次数达到允许的最大重试次数并且向Broker服务器发送ACK消息返回成功,也就是成功将该消息存入到RocketMQ的DLQ队列中即认为是消息消费成功,继续该消息消费队列后续消息的消费。

全局顺序消息

RocketMQ在默认情况下不保证顺序,比如创建一一个Topic,默认八个写队列,八个读队列。这时候一条消息可能被写人任意一 一个队列里;在数据的读取过程中,可能有多个Consumer,每个Consumer也可能启动多个线程并行处理,所以消息被哪个Consumer消费,被消费的顺序和写人的顺序是否一致是不确定的。

要保证全局顺序消息,需要先把Topic的读写队列数设置为一,然后Producer和Consumer的并发设置也要是一。简单来说,为了保证整个Topic的全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理。这时高并发、高吞吐量的功能完全用不上了。

在实际应用中,更多的是像订单类消息那样,只需要部分有序即可。在这种情况下,我们经过合适的配置,依然可以利用RocketMQ高并发、高吞吐量的能力。

部分顺序消息

一个Topic会有多个MessageQueue,如果使用Producer的默认配置,这个Producer会轮流向各个MessageQueue发送消息。Consumer在消费消息的时候,会根据负载均衡策略,消费被分配到的MessageQueue,如果不经过特定的设置,某条消息被发往哪个MessageQueue,被哪个Consumer消费是未知的。

要保证部分消息有序,需要发送端和消费端配合处理。在发送端,要做到把同一业务ID的消息发送到同一个Message Queue;在消费过程中,要做到从同一个Message Queue读取的消息不被并发处理,这样才能达到部分有序。

发送端使用MessageQueueSelector类来控制把消息发往哪个MessageQueue.

1
producer.WithQueueSelector(producer.NewManualQueueSelector())

消费端通过使用MessageListenerOrderly类来解决单Message Queue的消息被并发处理的问题.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
		consumer.WithConsumerOrder(true),
	)
	err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		orderlyCtx, _ := primitive.GetOrderlyCtx(ctx)
		fmt.Printf("orderly context: %v\n", orderlyCtx)
		fmt.Printf("subscribe orderly callback: %v \n", msgs)
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("Shutdown Consumer error: %s", err.Error())
	}
}

MessageListenerOrderly并不是简单地禁止并发处理。在MessageListenerOrderly的实现中,为每个Consumer Queue加个锁,消费每个消息前,需要先获得这个消息对应的Consumer Queue所对应的锁,这样保证了同一时间,同一个Consumer Queue的消息不被并发消费,但不同Consumer Queue的消息可以并发处理。