什么是ACL?

RocketMQ在4.4.0版本开始支持ACL。ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?

  • 用户 用户是访问控制的基础要素,也不难理解,RocketMQ ACL必然也会引入用户的概念,即支持用户名、密码。
  • 资源 资源,需要保护的对象,在RocketMQ中,消息发送涉及的Topic、消息消费涉及的消费组,应该进行保护,故可以抽象成资源。
  • 权限 针对资源,能进行的操作,
  • 角色 RocketMQ中,只定义两种角色:是否是管理员。

另外,RocketMQ还支持按照客户端IP进行白名单设置。

ACL基本流程图

在讲解如何使用ACL之前,我们先简单看一下RocketMQ ACL的请求流程:

如何配置ACL

acl配置文件

acl默认的配置文件名:plain_acl.yml,需要放在{ROCKETMQ_HOME}/store/config目录下。下面对其配置项一一介绍。

globalWhiteRemoteAddresses

全局白名单,其类型为数组,即支持多个配置。其支持的配置格式如下:

  • 空 表示不设置白名单,该条规则默认返回false。
  • “*” 表示全部匹配,该条规则直接返回true,将会阻断其他规则的判断,请慎重使用。
  • 192.168.0.{100,101} 多地址配置模式,ip地址的最后一组,使用{},大括号中多个ip地址,用英文逗号(,)隔开。
  • 192.168.1.100,192.168.2.100 直接使用,分隔,配置多个ip地址。
  • 192.168..或192.168.100-200.10-20 每个IP段使用 “*” 或"-“表示范围。

accounts

配置用户信息,该类型为数组类型。拥有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。

accessKey

登录用户名,长度必须大于6个字符。

secretKey

登录密码。长度必须大于6个字符。

whiteRemoteAddress

用户级别的IP地址白名单。其类型为一个字符串,其配置规则与globalWhiteRemoteAddresses,但只能配置一条规则。

admin

boolean类型,设置是否是admin。如下权限只有admin=true时才有权限执行。

  • UPDATE_AND_CREATE_TOPIC 更新或创建主题。
  • UPDATE_BROKER_CONFIG 更新Broker配置。
  • DELETE_TOPIC_IN_BROKER 删除主题。
  • UPDATE_AND_CREATE_SUBSCRIPTIONGROUP 更新或创建订阅组信息。
  • DELETE_SUBSCRIPTIONGROUP 删除订阅组信息。

defaultTopicPerm

默认topic权限。该值默认为DENY(拒绝)。

defaultGroupPerm

默认消费组权限,该值默认为DENY(拒绝),建议值为SUB。

topicPerms

设置topic的权限。其类型为数组,其可选择值在下节介绍。

groupPerms

设置消费组的权限。其类型为数组,其可选择值在下节介绍。可以为每一消费组配置不一样的权限。

RocketMQ ACL权限可选值

  • DENY 拒绝。
  • PUB 拥有发送权限。
  • SUB 拥有订阅权限。

权限验证流程

上面定义了全局白名单、用户级别的白名单,用户级别的权限,为了更好的配置ACL权限规则,下面给出权限匹配逻辑。

配置示例

首先,需要在broker.conf文件中,增加参数aclEnable=true。并拷贝distribution/conf/plain_acl.yml文件到${ROCKETMQ_HOME}/conf目录。

broker.conf的配置文件如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort=10915
storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store
storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store/commitlog
namesrvAddr=127.0.0.1:9876
autoCreateTopicEnable=false
aclEnable=true

plain_acl.yml文件内容如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
globalWhiteRemoteAddresses:

accounts:
- accessKey: RocketMQ
  secretKey: 12345678
  whiteRemoteAddress:
  admin: false
  defaultTopicPerm: DENY
  defaultGroupPerm: SUB
  topicPerms:
  - TopicTest=PUB
  groupPerms:
  # the group should convert to retry topic
  - oms_consumer_group=DENY

- accessKey: admin
  secretKey: 12345678
  whiteRemoteAddress:
  # if it is admin, it could access all resources
  admin: true

从上面的配置可知,用户RocketMQ只能发送TopicTest的消息,其他topic无权限发送;拒绝oms_consumer_group消费组的消息消费,其他消费组默认可消费。

代码演示

生产端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
	"context"
	"fmt"
	"os"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
	p, err := rocketmq.NewProducer(
		producer.WithNameServer([]string{"127.0.0.1:9876"}),
		producer.WithRetry(2),
		producer.WithCredentials(primitive.Credentials{
			AccessKey: "RocketMQ",
			SecretKey: "12345678",
		}),
	)

	if err != nil {
		fmt.Println("init producer error: " + err.Error())
		os.Exit(0)
	}

	err = p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s", err.Error())
		os.Exit(1)
	}
	for i := 0; i < 100000; i++ {
		res, err := p.SendSync(context.Background(), primitive.NewMessage("test",
			[]byte("Hello RocketMQ Go Client!")))

		if err != nil {
			fmt.Printf("send message error: %s\n", err)
		} else {
			fmt.Printf("send message success: result=%s\n", res.String())
		}
	}
	err = p.Shutdown()
	if err != nil {
		fmt.Printf("shutdown producer error: %s", err.Error())
	}
}

消费端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("testGroup"),
		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
		consumer.WithCredentials(primitive.Credentials{
			AccessKey: "RocketMQ",
			SecretKey: "12345678",
		}),
	)
	if err != nil {
		fmt.Println("init consumer error: " + err.Error())
		os.Exit(0)
	}

	err = c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		fmt.Printf("subscribe callback: %v \n", msgs)
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Println(err.Error())
	}
	// Note: start after subscribe
	err = c.Start()
	if err != nil {
		fmt.Println(err.Error())
		os.Exit(-1)
	}
	time.Sleep(time.Hour)
	err = c.Shutdown()
	if err != nil {
		fmt.Printf("Shutdown Consumer error: %s", err.Error())
	}
}