rocketmq的定时消息
文章目录
定时消息
定时消息是指消息发送到Broker后,并不立即被消费者消费而是要等到特定的时间后才能被消费,RocketMQ并不支持任意的时间精度,如果要支持任意时间精度的定时调度,不可避免地需要在Broker层做消息排序,再加上持久化方面的考量,将不可避免地带来具大的性能消耗,所以RocketMQ只支持特定级别的延迟消息。消息延迟级别在Broker端通过messageDelayLevel配置,默认为"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m30m 1h 2h" , delayLevel=1表示延迟1s,delayLevel=2 表示延迟5s,依次类推。说到定时任务,上文提到的消息重试正是借助定时任务实现的,在将消息存人commitlog文件之前需要判断消息的重试次数,如果大于0,则会将消息的主题设置为SCHEDULETOPICxxxX。RocketMQ定时消息实现类为org.apache.rocketmq.store.schedule. ScheduleMessageService。该类的实例在DefaultMessageStore中创建,通过在DefaultMessageStore中调用load方法加载并调用start方法进行启动。接下来我们分析一下ScheduleMessageService实现原理。
start
start根据延迟级别创建对应的定时任务,启动定时任务持久化延迟消息队列进度存储。
- 根据延迟队列创建定时任务,遍历延迟级别,根据延迟级别level从offsetTable中获取消费队列的消费进度,如果不存在,则使用0。也就是说每一个延迟级别对应一个消息消费队列。然后创建定时任务,每一个定时任务第一次启动时默认延迟1s 先执行一次定时任务,第二次调度开始才使用相应的延迟时间。延迟级别与消息消费队列的映射关系为:消息队列ID=延迟级别一1。
- 创建定时任务,每隔10s持久化一次延迟队列的消息消费进度(延迟消息调进度),持久化频率可以通过flushDelayOffsetInterval配置属性进行设置。
定时消息的第一个设计关键点是,定时消息单独一个主题: SCHEDULE_TOPIC_XXXX,该主题下队列数量等于MessageStoreConfig#messageDelayLevel配置的延迟级别数量,其对应关系为queueId等于延迟级别减1。ScheduleMessageService 为每一个延迟级别创建一个定时Timer根据延迟级别对应的延迟时间进行延迟调度。在消息发送时,如果消息的延迟级别delayLevel 大于0,将消息的原主题名称、队列ID存入消息的属性中,然后改变消息的主题、队列与延迟主题与延迟主题所属队列,消息将最终转发到延迟队列的消费队列。
定时调度逻辑
ScheduleMessageService的start方法启动后,会为每一个延迟级别创建一个调度任务,每一个延迟级别其实对应SCHEDULE_TOPIC_XXXX主题下的一个消息消费队列。定时调度任务的实现类为DeliverDelayedMessageTimerTask, 其核心实现为executeOnTimeup。
- 根据队列ID与延迟主题查找消息消费队列,如果未找到,说明目前并不存在该延时级别的消息,忽略本次任务,根据延时级别创建下一次调度任务即可。
- 根据offset从消息消费队列中获取当前队列中所有有效的消息。如果未找到,则更新一下延迟队列定时拉取进度并创建定时任务待下一次继续尝试。
- 遍历ConsumeQueue,每一个标准ConsumeQueue条目为20个字节。解析出消息的物理偏移量、消息长度、消息tag hashcode,为从commitlog加载具体的消息做准备。
- 根据消息物理偏移量与消息大小从commitlog文件中查找消息。如果未找到消息,打印错误日志,根据延迟时间创建下一个定时器。
- 根据消息重新构建新的消息对象,清除消息的延迟级别属性(delayLevel)、并恢复消息原先的消息主题与消息消费队列,消息的消费次数reconsumeTimes并不会丢失。
- 将消息再次存人到commitlog,并转发到主题对应的消息队列上,供消费者再次消费。
- 更新延迟队列拉取进度。
定时消息的第二个设计关键点:消息存储时如果消息的延迟级别属性delayLevel大于0,则会备份原主题、原队列到消息属性中,其键分别为PROPERTY_REAL_TOPIC、PROPERTY_REAL_QUEUE_ID,通过为不同的延迟级别创建不同的调度任务,当时间到达后执行调度任务,调度任务主要就是根据延迟拉取消息消费进度从延迟队列中拉取消息,然后从commitlog中加载完整消息,清除延迟级别属性并恢复原先的主题、队列,再次创建一条新的消息存入到commitlog中并转发到消息消费队列供消息消费者消费。
发送
|
|
接收:
|
|
文章作者 Forz
上次更新 2020-04-21