在Broker端进行消息过滤,可以减少无效消息发送到Consumer,少占用网络带宽从而提高吞吐量。Broker端有三种方式进行消息过滤。

消息的Tag和Key

对一个应用来说,尽可能只用一个Topic,不同的消息子类型用Tag来标识(每条消息只能有一个Tag),服务器端基于Tag进行过滤,并不需要读取消息体的内容,所以效率很高。发送消息设置了Tag以后,消费方在订阅消息时,才可以利用Tag在Broker端做消息过滤。

其次是消息的Key。对发送的消息设置好Key,以后可以根据这个Key来查找消息。所以这个Key一般用消息在业务层面的唯一标识码来表示,这样后续查询消息异常,消息丢失等都很方便。Broker 会创建专门的索引文件,来存储Key到消息的映射,由于是哈希索引,应尽量使Key唯一,避免潜在的哈希冲突。

Tag和Key的主要差别是使用场景不同,Tag用在Consumer的代码中,用来进行服务端消息过滤,Key主要用于通过命令行查询消息。

通过Tag进行过滤

用Tag方式进行过滤的方法是传人感兴趣的Tag标签,Tag 标签是一个普通字符串,是在创建Message的时候添加的,一个Message只能有一个Tag。使用Tag方式过滤非常高效,Broker端可以在ConsumeQueue中做这种过滤,只从CommitLog里读取过滤后被命中的消息。看一下ConsumerQueue的存储格式.

ConsumeQueue的第三部分存储的是Tag对应的hashcode,是一个定长的字符串,通过Tag过滤的过程就是对比定长的hashcode。经过hashcode对比,符合要求的消息被从CommitLog读取出来,不用担心Hash冲突问题,消息在被消费前,会对比完整的Message Tag字符串,消除Hash冲突造成的误读。

生产端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
	p, _ := rocketmq.NewProducer(
		producer.WithNameServer([]string{"127.0.0.1:9876"}),
		producer.WithRetry(2),
	)
	err := p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s", err.Error())
		os.Exit(1)
	}
	tags := []string{"TagA", "TagB", "TagC"}
	for i := 0; i < 3; i++ {
		tag := tags[i%3]
		msg := primitive.NewMessage("test",
			[]byte("Hello RocketMQ Go Client!"))
		msg.WithTag(tag)

		res, err := p.SendSync(context.Background(), msg)
		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
	}
	err = p.Shutdown()
	if err != nil {
		fmt.Printf("shutdown producer error: %s", err.Error())
	}
}

消费端:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	c, _ := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
	)
	selector := consumer.MessageSelector{
		Type:       consumer.TAG,
		Expression: "TagA || TagC",
	}
	err := c.Subscribe("TopicTest", selector, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		fmt.Printf("subscribe callback: %v \n", msgs)
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("shutdown Consumer error: %s", err.Error())
	}
}

用 SQL表达式的方式进行过滤

使用Tag方式过滤虽然高效,但是支持的逻辑比较简单,在构造Message的时候,还可以通过putUserProperty函数来增加多个自定义的属性,基于这些属性可以做复杂的过滤逻辑.

我们用类似SQL表达式的方式对消息进行过滤.

类似SQL的过滤表达式,支持如下语法:

  • 数字对比,比如>、>=、<、<=、BETWEEN、=;
  • 字符串对比,比如=、<>、IN;
  • IS NULL or IS NOT NULL;
  • 逻辑符号AND、OR、NOT。

支持的数据类型:

  • 数字型,比如123、3.1415;
  • 字符型,比如’abc’、注意必须用单引号;
  • NULL,这个特殊字符;
  • 布尔型,TRUEorFALSE。

SQL表达式方式的过滤需要Broker先读出消息里的属性内容,然后做SQL计算,增大磁盘压力,没有Tag方式高效。

Filter Server方式过滤

Filter Server是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤。

要使用Filter Server,首先要在启动Broker前在配置文件里加上filterServerNums=3这样的配置,Broker 在启动的时候,就会在本机启动3个FilterServer进程。Filter Server类似一个RocketMQ的Consumer进程,它从本机Broker获取消息,然后根据用户上传过来的Java函数进行过滤,过滤后的消息再传给远端的Consumer。这种方式会占用很多Broker机器的CPU资源,要根据实际情况谨慎使用。上传的java代码也要经过检查,不能有申请大内存、创建线程等这样的操作,否则容易造成Broker服务器宕机。