rocketmq的消费负载均衡
文章目录
负载均衡
要做负载均衡,必须知道一些全局信息,也就是一个ConsumerGroup里到底有多少个Consumer,知道了全局信息,才可以根据某种算法来分配,比如简单地平均分到各个Consumer。在RocketMQ中,负载均衡或者消息分配是在Consumer端代码中完成的,Consumer从Broker处获得全局信息,然后自己做负载均衡,只处理分给自己的那部分消息。具体流程如下:
- 从主题订阅信息缓存表中获取主题的队列信息;发送请求从Broker中该消费组内当前所有的消费者客户端ID,主题topic的队列可能分布在多个Broker.上,那请求发往哪个Broker呢?RocketeMQ从主题的路由信息表中随机选择一个Broker。Broker为什么会存在消费组内所有消费者的信息呢?我们不妨回忆一下消费者在启动的时候会向MQClientInstance中注册消费者,然后MQClientInstance会向所有的Broker发送心跳包,心跳包中包含MQClientInstance的消费者信息。如果mqSet、cidAll任意一个为空则忽略本次消息队列负载。
- 首先对cidAll,mqAll排序,这个很重要,同一个消费组内看到的视图保持一致,确保同一个消费队列不会被多个消费者分配。
- ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable,当前消费者负载的消息队列缓存表,如果缓存表中的MessageQueue不包含在mqSet中,说明经过本次消息队列负载后,该mq被分配给其他消费者,故需要暂停该消息队列消息的消费,方法是将ProccessQueue的状态设置为droped=true,该ProcessQueue中的消息将不会再被消费,调用removeUnnecessaryMessageQueue方法判断是否将MessageQueue、ProccessQueue 缓存表中移除。removeUnnecessaryMessageQueue在RebalanceImple定义为抽象方法。 removeUnnecessaryMessageQueue方法主要持久化待移除MessageQueue消息消费进度。在Push模式下,如果是集群模式并且是顺序消息消费时,还需要先解锁队列.
- 遍历本次负载分配到的队列集合,如果processQueueTable中没有包含该消息队列,表明这是本次新增加的消息队列,首先从内存中移除该消息队列的消费进度,然后从磁盘中读取该消息队列的消费进度,创建PullRequest对象。这里有一个关键,如果读取到的消费进度小于0,则需要校对消费进度。RocketMQ 提供CONSUME_FROM LAST_OFFSET、CONSUME_FROM_ FIRST_OFFSET、CONSUME_FROM_TIMESTAMP方式,在创建消费者时可以通过调用setConsumeFromWhere方法设置。PullRequest的nextOffset 计算逻辑位于:computePullFromWhere。
- CONSUME_FROM_ LAST_OFFSET :从队列最新偏移量开始消费。
- CONSUME_FROM_FIRST_OFFSET:从头开始消费。
- CONSUME_FROM_TIMESTAMP:从消费者启动的时间戳对应的消费进度开始消费。
- 将PullRequest 加入到PullMessageService中,以便唤醒PullMessageService线程。
ConsumeFromWhere相关消费进度校正策略只有在从磁盘中获取消费进度返回一1时才会生效,如果从消息进度存储文件中返回的消费进度小于一1,表示偏移量非法,则使用偏移量一1去拉取消息,那么会发生什么呢?首先第一次去消息服务器拉取消息时无法取到消息,但是会用一1去更新消费进度,然后将消息消费队列丢弃,在下一次消息队列负载时会再次消费。
RocketMQ默认提供5种分配算法:
- AllocateMessageQueueAveragely:平均分配,推荐指数为5颗星。 举例来说,如果现在有8个消息消费队列q1,q2,q3,q4,q5,q6,q7,q8,有3个消费者c1,c2,c3,那么根据该负载算法,消息队列分配如下: cl: q1,q2,q3 c2:q4,q5,q6 c3:q7,q8
- AllocateMessageQueueAveragelyByCircle:平均轮询分配,推荐指数为5颗星。 举例来说,如果现在有8个消息消费队列q1,q2,q3,q4,q5,q6,q7,q8,有3个消费者c1,c2,c3,那么根据该负载算法,消息队列分配如下: c1: q1,q4,q7 c2: q2,q5,q8 c3: q3,q6
- AllocateMessageQueueConsistentHash: 一致性hash。不推荐使用,因为消息队列负载信息不容易跟踪。
- AllocateMessageQueueByConfig:根据配置,为每一个消费者配置固定的消息队列。
- AllocateMessageQueueByMachineRoom:根据Broker部署机房名,对每个消费者负责不同的Broker上的队列。
消息负载算法如果没有特殊的要求,尽量使用AllocateMessageQueueAveragely、AllocateMessageQueueAveragelyByCircle,因为分配算法比较直观。消息队列分配遵循一个消费者可以分配多个消息队列,但同一个消息队列只会分配给一个消费者,故如果消费者个数大于消息队列数量,则有些消费者无法消费消息。
问题1:PullRequest对象在什么时候创建并加人到pullRequestQueue中以便唤醒PullMessageService线程。
答:RebalanceService线程每隔20s对消费者订阅的主题进行一次队列重新分配,每一次分配都会获取主题的所有队列、从Broker服务器实时查询当前该主题该消费组内消费者列表,对新分配的消息队列会创建对应的PullRequest对象。在一个进程中,同一个消费组同一个队列只会存在一个PullRequest对象。
问题2:集群内多个消费者是如何负载主题下的多个消费队列,并且如果有新的消费者加入时,消息队列又会如何重新分布。
答:由于每次进行队列重新负载时会从Broker实时查询出当前消费组内所有消费者,并且对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消费队列从而消费消息。

DefaultMQPushConsumer
DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个DefaultMQPushConsumer启动后,会马上会触发一个doRebalance动作;而且在同一个ConsumerGroup里加人新的DefaultMQPushConsumer时,各个Consumer都会被触发doRebalance动作。
具体的负载均衡算法有五种,默认用的是第一种AllocateMessageQueueAveragely。负载均衡的结果与Topic的Message Queue数量,以及ConsumerGroup里的Consumer的数量有关。负载均衡的分配粒度只到MessageQueue,把Topic下的所有MessageQueue分配到不同的Consumer中,所以Message Queue和Consumer的数量关系,或者整除关系影响负载均衡结果。
以AllocateMessageQueueAveragely策略为例,如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的时候,有一个Consumer无法收到消息,其他3个Consumer各处理Topic三分之一的消 息。可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的Message Queue数设置为16。
|
|
DefaultMQPullConsumer
PullConsumer可以看到所有的MessageQueue,而且从哪个MessageQueue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
文章作者 Forz
上次更新 2020-04-24