整体流程

NewPushConsumer

创建pushConsumer对象.配置负载均衡策略.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// NewPushConsumer builds a push consumer from the default options with the
// supplied Option functions applied on top, in order.
//
// It creates the name-server handle (attaching credentials when any are
// configured), prefixes the group name with "<namespace>%" when a namespace
// is set, and selects the orderly or concurrent consume routine according to
// the ConsumeOrderly option. The consumer is returned un-started; an error is
// returned only when the name-server handle cannot be created.
func NewPushConsumer(opts ...Option) (*pushConsumer, error) {
	cfg := defaultPushConsumerOptions()
	for i := range opts {
		opts[i](&cfg)
	}

	nameSrv, err := internal.NewNamesrv(cfg.NameServerAddrs)
	if err != nil {
		return nil, errors.Wrap(err, "new Namesrv failed.")
	}
	if hasCredentials := !cfg.Credentials.IsEmpty(); hasCredentials {
		nameSrv.SetCredentials(cfg.Credentials)
	}
	cfg.Namesrv = nameSrv

	// The group name must be finalised before the client and the option
	// snapshot below are created from cfg.
	if len(cfg.Namespace) > 0 {
		cfg.GroupName = cfg.Namespace + "%" + cfg.GroupName
	}

	consumer := &pushConsumer{
		defaultConsumer: &defaultConsumer{
			client:         internal.GetOrNewRocketMQClient(cfg.ClientOptions, nil),
			consumerGroup:  cfg.GroupName,
			cType:          _PushConsume,
			state:          int32(internal.StateCreateJust),
			prCh:           make(chan PullRequest, 4),
			model:          cfg.ConsumerModel,
			consumeOrderly: cfg.ConsumeOrderly,
			fromWhere:      cfg.FromWhere,
			allocate:       cfg.Strategy,
			option:         cfg,
			namesrv:        nameSrv,
		},
		subscribedTopic: make(map[string]string),
		queueLock:       newQueueLock(),
		done:            make(chan struct{}, 1),
		consumeFunc:     utils.NewSet(),
	}
	consumer.defaultConsumer.mqChanged = consumer.messageQueueChanged

	// Concurrent consumption is the default; orderly replaces it on request.
	consumer.submitToConsume = consumer.consumeMessageCurrently
	if consumer.consumeOrderly {
		consumer.submitToConsume = consumer.consumeMessageOrderly
	}

	consumer.interceptor = primitive.ChainInterceptors(consumer.option.Interceptors...)

	return consumer, nil
}

Start

Start方法整体代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Start brings the push consumer online.
//
// The one-time part (guarded by pc.once) registers the consumer with the
// client, starts the embedded defaultConsumer and launches two goroutines:
// one that turns every PullRequest received on pc.prCh into a pullMessage
// call, and one that, for orderly consumers only, re-locks all queues every
// RebalanceLockInterval. Both goroutines stop when pc.done is signalled.
//
// After that, the topic routes are refreshed and every subscribed topic is
// checked for route info; if one is missing the client is shut down and an
// error is returned. Finally the broker check, heartbeat and an immediate
// rebalance are triggered.
//
// It returns ErrCreated when registration fails, the error from
// defaultConsumer.start, or a "route info not found" error.
func (pc *pushConsumer) Start() error {
	var startErr error
	pc.once.Do(func() {
		rlog.Info("the consumer start beginning", map[string]interface{}{
			rlog.LogKeyConsumerGroup: pc.consumerGroup,
			"messageModel":           pc.model,
			"unitMode":               pc.unitMode,
		})
		// Pessimistically mark the start as failed until it completes.
		atomic.StoreInt32(&pc.state, int32(internal.StateStartFailed))
		pc.validate()

		if regErr := pc.client.RegisterConsumer(pc.consumerGroup, pc); regErr != nil {
			rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
				rlog.LogKeyConsumerGroup: pc.consumerGroup,
			})
			startErr = ErrCreated
			return
		}

		if startErr = pc.defaultConsumer.start(); startErr != nil {
			return
		}

		// Dispatcher: one pullMessage goroutine per incoming pull request.
		go func() {
			// todo start clean msg expired
			for {
				select {
				case req := <-pc.prCh:
					go pc.pullMessage(&req)
				case <-pc.done:
					rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		}()

		// Periodic queue locking, only relevant to orderly consumption.
		go primitive.WithRecover(func() {
			if orderly := pc.consumeOrderly; !orderly {
				return
			}

			// initial lock.
			time.Sleep(time.Second)
			pc.lockAll()

			relock := time.NewTicker(pc.option.RebalanceLockInterval)
			defer relock.Stop()
			for {
				select {
				case <-relock.C:
					pc.lockAll()
				case <-pc.done:
					rlog.Info("push consumer close tick.", map[string]interface{}{
						rlog.LogKeyConsumerGroup: pc.consumerGroup,
					})
					return
				}
			}
		})
	})
	if startErr != nil {
		return startErr
	}

	pc.client.UpdateTopicRouteInfo()
	for topic := range pc.subscribedTopic {
		if _, found := pc.topicSubscribeInfoTable.Load(topic); !found {
			pc.client.Shutdown()
			return fmt.Errorf("the topic=%s route info not found, it may not exist", topic)
		}
	}
	pc.client.CheckClientInBroker()
	pc.client.SendHeartbeatToAllBrokerWithLock()
	pc.client.RebalanceImmediately()

	return nil
}

defaultConsumer.start

根据消费消息方式的不同,OffsetStore 的类型也不同。如果是BROADCASTING模式,使用的是LocalFileOffsetStore,Offset 存到本地;如果是CLUSTERING模式,使用的是RemoteBrokerOffsetStore,Offset 存到Broker机器上。

然后调用start方法,开始获取数据.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// start prepares the shared consumer state and starts the underlying client.
//
// When no name-server addresses are configured, the address is refreshed from
// NameServerDomain. In Clustering mode the group's retry topic is subscribed
// (all tags) and offsets are kept in a remote offset store; in any other mode
// a local-file offset store is used. The state is then set to StateRunning
// and the start timestamp is recorded in milliseconds. It always returns nil.
func (dc *defaultConsumer) start() error {
	if len(dc.option.NameServerAddrs) == 0 {
		dc.namesrv.UpdateNameServerAddress(dc.option.NameServerDomain, dc.option.InstanceName)
	}

	switch dc.model {
	case Clustering:
		// set retry topic
		retry := internal.GetRetryTopic(dc.consumerGroup)
		dc.subscriptionDataTable.Store(retry, buildSubscriptionData(retry, MessageSelector{TAG, _SubAll}))

		dc.option.ChangeInstanceNameToPID()
		dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client, dc.namesrv)
	default:
		dc.storage = NewLocalFileOffsetStore(dc.consumerGroup, dc.client.ClientID())
	}

	dc.client.Start()
	atomic.StoreInt32(&dc.state, int32(internal.StateRunning))

	nowNanos := time.Now().UnixNano()
	dc.consumerStartTimestamp = nowNanos / int64(time.Millisecond)
	return nil
}

pullMessage

获取消息的逻辑实现在pullMessage函数中,这是一个很大的函数,前半部分是进行一些判断,是进行流量控制的逻辑;中间是对返回消息结果做处理的逻辑;后面是发送获取消息请求的逻辑。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
		// Flow control, part of pullMessage: back off when this queue already
		// caches too many messages (by count) or too many bytes (by size).
		cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
		if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
			// Log only on every 1000th flow-control hit to avoid flooding the log.
			if pc.queueFlowControlTimes%1000 == 0 {
				rlog.Warning("the cached message count exceeds the threshold, so do flow control", map[string]interface{}{
					"PullThresholdForQueue": pc.option.PullThresholdForQueue,
					"minOffset":             pq.Min(),
					"maxOffset":             pq.Max(),
					// NOTE(review): this logs the msgCache value itself, not a
					// count — confirm whether pq.cachedMsgCount was intended.
					"count":                 pq.msgCache,
					"size(MiB)":             cachedMessageSizeInMiB,
					"flowControlTimes":      pc.queueFlowControlTimes,
					rlog.LogKeyPullRequest:  request.String(),
				})
			}
			pc.queueFlowControlTimes++
			// Skip this pull; the NEXT label (outside this excerpt) presumably
			// waits sleepTime before the next attempt — TODO confirm.
			sleepTime = _PullDelayTimeWhenFlowControl
			goto NEXT
		}

		// Same back-off, triggered by cached size in MiB instead of count.
		if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
			if pc.queueFlowControlTimes%1000 == 0 {
				rlog.Warning("the cached message size exceeds the threshold, so do flow control", map[string]interface{}{
					"PullThresholdSizeForQueue": pc.option.PullThresholdSizeForQueue,
					"minOffset":                 pq.Min(),
					"maxOffset":                 pq.Max(),
					"count":                     pq.msgCache,
					"size(MiB)":                 cachedMessageSizeInMiB,
					"flowControlTimes":          pc.queueFlowControlTimes,
					rlog.LogKeyPullRequest:      request.String(),
				})
			}
			pc.queueFlowControlTimes++
			sleepTime = _PullDelayTimeWhenFlowControl
			goto NEXT
		}

通过判断未处理消息的个数和总大小来控制是否继续请求消息。对于顺序消息还有一些特殊判断逻辑。获取的消息返回后,根据返回状态,调用相应的处理方法.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
		// Dispatch on the status of the pull result, part of pullMessage.
		switch result.Status {
		case primitive.PullFound:
			// Messages were returned: advance the request offset, record pull
			// metrics, and put the messages into the process queue.
			rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d found messages.", request.mq.Topic, request.mq.QueueId), nil)
			prevRequestOffset := request.nextOffset
			request.nextOffset = result.NextBeginOffset

			// Pull round-trip time expressed in milliseconds.
			rt := time.Now().Sub(beginTime) / time.Millisecond
			increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))

			pc.processPullResult(request.mq, result, sd)

			msgFounded := result.GetMessageExts()
			// MaxInt64 acts as "no message" so the sanity check below cannot
			// fire on firstMsgOffset when nothing was returned.
			firstMsgOffset := int64(math.MaxInt64)
			if msgFounded != nil && len(msgFounded) != 0 {
				firstMsgOffset = msgFounded[0].QueueOffset
				increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
				pq.putMessage(msgFounded...)
			}
			// Sanity check: offsets must never move backwards relative to the
			// offset that was requested. Only a warning is logged.
			if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
				rlog.Warning("[BUG] pull message result maybe data wrong", map[string]interface{}{
					"nextBeginOffset":   result.NextBeginOffset,
					"firstMsgOffset":    firstMsgOffset,
					"prevRequestOffset": prevRequestOffset,
				})
			}
		case primitive.PullNoNewMsg:
			// Nothing new: the request offset is left unchanged.
			rlog.Debug(fmt.Sprintf("Topic: %s, QueueId: %d no more msg, current offset: %d, next offset: %d",
				request.mq.Topic, request.mq.QueueId, pullRequest.QueueOffset, result.NextBeginOffset), nil)
		case primitive.PullNoMsgMatched:
			// Messages exist but none matched; move the offset forward.
			request.nextOffset = result.NextBeginOffset
			pc.correctTagsOffset(request)
		case primitive.PullOffsetIllegal:
			// The requested offset was rejected: adopt the returned offset,
			// mark the process queue dropped, persist the corrected offset and
			// remove the queue from processQueueTable.
			rlog.Warning("the pull request offset illegal", map[string]interface{}{
				rlog.LogKeyPullRequest: request.String(),
				"result":               result.String(),
			})
			request.nextOffset = result.NextBeginOffset
			pq.WithDropped(true)
			// NOTE(review): this blocks the calling goroutine for 10 seconds
			// before the offset is corrected.
			time.Sleep(10 * time.Second)
			pc.storage.update(request.mq, request.nextOffset, false)
			pc.storage.persist([]*primitive.MessageQueue{request.mq})
			pc.processQueueTable.Delete(request.mq)
			rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
		default:
			// Unknown status: log it and use the error delay for the next pull.
			rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
			sleepTime = _PullDelayTimeWhenError
		}

最后是发送获取消息请求,这三个阶段不停地循环执行,直到程序被停止.

并发处理流程

处理效率的高低是反映 Consumer 实现好坏的重要指标。

从 Broker获取到一批消息以后,根据 BatchSize的设置,把一批消息封装到一个PullRequest中,然后把这个PullRequest提交到prCh中执行.

1
2
3
4
5
6
7
// PullRequest describes one pending pull against a single message queue.
// Requests are sent over the consumer's prCh channel and handed to
// pullMessage.
type PullRequest struct {
	// consumerGroup is the consumer group the request belongs to.
	consumerGroup string
	// mq is the message queue to pull from.
	mq            *primitive.MessageQueue
	// pq is the process queue that buffers pulled messages before they are
	// consumed.
	pq            *processQueue
	// nextOffset is the queue offset the next pull starts from.
	nextOffset    int64
	// lockedFirst records whether the queue has been locked.
	lockedFirst   bool
}

下面让我们一一来介绍一下PullRequest的核心属性:

  • String consumerGroup: 消费者组。
  • MessageQueue messageQueue:待拉取消费队列。
  • ProcessQueue processQueue:消息处理队列,从Broker 拉取到的消息先存入ProcessQueue,然后再提交到消费者消费线程池消费。
  • long nextOffset:待拉取的MessageQueue偏移量。
  • Boolean lockedFirst: 是否被锁定。

消息的处理结果可能有不同的值,主要的两个是 CONSUME_SUCCESS 和 RECONSUME_LATER。如果消费不成功,要在5秒后重新消费该消息;如果消费模式是 CLUSTERING 模式,未消费成功的消息会先被发送回 Broker,供这个 Consumer Group 里的其他 Consumer 消费,如果发送回 Broker 失败,再按 RECONSUME_LATER 处理.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
				// Account for the outcome of one consumed batch (subMsgs).
				if result == ConsumeSuccess {
					increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
				} else {
					increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
					if pc.model == BroadCasting {
						// Broadcasting: failed messages are only logged, never retried.
						// NOTE(review): the loop bound is len(msgs) but the index is
						// applied to subMsgs; if msgs is longer than subMsgs this
						// goes out of range — confirm the bound should be len(subMsgs).
						for i := 0; i < len(msgs); i++ {
							rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
								"message": subMsgs[i],
							})
						}
					} else {
						// Otherwise: send each message back to the broker; those that
						// cannot be sent back are collected for a local retry.
						// NOTE(review): this iterates msgs rather than subMsgs, so
						// messages outside the failed batch appear to be sent back
						// as well — confirm against the caller.
						for i := 0; i < len(msgs); i++ {
							msg := msgs[i]
							if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
								msg.ReconsumeTimes += 1
								msgBackFailed = append(msgBackFailed, msg)
							}
						}
					}
				}

ProcessQueue对象

在前面的源码中,有个 ProcessQueue 类型的对象,这个对象的功能是什么呢?从Broker获得的消息,因为是提交到线程池里并行执行,很难监控和控制执行状态,比如如何获得当前消息堆积的数量,如何解决处理超时情况等。

RocketMQ定义了一个快照类 ProcessQueue来解决这些问题,在 PushConsumer运行的时候,每个 MessageQueue都会有一个对应的 ProcessQueue对象,保存了这个 MessageQueue消息处理状态的快照.

ProcessQueue对象里主要的内容是一个TreeMap和一个读写锁。TreeMap里以MessageQueue的offset作为Key,以消息内容的引用为 Value,保存了所有从 MessageQueue获取到但是还未被处理的消息,读写锁控制着多个线程对TreeMap对象的并发访问。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// processQueue is the per-message-queue snapshot of consumption state: it
// holds the messages pulled from one message queue that have not been
// processed yet, together with the counters and locks used around them.
type processQueue struct {
	// cachedMsgCount is compared against PullThresholdForQueue by the pull
	// flow control.
	cachedMsgCount             int64
	// cachedMsgSize is converted to MiB and compared against
	// PullThresholdSizeForQueue by the pull flow control.
	cachedMsgSize              int64
	tryUnlockTimes             int64
	queueOffsetMax             int64
	msgAccCnt                  int64
	// msgCache maps queue offset -> message for every pulled but not yet
	// processed message.
	msgCache                   *treemap.Map
	// mutex guards concurrent access to the tree map.
	mutex                      sync.RWMutex
	consumeLock                sync.Mutex
	// presumably the messages currently being consumed in orderly mode —
	// TODO confirm against takeMessages/commit.
	consumingMsgOrderlyTreeMap *treemap.Map
	dropped                    *uatomic.Bool
	lastPullTime               time.Time
	lastConsumeTime            atomic.Value
	locked                     *uatomic.Bool
	lastLockTime               atomic.Value
	consuming                  bool
	// lockConsume is held by consumeMessageOrderly while the user callback
	// runs.
	lockConsume                sync.Mutex
	msgCh                      chan []*primitive.MessageExt
	order                      bool
}

有了 ProcessQueue对象,可以随时停止、启动消息的消费,同时也可用于帮助实现顺序消费消息。顺序消息是通过 ConsumeMessageOrderly实现的,主要流程和 ConsumeMessageConcurrently类似,区别只是在对并发消费的控制上.

并发消费

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101

// consumeMessageCurrently consumes the messages currently available in pq
// concurrently.
//
// The messages returned by pq.getMessages are split into batches of at most
// ConsumeMessageBatchMaxSize (values below 1 are treated as 1), and every
// batch is consumed in its own goroutine. For each batch:
//
//   - nothing is done if the process queue has been dropped;
//   - the consume callback runs via consumeInner and the outcome is recorded
//     in the ConsumeMessageContext properties and the consume metrics;
//   - on failure in BroadCasting mode the batch is only logged; in any other
//     mode each message of the batch is sent back to the broker, and messages
//     that cannot be sent back are retried locally after 5 seconds;
//   - the batch is removed from pq and, if the queue is still alive, the
//     returned offset is stored.
//
// pq is the process queue to drain; mq is the message queue it belongs to.
func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.MessageQueue) {
	msgs := pq.getMessages()
	if msgs == nil {
		return
	}

	// A batch size below 1 would make the batching loop spin forever (0) or
	// slice with inverted bounds (negative), so clamp it.
	batchSize := pc.option.ConsumeMessageBatchMaxSize
	if batchSize < 1 {
		batchSize = 1
	}

	for start := 0; start < len(msgs); start += batchSize {
		end := start + batchSize
		if end > len(msgs) {
			end = len(msgs)
		}
		// Declared per iteration so each goroutine owns its batch.
		subMsgs := msgs[start:end]

		go primitive.WithRecover(func() {
		RETRY:
			if pq.IsDroppd() {
				rlog.Info("the message queue not be able to consume, because it was dropped", map[string]interface{}{
					rlog.LogKeyMessageQueue:  mq.String(),
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
				})
				return
			}

			beginTime := time.Now()
			pc.resetRetryAndNamespace(subMsgs)

			// The context describes the batch handed to the callback, not the
			// whole fetched set.
			msgCtx := &primitive.ConsumeMessageContext{
				Properties:    make(map[string]string),
				ConsumerGroup: pc.consumerGroup,
				MQ:            mq,
				Msgs:          subMsgs,
			}
			ctx := context.Background()
			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
			ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
			concurrentCtx := primitive.NewConsumeConcurrentlyContext()
			concurrentCtx.MQ = *mq
			ctx = primitive.WithConcurrentlyCtx(ctx, concurrentCtx)

			result, err := pc.consumeInner(ctx, subMsgs)

			consumeRT := time.Since(beginTime)
			switch {
			case err != nil:
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionReturn)
			case consumeRT >= pc.option.ConsumeTimeout:
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.TimeoutReturn)
			case result == ConsumeSuccess:
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
			case result == ConsumeRetryLater:
				msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
			}

			increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))

			if pq.IsDroppd() {
				rlog.Warning("processQueue is dropped without process consume result.", map[string]interface{}{
					rlog.LogKeyMessageQueue: mq,
					"message":               subMsgs,
				})
				return
			}

			var msgBackFailed []*primitive.MessageExt
			if result == ConsumeSuccess {
				increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
			} else {
				increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
				if pc.model == BroadCasting {
					// Broadcasting never retries: log and drop the failed batch.
					for _, msg := range subMsgs {
						rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
							"message": msg,
						})
					}
				} else {
					// Only the failed batch is sent back; messages that cannot
					// be sent back are kept for a local retry.
					for _, msg := range subMsgs {
						if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
							msg.ReconsumeTimes += 1
							msgBackFailed = append(msgBackFailed, msg)
						}
					}
				}
			}

			offset := pq.removeMessage(subMsgs...)

			if offset >= 0 && !pq.IsDroppd() {
				pc.storage.update(mq, int64(offset), true)
			}
			if len(msgBackFailed) > 0 {
				// Retry locally only the messages that could not be sent back.
				subMsgs = msgBackFailed
				time.Sleep(5 * time.Second)
				goto RETRY
			}
		})
	}
}

顺序消费

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// consumeMessageOrderly consumes the messages of one message queue
// sequentially.
//
// It takes the per-queue lock obtained from pc.queueLock for the whole call.
// Consumption proceeds only in BroadCasting mode or when the process queue is
// locked and that lock has not expired; otherwise tryLockLaterAndReconsume is
// called and the function returns. Inside the loop, batches of at most
// ConsumeMessageBatchMaxSize messages are taken from pq, handed to
// consumeInner, and the result decides whether the offset is committed,
// whether the queue is suspended for a moment, and whether the loop continues.
//
// pq is the process queue to consume from; mq is the message queue it
// belongs to.
func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.MessageQueue) {
	if pq.IsDroppd() {
		rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
			rlog.LogKeyMessageQueue: mq.String(),
		})
		return
	}

	// Serialise consumption of this message queue within this consumer.
	lock := pc.queueLock.fetchLock(*mq)
	lock.Lock()
	defer lock.Unlock()
	if pc.model == BroadCasting || (pq.IsLock() && !pq.isLockExpired()) {
		beginTime := time.Now()

		continueConsume := true
		for continueConsume {
			if pq.IsDroppd() {
				rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
					rlog.LogKeyMessageQueue: mq.String(),
				})
				break
			}
			// In Clustering mode the queue lock is re-checked on every
			// iteration, because it can be lost or expire while looping.
			if pc.model == Clustering {
				if !pq.IsLock() {
					rlog.Warning("the message queue not locked, so consume later", map[string]interface{}{
						rlog.LogKeyMessageQueue: mq.String(),
					})
					pc.tryLockLaterAndReconsume(mq, 10)
					return
				}
				if pq.isLockExpired() {
					rlog.Warning("the message queue lock expired, so consume later", map[string]interface{}{
						rlog.LogKeyMessageQueue: mq.String(),
					})
					pc.tryLockLaterAndReconsume(mq, 10)
					return
				}
			}
			// Give up the queue once MaxTimeConsumeContinuously is exceeded.
			// NOTE(review): beginTime is reset before every batch below, so
			// interval measures the time since the previous batch started,
			// not since the loop was entered — confirm that is intended.
			interval := time.Now().Sub(beginTime)
			if interval > pc.option.MaxTimeConsumeContinuously {
				time.Sleep(10 * time.Millisecond)
				return
			}
			batchSize := pc.option.ConsumeMessageBatchMaxSize
			msgs := pq.takeMessages(batchSize)

			pc.resetRetryAndNamespace(msgs)

			// Nothing left to consume: leave the loop.
			if len(msgs) == 0 {
				continueConsume = false
				break
			}

			// TODO: add message consumer hook
			beginTime = time.Now()

			ctx := context.Background()
			msgCtx := &primitive.ConsumeMessageContext{
				Properties:    make(map[string]string),
				ConsumerGroup: pc.consumerGroup,
				MQ:            mq,
				Msgs:          msgs,
			}
			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
			ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)

			orderlyCtx := primitive.NewConsumeOrderlyContext()
			orderlyCtx.MQ = *mq
			ctx = primitive.WithOrderlyCtx(ctx, orderlyCtx)

			// The user callback runs under the process queue's consume lock.
			// The error returned by consumeInner is deliberately ignored
			// here; only the result value drives the handling below.
			pq.lockConsume.Lock()
			result, _ := pc.consumeInner(ctx, msgs)
			pq.lockConsume.Unlock()

			if result == Rollback || result == SuspendCurrentQueueAMoment {
				rlog.Warning("consumeMessage Orderly return not OK", map[string]interface{}{
					rlog.LogKeyConsumerGroup: pc.consumerGroup,
					"messages":               msgs,
					rlog.LogKeyMessageQueue:  mq,
				})
			}

			// just put consumeResult in consumerMessageCtx
			//interval = time.Now().Sub(beginTime)
			//consumeReult := SuccessReturn
			//if interval > pc.option.ConsumeTimeout {
			//	consumeReult = TimeoutReturn
			//} else if SuspendCurrentQueueAMoment == result {
			//	consumeReult = FailedReturn
			//} else if ConsumeSuccess == result {
			//	consumeReult = SuccessReturn
			//}

			// process result
			// commitOffset stays -1 unless a branch below commits.
			commitOffset := int64(-1)
			if pc.option.AutoCommit {
				switch result {
				case Commit, Rollback:
					// NOTE(review): the message says the messages are treated
					// as acknowledged, but this branch only logs and does not
					// call pq.commit() — confirm whether a commit is intended.
					// The "%v" in the message is also never formatted.
					rlog.Warning("the message queue consume result is illegal, we think you want to ack these message: %v", map[string]interface{}{
						rlog.LogKeyMessageQueue: mq,
					})
				case ConsumeSuccess:
					commitOffset = pq.commit()
				case SuspendCurrentQueueAMoment:
					// While checkReconsumeTimes allows it, put the batch back,
					// pause for the suspend time and stop this round;
					// otherwise commit past the batch.
					if pc.checkReconsumeTimes(msgs) {
						pq.putMessage(msgs...)
						time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
						continueConsume = false
					} else {
						commitOffset = pq.commit()
					}
				default:
				}
			} else {
				// Manual commit: only an explicit Commit result commits.
				switch result {
				case ConsumeSuccess:
				case Commit:
					commitOffset = pq.commit()
				case Rollback:
					// pq.rollback
					time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
					continueConsume = false
				case SuspendCurrentQueueAMoment:
					if pc.checkReconsumeTimes(msgs) {
						time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
						continueConsume = false
					}
				default:
				}
			}
			// Store the committed offset unless the queue was dropped
			// meanwhile. A commitOffset of 0 is skipped by this condition.
			if commitOffset > 0 && !pq.IsDroppd() {
				_ = pc.updateOffset(mq, commitOffset)
			}
		}
	} else {
		// The queue is not locked (or the lock expired): retry locking later.
		if pq.IsDroppd() {
			rlog.Warning("the message queue not be able to consume, because it's dropped.", map[string]interface{}{
				rlog.LogKeyMessageQueue: mq.String(),
			})
		}
		pc.tryLockLaterAndReconsume(mq, 100)
	}
}