概述
消息消费以组的模式开展,一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题,消费组之间有集群模式与广播模式两种消费模式。集群模式,主题下的同一条消息只允许被其中一个消费者消费。广播模式,主题下的同一条消息将被集群内的所有消费者消费一次。消息服务器与消费者之间的消息传送也有两种方式:推模式、拉模式。所谓的拉模式,是消费端主动发起拉消息请求,而推模式是消息到达消息服务器后,推送给消息消费者。RocketMQ 消息推模式的实现基于拉模式,在拉模式上包装一层,一个拉取任务完成后开始下一个拉取任务。
集群模式下,多个消费者如何对消息队列进行负载呢?消息队列负载机制遵循一个通用的思想:一个消息队列同一时间只允许被一个消费者消费,一个消费者可以消费多个消息队列。
RocketMQ支持局部顺序消息消费,也就是保证同一个消息队列上的消息顺序消费。不支持消息全局顺序消费,如果要实现某一主题的全局顺序消息消费,可以将该主题的队列数设置为1,牺牲高可用性。
RocketMQ支持两种消息过滤模式:表达式(TAG、SQL92)与类过滤模式。
消息拉模式,主要是由客户端手动调用消息拉取API,而消息推模式是消息服务器主动将消息推送到消息消费端,本章将以推模式为突破口重点介绍RocketMQ消息消费实现原理。
广播与集群模式
RocketMQ支持两种消息模式:Clustering和Broadcasting。
-
在Clustering模式下,同一个ConsumerGroup(GroupName相同)里的每个Consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体,从而达到负载均衡的目的。默认为Clustering模式
-
在Broadcasting模式下,同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息,也就是一个消息会被多次分发,被多个Consumer消费。
Clustering模式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
)
err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("subscribe callback: %v \n", msgs[i])
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
time.Sleep(time.Hour)
err = c.Shutdown()
if err != nil {
fmt.Printf("shutdown Consumer error: %s", err.Error())
}
}
|
Broadcasting模式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerModel(consumer.BroadCasting),
)
err := c.Subscribe("min", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Printf("subscribe callback: %v \n", msgs)
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
time.Sleep(time.Hour)
err = c.Shutdown()
if err != nil {
fmt.Printf("Shutdown Consumer error: %s", err.Error())
}
}
|
推拉模式
根据使用者对读取操作的控制情况,消费者可分为两种类型。一个是DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传人的处理方法来处理;另一个是DefaultMQPullConsumer,读取操作中的大部分功能由使用者自主控制。
推模式
基本使用
使用DefaultMQPushConsumer主要是设置好各种参数和传人处理消息的函数。系统收到消息后自动调用处理函数来处理消息,自动保存Offset,而且加入新的DefaultMQPushConsumer后会自动做负载均衡。
DefaultMQPushConsumer需要设置三个参数:
- Consumer的GroupName
- NameServer的地址和端口号
- Topic的名称
下面将分别进行详细介绍。
GroupName
Consumer的GroupName用于把多个Consumer组织到一起,提高并发处理能力,GroupName需要和消息模式(MessageModel)配合使用。
1
2
|
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"))
|
NameServer
NameServer的地址和端口号,可以填写多个,用分号隔开,达到消除单点故障的目的,比如“ipl:port;p2:port;ip3:port"。
1
2
3
|
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"))
)
|
Topic
Topic 名称用来标识消息类型,需要提前创建。如果不需要消费某个Topic下的所有消息,可以通过指定消息的Tag进行消息过滤,比如:
1
2
3
4
5
6
7
8
9
|
selector := consumer.MessageSelector{
Type: consumer.TAG,
Expression: "TagA || TagC",
}
err := c.Subscribe("TopicTest", selector, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Printf("subscribe callback: %v \n", msgs)
return consumer.ConsumeSuccess, nil
})
|
表示这个Consumer要消费“TopicTest”下带有TagA或tagB的消息(Tag 是在发送消息时设置的标签)。在填写Tag参数的位置,用空结构体表示要消费这个Topic的所有消息。
消费者启动流程
- 构建主题订阅信息SubscriptionData并加入到RebalanceImpl的订阅消息中。订阅关系来源主要有两个。
- 通过调用DefaultMQPushConsumerImpl#subscribe ( String topic, String subExpression)方法。
- 订阅重试主题消息。从这里可以看出,RocketMQ消息重试是以消费组为单位,而不是主题,消息重试主题名为%RETRY%+消费组名。消费者在启动的时候会自动订阅该主题,参与该主题的消息队列负载。
- 初始化MQClientInstance、RebalanceImple ( 消息重新负载实现类)等。
- 初始化消息进度。如果消息消费是集群模式,那么消息进度保存在Broker上;如果是广播模式,那么消息消费进度存储在消费端。
- 根据是否是顺序消费,创建消费端消费线程服务。ConsumeMessageService 主要负责消息消费,内部维护一个线程池。
- 向MQClientInstance注册消费者,并启动MQClientInstance,在一个进程中的所有消费者、生产者持有同一个MQClientInstance,MQClientInstance 只会启动一次。
消息处理
消息的处理逻辑是在这个函数里的func (pc *pushConsumer) pullMessage(request *PullRequest)
中。在pullMessage函数里有个switch语句,根据从Broker返回的消息类型做相应的处理,具体处理逻辑可以查看源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
switch result.Status {
case primitive.PullFound:
rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
prevRequestOffset := request.nextOffset
request.nextOffset = result.NextBeginOffset
rt := time.Now().Sub(beginTime) / time.Millisecond
increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
pc.processPullResult(request.mq, result, sd)
msgFounded := result.GetMessageExts()
firstMsgOffset := int64(math.MaxInt64)
if msgFounded != nil && len(msgFounded) != 0 {
firstMsgOffset = msgFounded[0].QueueOffset
increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
pq.putMessage(msgFounded...)
}
if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{
"nextBeginOffset": result.NextBeginOffset,
"firstMsgOffset": firstMsgOffset,
"prevRequestOffset": prevRequestOffset,
})
}
case primitive.PullNoNewMsg:
rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d",
request.mq.Topic, request.mq.QueueId, pullRequest.QueueOffset, result.NextBeginOffset), nil)
case primitive.PullNoMsgMatched:
request.nextOffset = result.NextBeginOffset
pc.correctTagsOffset(request)
case primitive.PullOffsetIllegal:
rlog.Warning("the pull request offset illegal", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
"result": result.String(),
})
request.nextOffset = result.NextBeginOffset
pq.WithDropped(true)
time.Sleep(10 * time.Second)
pc.storage.update(request.mq, request.nextOffset, false)
pc.storage.persist([]*primitive.MessageQueue{request.mq})
pc.processQueueTable.Delete(request.mq)
rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
default:
rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
sleepTime = _PullDelayTimeWhenError
}
|
为什么“PushConsumer”中使用"pullMessage"呢?这是通过“长轮询”方式达到Push效果的方法,长轮询方式既有Pull的优点,又兼具Push方式的实时性。
Push方式是Server端接收到消息后,主动把消息推送给Client端,实时性高。对于一个提供队列服务的Server来说,用Push方式主动推送有很多弊端:首先是加大Server端的工作量,进而影响Server的性能;其次,Client 的处理能力各不相同,Client的状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题。
Pull方式是Client端循环地从Server端拉取消息,主动权在Client手里,自己拉取到一定量消息后,处理妥当了再接着取。Pull 方式的问题是循环拉取消息的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源;每个Pull的时间间隔太长,Server端有消息到来时,有可能没有被及时处理。
“长轮询”方式通过Client端和Server端的配合,达到既拥有Pull的优点,又能达到保证实时性的目的。
1
2
3
4
5
|
func WithSuspendCurrentQueueTimeMillis(suspendT time.Duration) Option {
return func(options *consumerOptions) {
options.SuspendCurrentQueueTimeMillis = suspendT
}
}
|
源码中有这一行设置语句func WithSuspendCurrentQueueTimeMillis(suspendT time.Duration) Option
,作用是设置Broker最长阻塞时间,注意是Broker在没有新消息的时候才阻塞,有消息会立刻返回。
服务端接到新消息请求后,如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次waitForRunning一段时间(默认是5秒),然后后再Check。默认情况下当Broker一直没有新消息,第三次Check的时候,等待时间超过Request里面的Broker一SuspendMax TimeMillis,就返回空结果。在等待的过程中,Broker 收到了新的消息后会直接调用notifyMessageArriving函数返回请求结果。“长轮询”的核心是,Broker端HOLD住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。“长轮询”的主动权还是掌握在Consumer手中,Broker 即使有大量消息积压,也不会主动推送给Consumer。
长轮询方式的局限性,是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。
流量控制
PushConsumer的核心还是Pull方式,所以采用这种方式的客户端能够根据自身的处理速度调整获取消息的操作速度。因为采用多线程处理方式实现,流量控制的方面比单线程要复杂得多。
PushConsumer有个线程池,消息处理逻辑在各个线程里同时执行.
Pull获得的消息,如果直接提交到线程池里执行,很难监控和控制,比如,如何得知当前消息堆积的数量?如何重复处理某些消息?如何延迟处理某些消息?RocketMQ定义了一个快照类ProcessQueue来解决这些问题,在PushConsumer运行的时候,每个Message Queue都会有个对应的ProcessQueue对象,保存了这个MessageQueue消息处理状态的快照。
ProcessQueue对象里主要的内容是一个TreeMap和一个读写锁。TreeMap里以Message Queue的Offset作为Key,以消息内容的引用为Value,保存了所有从MessageQueue获取到,但是还未被处理的消息;读写锁控制着多个线程对TreeMap对象的并发访问。
有了ProcessQueue对象,流量控制就方便和灵活多了,客户端在每次Pull请求前会做下面三个判断来控制流量:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdForQueue": pc.option.PullThresholdForQueue,
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count": pq.msgCache,
"size(MiB)": cachedMessageSizeInMiB,
"flowControlTimes": pc.queueFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),
})
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
if pc.queueFlowControlTimes%1000 == 0 {
rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"count": pq.msgCache,
"size(MiB)": cachedMessageSizeInMiB,
"flowControlTimes": pc.queueFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),
})
}
pc.queueFlowControlTimes++
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
if !pc.consumeOrderly {
if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
rlog.Warning("the queue's messages span too long, so do flow control", map[string]interface{}{
"ConsumeConcurrentlyMaxSpan": pc.option.ConsumeConcurrentlyMaxSpan,
"minOffset": pq.Min(),
"maxOffset": pq.Max(),
"maxSpan": pq.getMaxSpan(),
"flowControlTimes": pc.queueFlowControlTimes,
rlog.LogKeyPullRequest: request.String(),
})
}
sleepTime = _PullDelayTimeWhenFlowControl
goto NEXT
}
}
|
从代码中可以看出,PushConsumer 会判断获取但还未处理的消息个数、消息总大小、Offset的跨度,任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。此外ProcessQueue还可以辅助实现顺序消费的逻辑。
DefaultMQPullConsumer
使用DefaultMQPullConsumer像使用DefaultMQPushConsumer一样需要设置各种参数,写处理消息的函数,同时还需要做额外的事情。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
package main
import (
"context"
"fmt"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
func main() {
c, err := rocketmq.NewPullConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
)
if err != nil {
rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), nil)
}
err = c.Start()
if err != nil {
rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), nil)
}
ctx := context.Background()
queue := primitive.MessageQueue{
Topic: "TopicTest",
BrokerName: "", // replace with your broker name. otherwise, pull will failed.
QueueId: 0,
}
offset := int64(0)
for {
resp, err := c.PullFrom(ctx, queue, offset, 10)
if err != nil {
if err == rocketmq.ErrRequestTimeout {
fmt.Printf("timeout \n")
time.Sleep(1 * time.Second)
continue
}
fmt.Printf("unexpectable err: %v \n", err)
return
}
if resp.Status == primitive.PullFound {
fmt.Printf("pull message success. nextOffset: %d \n", resp.NextBeginOffset)
for _, msg := range resp.GetMessageExts() {
fmt.Printf("pull msg: %v \n", msg)
}
}
offset = resp.NextBeginOffset
}
}
|
示例代码的处理逻辑是逐个读取某Topic下所有MessageQueue的内容,读完一遍后退出,主要处理额外的三件事情:
- 获取MessageQueue并遍历
一个Topic包括多个MessageQueue,如果这个Consumer需要获取Topic下所有的消息,就要遍历所有的Message Queue。如果有特殊情况,也可以选择某些特定的Message Queue来读取消息。
- 维护Offsetstore
从一个Message Queue里拉取消息的时候,要传人Offset参数(long类型的值),随着不断读取消息,Offset会不断增长。这个时候由用户负责把Offset存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等。
- 根据不同的消息状态做不同的处理
拉取消息的请求发出后,会返回: FOUND、NO_MATCHED_ MSG、NO_NEW_MSG、OFFSET_ILLEGAL四种状态,需要根据每个状态做不同的处理。比较重要的两个状态是FOUND和NO_NEW_MSG,分别表示获取到消息和没有新的消息。
因为PullConsumer需要用户自己处理遍历MessageQueue、保存Offset,所以PullConsumer有更多的自主性和灵活性。
Consumer的启动、关闭流程
消息队列一般是提供一个不间断的持续性服务,Consumer在使用过程中,如何才能优雅地启动和关闭,确保不漏掉或者重复消费消息呢?
Consumer分为Push和Pull两种方式,对于PullConsumer来说,使用者主动权很高,可以根据实际需要暂停、停止、启动消费过程。需要注意的是Offset的保存,要在程序的异常处理部分增加把Offset写人磁盘方面的处理,记准了每个MessageQueue的Offset,才能保证消息消费的准确性。
DefaultMQPushConsumer的退出,要调用shutdown()函数,以便释放资源、保存Offset等。这个调用要加到Consumer所在应用的退出逻辑中。
PushConsumer在启动的时候,会做各种配置检查,然后连接NameServer获取Topic信息,启动时如果遇到异常,比如无法连接NameServer,程序仍然可以正常启动不报错(日志里有WARN信息)。在单机环境下可以测试这种情况,启动DefaultMQPushConsumer时故意把NameServer地址填错,程序仍然可以正常启动,但是不会收到消息。
为什么DefaultMQPushConsumer在无法连接NameServer时不直接报错退出呢?这和分布式系统的设计有关,RocketMQ集群可以有多个NameServer、Broker,某个机器出异常后整体服务依然可用。所以DefaultMQPushConsumer被设计成当发现某个连接异常时不立刻退出,而是不断尝试重新连接。可以进行这样一个测试,在DefaultMQPushConsumer正常运行的时候,手动kill掉Broker或NameServer,过一会儿再启动。会发现DefaultMQPushConsumer不会出错退出,在服务恢复后正常运行,在服务不可用的这段时间,仅仅会在日志里报异常信息。
如果需要在DefaultMQPushConsumer启动的时候,及时暴露配置问题,该如何操作呢?可以调用: fetchSubscribeMessageQueues("TopicName")
,这时如果配置信息写得不准确,或者当前服务不可用,这个语句会报异常。
消息重试
普通消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
// use concurrent consumer model, when Subscribe function return consumer.ConsumeRetryLater, the message will be
// send to RocketMQ retry topic. we could set DelayLevelWhenNextConsume in ConsumeConcurrentlyContext, which used to
// indicate the delay of message re-send to origin topic from retry topic.
//
// in this example, we always set DelayLevelWhenNextConsume=1, means that the message will be sent to origin topic after
// 1s. in case of the unlimited retry, we will return consumer.ConsumeSuccess after ReconsumeTimes > 5
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithConsumerModel(consumer.Clustering),
)
// The DelayLevel specify the waiting time that before next reconsume,
// and it range is from 1 to 18 now.
//
// The time of each level is the value of indexing of {level-1} in [1s, 5s, 10s, 30s,
// 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h]
delayLevel := 1
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Printf("subscribe callback len: %d \n", len(msgs))
concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
concurrentCtx.DelayLevelWhenNextConsume = delayLevel // only run when return consumer.ConsumeRetryLater
for _, msg := range msgs {
if msg.ReconsumeTimes > 5 {
fmt.Printf("msg ReconsumeTimes > 5. msg: %v", msg)
return consumer.ConsumeSuccess, nil
} else {
fmt.Printf("subscribe callback: %v \n", msg)
}
}
return consumer.ConsumeRetryLater, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
time.Sleep(time.Hour)
err = c.Shutdown()
if err != nil {
fmt.Printf("shundown Consumer error: %s", err.Error())
}
}
|
顺序消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
package main
import (
"context"
"fmt"
"os"
"time"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithGroupName("testGroup"),
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithConsumerModel(consumer.Clustering),
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
consumer.WithConsumerOrder(true),
consumer.WithMaxReconsumeTimes(5),
)
err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
orderlyCtx, _ := primitive.GetOrderlyCtx(ctx)
fmt.Printf("orderly context: %v\n", orderlyCtx)
fmt.Printf("subscribe orderly callback len: %d \n", len(msgs))
for _, msg := range msgs {
if msg.ReconsumeTimes > 5 {
fmt.Printf("msg ReconsumeTimes > 5. msg: %v", msg)
} else {
fmt.Printf("subscribe orderly callback: %v \n", msg)
}
}
return consumer.SuspendCurrentQueueAMoment, nil
})
if err != nil {
fmt.Println(err.Error())
}
// Note: start after subscribe
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
time.Sleep(time.Hour)
err = c.Shutdown()
if err != nil {
fmt.Printf("shundown Consumer error: %s", err.Error())
}
}
|
提高处理能力
当Consumer的处理速度跟不上消息的产生速度,会造成越来越多的消息积压,这个时候首先查看消费逻辑本身有没有优化空间,除此之外还有三种方法可以提高Consumer的处理能力。
提高消费并行度
在同一个ConsumerGroup下( Clustering方式),可以通过增加Consumer实例的数量来提高并行度,通过加机器,或者在已有机器中启动多个Consumer进程都可以增加Consumer实例数。注意总的Consumer数量不要超过Topic下Read Queue数量,超过的Consumer实例接收不到消息。此外,通过提高单个Consumer实例中的并行处理的线程数,可以在同一个Consumer内增加并行度来提高吞吐量(设置方法是修改consumeThreadMin和consume ThreadMax)。(
以批量方式进行消费
某些业务场景下,多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及update某个数据库,一次update10条的时间会大大小于十次update1条数据的时间。这时可以通过批量方式消费来提高消费的吞吐量。实现方法是设置Consumer的consumeMessageBatchMaxSize这个参数,默认是1,如果设置为N,在消息多的时候每次收到的是个长度为N的消息链表。
检测延时情况,跳过非重要消息
Consumer在消费的过程中,如果发现由于某种原因发生严重的消息堆积,短时间无法消除堆积,这个时候可以选择丢弃不重要的消息,使Consumer尽快追上Producer的进度.