实际运行中的系统,难免会遇到重新消费某条消息、跳过一段时间内的消息等情况。这些异常情况的处理,都和Offset有关。

首先来明确一下Offset的含义,RocketMQ中,一种类型的消息会放到一个Topic里,为了能够并行,一般一个Topic会有多个Message Queue (也可以设置成一个),Offset 是指某个Topic下的一条消息在某个Message Queue里的位置,通过Offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后继续处理。

Offset的类结构,主要分为本地文件类型和Broker代存的类型两种。对于DefaultMQPushConsumer来说,默认是CLUSTERING模式,也就是同一个Consumer group里的多个消费者每人消费一部分,各自收到的消息内容不一样。这种情况下,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore结构。

在DefaultMQPushConsumer里的BROADCASTING模式下,每个Consumer都收到这个Topic的全部消息,各个Consumer间相互没有干扰,RocketMQ 使用LocalFileOffsetStore,把Offset存到本地。

OffsetStore使用Json格式存储,简洁明了,下面是个例子:

在使用DefaultMQPushConsumer的时候,我们不用关心OffsetStore的事,但是如果PullConsumer,我们就要自己处理OffsetStore了。如果我们在代码里把Offset存到了内存,没有持久化存储,这样就可能因为程序的异常或重启而丢失Offset,在实际应用中不推荐这样做。

了解OffsetStore的存储机制以后,我们看看如何设置Consumer读取消息的初始位置。DefaultMQPushConsumer类里有个函数用来设置从哪儿开始消费消息:比如 consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),这个语句设置从最小的Offset开始读取。如果从队列开始到感兴趣的消息之间有很大的范围,用ConsumeFromFirstOffset参数就不合适了,可以设置从某个时间开始消费消息,比如Consumer.setConsumeFromWhere(ConsumeFrom Where.CONSUME_ FROM_ TIMESTAMP),Consumer.setConsumeTimestamp("20131223171201"),时间戳格式是精确到秒的。

注意设置读取位置不是每次都有效,它的优先级默认在OffsetStore后面,比如在DefaultMQPushConsumer的BROADCASTING方式下,默认是从Broker里读取某个Topic对应ConsumerGroup的Offset,当读取不到Offset的时候,ConsumeFrom Where的设置才生效。大部分情况下这个设置在ConsumerGroup初次启动时有效。如果Consumer正常运行后被停止,然后再启动,会接着上次的Offset开始消费,ConsumeFromWhere的设置无效。