Overall Flow
NewPushConsumer
Creates the pushConsumer object and configures it, including the load-balancing (queue allocation) strategy.
```go
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
	defaultOpts := defaultPushConsumerOptions()
	for _, apply := range opts {
		apply(&defaultOpts)
	}
	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if !defaultOpts.Credentials.IsEmpty() {
		srvs.SetCredentials(defaultOpts.Credentials)
	}
	defaultOpts.Namesrv = srvs

	if defaultOpts.Namespace != "" {
		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
	}

	dc := &defaultConsumer{
		client:         internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
		consumerGroup:  defaultOpts.GroupName,
		cType:          _PushConsume,
		state:          int32(internal.StateCreateJust),
		prCh:           make(chan PullRequest, 4),
		model:          defaultOpts.ConsumerModel,
		consumeOrderly: defaultOpts.ConsumeOrderly,
		fromWhere:      defaultOpts.FromWhere,
		allocate:       defaultOpts.Strategy,
		option:         defaultOpts,
		namesrv:        srvs,
	}

	p := &pushConsumer{
		defaultConsumer: dc,
		subscribedTopic: make(map[string]string, 0),
		queueLock:       newQueueLock(),
		done:            make(chan struct{}, 1),
		consumeFunc:     utils.NewSet(),
	}
	dc.mqChanged = p.messageQueueChanged
	if p.consumeOrderly {
		p.submitToConsume = p.consumeMessageOrderly
	} else {
		p.submitToConsume = p.consumeMessageCurrently
	}

	p.interceptor = primitive.ChainInterceptors(p.option.Interceptors...)

	return p, nil
}
```
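For reference, here is a minimal usage sketch showing where these options come from, modeled on the examples shipped with rocketmq-client-go v2 (the group name, topic, and name-server address are placeholders; option names may differ slightly across versions):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	// Create a push consumer; WithStrategy configures the load-balancing
	// (queue allocation) strategy mentioned above.
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithStrategy(consumer.AllocateByAveragely),
	)
	if err != nil {
		panic(err)
	}

	// Register the callback before Start; pulled messages are delivered here.
	err = c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			fmt.Printf("got message: %s\n", msg.Body)
		}
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		panic(err)
	}

	if err := c.Start(); err != nil {
		panic(err)
	}
	time.Sleep(time.Minute) // keep consuming for a while in this sketch
	_ = c.Shutdown()
}
```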
Start
The complete code of the Start method is as follows:
```go
func (pc *pushConsumer) Start() error {
	var err error
	pc.once.Do(func() {
		rlog.Info("the consumer start beginning", map[string]interface{}{
			rlog.LogKeyConsumerGroup: pc.consumerGroup,
			"messageModel":           pc.model,
			"unitMode":               pc.unitMode,
		})
		atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
		pc.validate()

		err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
		if err != nil {
			rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
			})
			err = ErrCreated
			return
		}

		err = pc.defaultConsumer.start()
		if err != nil {
			return
		}

		go func() {
			// todo start clean msg expired
			for {
				select {
				case pr := <-pc.prCh:
					go func() {
						pc.pullMessage(&pr)
					}()
				case <-pc.done:
					rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		}()

		go primitive.WithRecover(func() {
			// initial lock.
			if !pc.consumeOrderly {
				return
			}

			time.Sleep(1000 * time.Millisecond)
			pc.lockAll()

			lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)
			defer lockTicker.Stop()
			for {
				select {
				case <-lockTicker.C:
					pc.lockAll()
				case <-pc.done:
					rlog.Info("push consumer close tick.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		})
	})

	if err != nil {
		return err
	}

	pc.client.UpdateTopicRouteInfo()
	for k := range pc.subscribedTopic {
		_, exist := pc.topicSubscribeInfoTable.Load(k)
		if !exist {
			pc.client.Shutdown()
			return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
		}
	}
	pc.client.CheckClientInBroker()
	pc.client.SendHeartbeatToAllBrokerWithLock()
	pc.client.RebalanceImmediately()

	return err
}
```
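The listener goroutine above follows a common Go dispatch pattern: a buffered channel feeds work into a select loop that also watches a done channel for shutdown. A stripped-down, standalone sketch of the same pattern (the names here are illustrative, not the library's):

```go
package main

import (
	"fmt"
	"time"
)

// workItem stands in for a PullRequest-like unit of work.
type workItem struct{ id int }

func main() {
	prCh := make(chan workItem, 4) // mirrors pc.prCh
	done := make(chan struct{})    // mirrors pc.done

	// Dispatcher: each item is handled in its own goroutine,
	// just like Start launches pullMessage per PullRequest.
	go func() {
		for {
			select {
			case item := <-prCh:
				go func(it workItem) {
					fmt.Println("handling item", it.id)
				}(item)
			case <-done:
				fmt.Println("dispatcher stopped")
				return
			}
		}
	}()

	for i := 0; i < 3; i++ {
		prCh <- workItem{id: i}
	}
	time.Sleep(100 * time.Millisecond)
	close(done) // shutting down closes the done channel, stopping the listener
	time.Sleep(100 * time.Millisecond)
}
```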
defaultConsumer.start
The type of OffsetStore depends on the message model. In BROADCASTING mode a LocalFileOffsetStore is used and offsets are stored locally; in CLUSTERING mode a RemoteBrokerOffsetStore is used and offsets are stored on the Broker.
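The following is an illustrative, self-contained sketch of that split, built around a hypothetical offsetStore interface rather than the client's real OffsetStore. The point is that broadcasting progress is private to each client instance, while clustering progress must be shared through the Broker:

```go
// Illustrative only: a hypothetical offsetStore interface, not the client's
// real OffsetStore. It shows why the two consumption models store progress
// in different places.
package main

import "fmt"

type offsetStore interface {
	update(queue string, offset int64)
	read(queue string) int64
}

// localFileOffsetStore: BROADCASTING mode. Every instance consumes every
// message, so each instance keeps its own progress in a local file.
type localFileOffsetStore struct {
	path    string // e.g. a per-clientID JSON file
	offsets map[string]int64
}

func (s *localFileOffsetStore) update(q string, off int64) { s.offsets[q] = off /* then persist to s.path */ }
func (s *localFileOffsetStore) read(q string) int64        { return s.offsets[q] }

// remoteBrokerOffsetStore: CLUSTERING mode. Queues can move between group
// members after a rebalance, so progress is kept on the Broker.
type remoteBrokerOffsetStore struct {
	group   string
	offsets map[string]int64 // local cache, synced to the Broker periodically
}

func (s *remoteBrokerOffsetStore) update(q string, off int64) { s.offsets[q] = off /* then report to the Broker */ }
func (s *remoteBrokerOffsetStore) read(q string) int64        { return s.offsets[q] }

func newOffsetStore(clustering bool, group, clientID string) offsetStore {
	if clustering {
		return &remoteBrokerOffsetStore{group: group, offsets: map[string]int64{}}
	}
	return &localFileOffsetStore{path: clientID + ".json", offsets: map[string]int64{}}
}

func main() {
	store := newOffsetStore(true, "testGroup", "client-1")
	store.update("testTopic-0", 42)
	fmt.Println(store.read("testTopic-0")) // 42
}
```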
Then the start method is called to begin fetching messages:
```go
func (dc *defaultConsumer) start() error {
	if len(dc.option.NameServerAddrs) == 0 {
		dc.namesrv.UpdateNameServerAddress(dc.option.NameServerDomain, dc.option.InstanceName)
	}

	if dc.model == Clustering {
		// set retry topic
		retryTopic := internal.GetRetryTopic(dc.consumerGroup)
		sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll})
		dc.subscriptionDataTable.Store(retryTopic, sub)
	}

	if dc.model == Clustering {
		dc.option.ChangeInstanceNameToPID()
		dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client, dc.namesrv)
	} else {
		dc.storage = NewLocalFileOffsetStore(dc.consumerGroup, dc.client.ClientID())
	}

	dc.client.Start()
	atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
	dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
	return nil
}
```
pullMessage
The message-fetching logic lives in the pullMessage function, which is a very large one: the first part performs checks that implement flow control, the middle part handles the returned pull result, and the last part sends the next pull request.
```go
cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
	if pc.queueFlowControlTimes%1000 == 0 {
		rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
			"PullThresholdForQueue": pc.option.PullThresholdForQueue,
			"minOffset":             pq.Min(),
			"maxOffset":             pq.Max(),
			"count":                 pq.msgCache,
			"size(MiB)":             cachedMessageSizeInMiB,
			"flowControlTimes":      pc.queueFlowControlTimes,
			rlog.LogKeyPullRequest:  request.String(),
		})
	}
	pc.queueFlowControlTimes++
	sleepTime = _PullDelayTimeWhenFlowControl
	goto NEXT
}

if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
	if pc.queueFlowControlTimes%1000 == 0 {
		rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
			"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,
			"minOffset":                 pq.Min(),
			"maxOffset":                 pq.Max(),
			"count":                     pq.msgCache,
			"size(MiB)":                 cachedMessageSizeInMiB,
			"flowControlTimes":          pc.queueFlowControlTimes,
			rlog.LogKeyPullRequest:      request.String(),
		})
	}
	pc.queueFlowControlTimes++
	sleepTime = _PullDelayTimeWhenFlowControl
	goto NEXT
}
```
Whether to keep requesting messages is decided by the number and total size of the unprocessed messages; ordered messages have some additional checks. Once the pulled messages are returned, the corresponding handler is invoked according to the result status:
```go
switch result.Status {
case primitive.PullFound:
	rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
	prevRequestOffset := request.nextOffset
	request.nextOffset = result.NextBeginOffset

	rt := time.Now().Sub(beginTime) / time.Millisecond
	increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))

	pc.processPullResult(request.mq, result, sd)

	msgFounded := result.GetMessageExts()
	firstMsgOffset := int64(math.MaxInt64)
	if msgFounded != nil && len(msgFounded) != 0 {
		firstMsgOffset = msgFounded[0].QueueOffset
		increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
		pq.putMessage(msgFounded...)
	}
	if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
		rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{
			"nextBeginOffset":   result.NextBeginOffset,
			"firstMsgOffset":    firstMsgOffset,
			"prevRequestOffset": prevRequestOffset,
		})
	}
case primitive.PullNoNewMsg:
	rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d",
		request.mq.Topic, request.mq.QueueId, pullRequest.QueueOffset, result.NextBeginOffset), nil)
case primitive.PullNoMsgMatched:
	request.nextOffset = result.NextBeginOffset
	pc.correctTagsOffset(request)
case primitive.PullOffsetIllegal:
	rlog.Warning("the pull request offset illegal", map[string]interface{}{
		rlog.LogKeyPullRequest: request.String(),
		"result":               result.String(),
	})
	request.nextOffset = result.NextBeginOffset
	pq.WithDropped(true)
	time.Sleep(10 * time.Second)

	pc.storage.update(request.mq, request.nextOffset, false)
	pc.storage.persist([]*primitive.MessageQueue{request.mq})
	pc.processQueueTable.Delete(request.mq)
	rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
default:
	rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
	sleepTime = _PullDelayTimeWhenError
}
```
Finally, the next pull request is sent, and these three phases keep looping until the program is stopped.
Concurrent Processing Flow
Processing efficiency is an important indicator of how well a Consumer is implemented.
After a batch of messages is fetched from the Broker, it is wrapped into a PullRequest according to the batch-size setting, and the PullRequest is submitted to prCh for processing:
```go
type PullRequest struct {
	consumerGroup string
	mq            *primitive.MessageQueue
	pq            *processQueue
	nextOffset    int64
	lockedFirst   bool
}
```
Let's go through the core fields of PullRequest one by one:
- consumerGroup: the consumer group the request belongs to.
- mq: the MessageQueue to pull messages from.
- pq: the processQueue; messages pulled from the Broker are stored here first and then submitted for consumption.
- nextOffset: the next offset to pull from this MessageQueue.
- lockedFirst: whether the queue has been locked.
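The batch size mentioned above corresponds to ConsumeMessageBatchMaxSize in this client: the cached messages of a queue are split into batches of at most that size before being handed to the consume callback (see consumeMessageCurrently below). A standalone sketch of that chunking, using hypothetical names and string messages:

```go
package main

import "fmt"

// splitIntoBatches chops msgs into slices of at most batchSize elements,
// mirroring how consumeMessageCurrently builds subMsgs from the cached messages.
func splitIntoBatches(msgs []string, batchSize int) [][]string {
	var batches [][]string
	for start := 0; start < len(msgs); start += batchSize {
		end := start + batchSize
		if end > len(msgs) {
			end = len(msgs)
		}
		batches = append(batches, msgs[start:end])
	}
	return batches
}

func main() {
	msgs := []string{"m1", "m2", "m3", "m4", "m5"}
	for _, batch := range splitIntoBatches(msgs, 2) {
		fmt.Println(batch) // each batch would be consumed by its own goroutine
	}
}
```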
The consume result can take several values; the two main ones are ConsumeSuccess and ConsumeRetryLater (CONSUME_SUCCESS and RECONSUME_LATER in the Java client). If consumption fails, the message is consumed again 5 seconds later; in CLUSTERING mode, messages that were not consumed successfully are first sent back to the Broker so that other Consumers in the same Consumer Group can consume them, and only if sending them back to the Broker fails does the local RECONSUME_LATER retry kick in.
```go
if result == ConsumeSuccess {
	increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
} else {
	increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
	if pc.model == BroadCasting {
		for i := 0; i < len(msgs); i++ {
			rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
				"message": subMsgs[i],
			})
		}
	} else {
		for i := 0; i < len(msgs); i++ {
			msg := msgs[i]
			if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
				msg.ReconsumeTimes += 1
				msgBackFailed = append(msgBackFailed, msg)
			}
		}
	}
}
```
The ProcessQueue Object
In the source code above there is an object of type processQueue. What is it for? Because messages obtained from the Broker are handed off to goroutines and processed concurrently, their execution state is hard to monitor and control, for example how to get the number of messages currently piled up, or how to deal with processing timeouts.
RocketMQ defines a snapshot type, processQueue, to solve these problems. While a PushConsumer is running, each MessageQueue has a corresponding processQueue object that holds a snapshot of that MessageQueue's processing state.
The main contents of a processQueue are a TreeMap and a read-write lock. The TreeMap uses the message offset in the MessageQueue as the key and a reference to the message as the value, holding every message that has been pulled from the MessageQueue but not yet processed; the read-write lock controls concurrent access to the TreeMap by multiple goroutines.
```go
type processQueue struct {
	cachedMsgCount             int64
	cachedMsgSize              int64
	tryUnlockTimes             int64
	queueOffsetMax             int64
	msgAccCnt                  int64
	msgCache                   *treemap.Map
	mutex                      sync.RWMutex
	consumeLock                sync.Mutex
	consumingMsgOrderlyTreeMap *treemap.Map
	dropped                    *uatomic.Bool
	lastPullTime               time.Time
	lastConsumeTime            atomic.Value
	locked                     *uatomic.Bool
	lastLockTime               atomic.Value
	consuming                  bool
	lockConsume                sync.Mutex
	msgCh                      chan []*primitive.MessageExt
	order                      bool
}
```
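msgCache above is a treemap from github.com/emirpasic/gods keyed by queue offset. The following standalone sketch (simplified, not the library's putMessage/removeMessage) shows the two operations the consumer needs from it: caching pulled messages by offset, and working out which offset is safe to commit after a batch has been consumed and removed:

```go
package main

import (
	"fmt"

	"github.com/emirpasic/gods/maps/treemap"
	"github.com/emirpasic/gods/utils"
)

func main() {
	// Offset -> message body, ordered by offset, like processQueue.msgCache.
	cache := treemap.NewWith(utils.Int64Comparator)

	// "putMessage": cache messages pulled from the broker, keyed by QueueOffset.
	for off := int64(100); off < 105; off++ {
		cache.Put(off, fmt.Sprintf("msg@%d", off))
	}

	// "removeMessage": after a batch is consumed, remove it from the cache.
	cache.Remove(int64(100))
	cache.Remove(int64(101))

	// The offset safe to commit is the smallest offset still in the cache:
	// everything below it has already been consumed and removed.
	if minKey, _ := cache.Min(); minKey != nil {
		fmt.Println("commit offset:", minKey.(int64)) // 102
	}
}
```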
With the processQueue object, consumption can be stopped and restarted at any time, and it also helps implement ordered consumption. Ordered consumption is implemented by consumeMessageOrderly; its overall flow is similar to consumeMessageCurrently, and the difference lies only in how concurrent consumption is controlled.
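The per-queue serialization used by consumeMessageOrderly below relies on pc.queueLock, which hands out one mutex per MessageQueue. Here is a minimal sketch of such a lock registry (illustrative; the library's newQueueLock/fetchLock may differ in detail):

```go
package main

import (
	"fmt"
	"sync"
)

// queueLock hands out one *sync.Mutex per queue key, so that messages of the
// same MessageQueue are consumed by at most one goroutine at a time, while
// different queues can still be consumed concurrently.
type queueLock struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newQueueLock() *queueLock {
	return &queueLock{locks: make(map[string]*sync.Mutex)}
}

func (q *queueLock) fetchLock(queue string) *sync.Mutex {
	q.mu.Lock()
	defer q.mu.Unlock()
	lk, ok := q.locks[queue]
	if !ok {
		lk = &sync.Mutex{}
		q.locks[queue] = lk
	}
	return lk
}

func main() {
	ql := newQueueLock()
	lk := ql.fetchLock("testTopic-queue0")
	lk.Lock()
	fmt.Println("consuming testTopic-queue0 in order")
	lk.Unlock()
}
```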
Concurrent Consumption
consumeMessageCurrently takes the messages cached in the processQueue, splits them into batches of at most ConsumeMessageBatchMaxSize, and consumes each batch in its own goroutine:
```go
func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.MessageQueue) {
	msgs := pq.getMessages()
	if msgs == nil {
		return
	}
	for count := 0; count < len(msgs); count++ {
		var subMsgs []*primitive.MessageExt
		if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
			subMsgs = msgs[count:]
			count = len(msgs)
		} else {
			next := count + pc.option.ConsumeMessageBatchMaxSize
			subMsgs = msgs[count:next]
			count = next - 1
		}
		go primitive.WithRecover(func() {
		RETRY:
			if pq.IsDroppd() {
				rlog.Info("the message queue not be able to consume, because it was dropped", map[string]interface{}{
					rlog.LogKeyMessageQueue:  mq.String(),
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				return
			}

			beginTime := time.Now()
			pc.resetRetryAndNamespace(subMsgs)
			var result ConsumeResult
			var err error
			msgCtx := &primitive.ConsumeMessageContext{
				Properties:    make(map[string]string),
				ConsumerGroup: pc.consumerGroup,
				MQ:            mq,
				Msgs:          msgs,
			}
			ctx := context.Background()
			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
			ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
			concurrentCtx := primitive.NewConsumeConcurrentlyContext()
			concurrentCtx.MQ = *mq
			ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)

			result, err = pc.consumeInner(ctx, subMsgs)

			consumeRT := time.Now().Sub(beginTime)
			if err != nil {
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
			} else if consumeRT >= pc.option.ConsumeTimeout {
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.TimeoutReturn)
			} else if result == ConsumeSuccess {
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
			} else if result == ConsumeRetryLater {
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
			}

			increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))

			if !pq.IsDroppd() {
				msgBackFailed := make([]*primitive.MessageExt, 0)
				if result == ConsumeSuccess {
					increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
				} else {
					increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
					if pc.model == BroadCasting {
						for i := 0; i < len(msgs); i++ {
							rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
								"message": subMsgs[i],
							})
						}
					} else {
						for i := 0; i < len(msgs); i++ {
							msg := msgs[i]
							if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
								msg.ReconsumeTimes += 1
								msgBackFailed = append(msgBackFailed, msg)
							}
						}
					}
				}

				offset := pq.removeMessage(subMsgs...)

				if offset >= 0 && !pq.IsDroppd() {
					pc.storage.update(mq, int64(offset), true)
				}
				if len(msgBackFailed) > 0 {
					subMsgs = msgBackFailed
					time.Sleep(5 * time.Second)
					goto RETRY
				}
			} else {
				rlog.Warning("processQueue is dropped without process consume result.", map[string]interface{}{
					rlog.LogKeyMessageQueue: mq,
					"message":               msgs,
				})
			}
		})
	}
}
```
Orderly Consumption
consumeMessageOrderly serializes consumption per MessageQueue: it first takes the per-queue lock from queueLock (and, in Clustering mode, checks that the queue is locked on the Broker side and the lock has not expired), then repeatedly takes batches from the processQueue and consumes them in order:
```go
func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.MessageQueue) {
	if pq.IsDroppd() {
		rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
			rlog.LogKeyMessageQueue: mq.String(),
		})
		return
	}

	lock := pc.queueLock.fetchLock(*mq)
	lock.Lock()
	defer lock.Unlock()
	if pc.model == BroadCasting || (pq.IsLock() && !pq.isLockExpired()) {
		beginTime := time.Now()

		continueConsume := true
		for continueConsume {
			if pq.IsDroppd() {
				rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
					rlog.LogKeyMessageQueue: mq.String(),
				})
				break
			}
			if pc.model == Clustering {
				if !pq.IsLock() {
					rlog.Warning("the message queue not locked, so consume later", map[string]interface{}{
						rlog.LogKeyMessageQueue: mq.String(),
					})
					pc.tryLockLaterAndReconsume(mq, 10)
					return
				}
				if pq.isLockExpired() {
					rlog.Warning("the message queue lock expired, so consume later", map[string]interface{}{
						rlog.LogKeyMessageQueue: mq.String(),
					})
					pc.tryLockLaterAndReconsume(mq, 10)
					return
				}
			}
			interval := time.Now().Sub(beginTime)
			if interval > pc.option.MaxTimeConsumeContinuously {
				time.Sleep(10 * time.Millisecond)
				return
			}
			batchSize := pc.option.ConsumeMessageBatchMaxSize
			msgs := pq.takeMessages(batchSize)

			pc.resetRetryAndNamespace(msgs)

			if len(msgs) == 0 {
				continueConsume = false
				break
			}

			// TODO: add message consumer hook
			beginTime = time.Now()

			ctx := context.Background()
			msgCtx := &primitive.ConsumeMessageContext{
				Properties:    make(map[string]string),
				ConsumerGroup: pc.consumerGroup,
				MQ:            mq,
				Msgs:          msgs,
			}
			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
			ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
			orderlyCtx := primitive.NewConsumeOrderlyContext()
			orderlyCtx.MQ = *mq
			ctx = primitive.WithOrderlyCtx(ctx, orderlyCtx)

			pq.lockConsume.Lock()
			result, _ := pc.consumeInner(ctx, msgs)
			pq.lockConsume.Unlock()

			if result == Rollback || result == SuspendCurrentQueueAMoment {
				rlog.Warning("consumeMessage Orderly return not OK", map[string]interface{}{
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
					"messages":               msgs,
					rlog.LogKeyMessageQueue:  mq,
				})
			}

			// just put consumeResult in consumerMessageCtx
			//interval = time.Now().Sub(beginTime)
			//consumeReult := SuccessReturn
			//if interval > pc.option.ConsumeTimeout {
			//	consumeReult = TimeoutReturn
			//} else if SuspendCurrentQueueAMoment == result {
			//	consumeReult = FailedReturn
			//} else if ConsumeSuccess == result {
			//	consumeReult = SuccessReturn
			//}

			// process result
			commitOffset := int64(-1)
			if pc.option.AutoCommit {
				switch result {
				case Commit, Rollback:
					rlog.Warning("the message queue consume result is illegal, we think you want to ack these message: %v", map[string]interface{}{
						rlog.LogKeyMessageQueue: mq,
					})
				case ConsumeSuccess:
					commitOffset = pq.commit()
				case SuspendCurrentQueueAMoment:
					if pc.checkReconsumeTimes(msgs) {
						pq.putMessage(msgs...)
						time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
						continueConsume = false
					} else {
						commitOffset = pq.commit()
					}
				default:
				}
			} else {
				switch result {
				case ConsumeSuccess:
				case Commit:
					commitOffset = pq.commit()
				case Rollback:
					// pq.rollback
					time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
					continueConsume = false
				case SuspendCurrentQueueAMoment:
					if pc.checkReconsumeTimes(msgs) {
						time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
						continueConsume = false
					}
				default:
				}
			}
			if commitOffset > 0 && !pq.IsDroppd() {
				_ = pc.updateOffset(mq, commitOffset)
			}
		}
	} else {
		if pq.IsDroppd() {
			rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
				rlog.LogKeyMessageQueue: mq.String(),
			})
		}
		pc.tryLockLaterAndReconsume(mq, 100)
	}
}
```