Topic 与 Tag 最佳实践

在RocketMQ中,Topic 与 Tag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。您可通过本文了解如何搭配使用 Topic 和 Tag 来实现消息过滤。

背景信息

Topic 和 Tag 的定义如下:

  • Topic 消息主题,通过 Topic 对不同的业务消息进行分类。
  • Tag 消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。

Topic 和 Tag 的关系如下图所示。

适用场景

您可能会有这样的疑问:到底什么时候该用 Topic,什么时候该用 Tag?

建议您从以下几个方面进行判断:

  • 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
  • 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
  • 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
  • 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。

总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。

场景示例

以天猫交易平台为例,订单消息和支付消息属于不同业务类型的消息,分别创建 Topic_Order 和 Topic_Pay,其中订单消息根据商品品类以不同的 Tag 再进行细分,列如电器类、男装类、女装类、化妆品类等被各个不同的系统所接收。

通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。

消费幂等

为了防止消息重复消费导致业务处理异常,消息队列 RocketMQ 版的消费者在接收到消息后,有必要根据业务上的唯一 Key 对消息做幂等处理。本文介绍消息幂等的概念、适用场景以及处理方法。

什么是消息幂等

当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这整个过程就可实现消息幂等。

例如,在支付场景下,消费者消费扣款消息,对一笔订单执行扣款操作,扣款金额为 100 元。如果因网络不稳定等原因导致扣款消息重复投递,消费者重复消费了该扣款消息,但最终的业务结果是只扣款一次,扣费 100 元,且用户的扣款记录中对应的订单只有一条扣款流水,不会多次扣除费用。那么这次扣款操作是符合要求的,整个消费过程实现了消费幂等。

适用场景

在互联网应用中,尤其在网络不稳定的情况下,RocketMQ的消息有可能会出现重复。如果消息重复会影响您的业务处理,请对消息做幂等处理。

消息重复的场景如下:

发送时消息重复

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

投递时消息重复

消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及消费者应用重启)

当消息队列 RocketMQ 版的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

处理方法

因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 设置。

以支付场景为例,可以将消息的 Key 设置为订单号,作为幂等处理的依据。具体代码示例如下:

1
2
3
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);  

消费者收到消息时可以根据消息的 Key,即订单号来实现消息幂等:

1
2
3
4
5
6
consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey()
        // 根据业务唯一标识的 Key 做幂等处理
    }
});           

订阅关系一致

订阅关系一致指的是同一个消费者 Group ID 下所有 Consumer 实例的处理逻辑必须完全一致。一旦订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。本文提供订阅关系不一致的示例代码,帮助您顺畅地订阅消息。

背景信息

消息队列 RocketMQ 版里的一个消费者 Group ID 代表一个 Consumer 实例群组。对于大多数分布式应用来说,一个消费者 Group ID 下通常会挂载多个 Consumer 实例。

由于消息队列 RocketMQ 版的订阅关系主要由 Topic + Tag 共同组成,因此,保持订阅关系一致意味着同一个消费者 Group ID 下所有的实例需在以下两方面均保持一致:

  • 订阅的 Topic 必须一致
  • 订阅的 Topic 中的 Tag 必须一致

正确订阅关系图片示例

多个 Group ID 订阅了多个 Topic,并且每个 Group ID 里的多个消费者实例的订阅关系保持了一致。

错误订阅关系图片示例

单个 Group ID 订阅了多个 Topic,但是该 Group ID 里的多个消费者实例的订阅关系并没有保持一致。

错误订阅关系代码示例一

以下例子中,同一个 Group ID 下的两个实例订阅的 Topic 不一致。

Consumer 实例 1-1:

1
2
3
4
5
6
7
8
9
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_A", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });    

Consumer 实例 1-2:

1
2
3
4
5
6
7
8
9
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_1");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_B ", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });     

错误订阅关系代码示例二

以下例子中,同一个 Group ID 下订阅 Topic 的 Tag 不一致。Consumer 实例 2-1 订阅了 TagA,而 Consumer 实例 2-2 未指定 Tag。

Consumer 实例 2-1:

1
2
3
4
5
6
7
8
9
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });   

Consumer 实例 2-2:

1
2
3
4
5
6
7
8
9
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_2");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_A", "*", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });   

错误订阅关系代码示例三

此例中,错误的原因如下所述:

  • 同一个 Group ID 下订阅 Topic 个数不一致。
  • 同一个 Group ID 下订阅 Topic 的 Tag 不一致。

Consumer 实例 3-1:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_A", "TagA", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });
    consumer.subscribe("jodie_test_B", "TagB", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });         

Consumer 实例 3-2:

1
2
3
4
5
6
7
8
9
    Properties properties = new Properties();
    properties.put(PropertyKeyConst.GROUP_ID, "GID_jodie_test_3");
    Consumer consumer = ONSFactory.createConsumer(properties);
    consumer.subscribe("jodie_test_A", "TagB", new MessageListener() {
        public Action consume(Message message, ConsumeContext context) {
            System.out.println(message.getMsgID());
            return Action.CommitMessage;
        }
    });