前言

RmqClient是客户端各种类型的Consumer和Producer的底层类。这个类首先从NameServer获取并保存各种配置信息,比如Topic的Route信息。同时RmqClient还会通过其持有的RemotingClient(remoteClient字段)实现消息的收发,也就是从Broker拉取消息或者将消息发送到Broker。

创建

既然RmqClient实现的是底层通信功能以及获取并保存元数据的功能,就没必要为每个Consumer或Producer都创建一个对象,一个RmqClient对象可以被多个Consumer或Producer共用。RocketMQ通过一个工厂函数配合全局的clientMap来达到共用RmqClient的目的。创建函数为func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient

注意,RmqClient是通过工厂函数创建的,并不是单例模式,有些情况下需要创建多个实例。首先来看看RmqClient的创建规则。

// GetOrNewRocketMQClient returns the rmqClient stored in clientMap under the
// ID that ClientID computes for option. If no client with that ID exists yet,
// it creates one and stores it.
//
// Clients are shared per client ID: producers and consumers whose options yield
// the same ID receive the same instance. Only the call that actually stores the
// new client registers the broker request handlers below. Later calls discard
// their freshly built candidate, so the returned client does not use their
// option or callbackCh arguments.
//
// Registered handlers:
//   - ReqNotifyConsumerIdsChanged: triggers an immediate rebalance.
//   - ReqCheckTransactionState: sends a *CheckTransactionStateCallback to
//     callbackCh when the message's producer group equals option.GroupName.
//     callbackCh may be nil. Such requests are then logged and dropped, instead
//     of blocking the handler forever on a send to a nil channel.
//   - ReqGetConsumerRunningInfo: replies with the encoded running info of the
//     requested consumer group. On failure it replies with a ResError response
//     carrying a remark.
func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) RMQClient {
	client := &rmqClient{
		option:       option,
		remoteClient: remote.NewRemotingClient(),
		namesrvs:     option.Namesrv,
		done:         make(chan struct{}),
	}
	actual, loaded := clientMap.LoadOrStore(client.ClientID(), client)
	if !loaded {
		client.remoteClient.RegisterRequestFunc(ReqNotifyConsumerIdsChanged, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive broker's notification to consumer group", map[string]interface{}{
				rlog.LogKeyConsumerGroup: req.ExtFields["consumerGroup"],
			})
			client.RebalanceImmediately()
			return nil
		})
		client.remoteClient.RegisterRequestFunc(ReqCheckTransactionState, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			header := new(CheckTransactionStateRequestHeader)
			header.Decode(req.ExtFields)
			msgExts := primitive.DecodeMessage(req.Body)
			if len(msgExts) == 0 {
				rlog.Warning("checkTransactionState, decode message failed", nil)
				return nil
			}
			msgExt := msgExts[0]
			// TODO: add namespace support
			transactionID := msgExt.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
			if len(transactionID) > 0 {
				msgExt.TransactionId = transactionID
			}
			group := msgExt.GetProperty(primitive.PropertyProducerGroup)
			if group == "" {
				rlog.Warning("checkTransactionState, pick producer group failed", nil)
				return nil
			}
			if option.GroupName != group {
				rlog.Warning("producer group is not equal", nil)
				return nil
			}
			// A send on a nil channel blocks forever and would wedge this handler.
			if callbackCh == nil {
				rlog.Warning("checkTransactionState, no transaction callback channel registered", nil)
				return nil
			}
			callback := &CheckTransactionStateCallback{
				Addr:   addr,
				Msg:    msgExt,
				Header: *header,
			}
			callbackCh <- callback
			return nil
		})

		client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
			rlog.Info("receive get consumer running info request...", nil)
			header := new(GetConsumerRunningInfoHeader)
			header.Decode(req.ExtFields)
			val, exist := clientMap.Load(header.clientID)
			res := remote.NewRemotingCommand(ResError, nil, nil)
			if !exist {
				res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
				return res
			}

			var runningInfo *ConsumerRunningInfo
			if cli, ok := val.(*rmqClient); ok {
				runningInfo = cli.getConsumerRunningInfo(header.consumerGroup)
			}
			if runningInfo == nil {
				res.Remark = "there is unexpected error when get running info, please check log"
				return res
			}

			data, err := runningInfo.Encode()
			if err != nil {
				// Keep ResError: reporting success with an empty body would
				// mislead the requester.
				res.Remark = fmt.Sprintf("json marshal error: %s", err.Error())
				return res
			}
			res.Code = ResSuccess
			res.Body = data
			return res
		})
	}
	return actual.(*rmqClient)
}


// ClientID returns the key under which this client is stored in clientMap.
// The key has the form "<ClientIP>@<instance>", followed by "@<UnitName>" when
// UnitName is set. When InstanceName is the literal "DEFAULT", the current
// process ID is used as the instance part, so separate processes on the same
// host get distinct IDs.
func (c *rmqClient) ClientID() string {
	instance := c.option.InstanceName
	if instance == "DEFAULT" {
		instance = strconv.Itoa(os.Getpid())
	}

	id := c.option.ClientIP + "@" + instance
	if unit := c.option.UnitName; unit != "" {
		id += "@" + unit
	}
	return id
}

系统中维护了clientMap这个Map对象,每创建一个新的RmqClient,都会以clientId作为Key放入Map结构中。clientId的格式是“clientIP”+“@”+“InstanceName”(若设置了UnitName,还会追加“@”+“UnitName”)。其中clientIP是客户端机器的IP地址,一般不会变;InstanceName有默认值“DEFAULT”,也可以被手动设置。

clientId由客户端IP+InstanceName(+可选的UnitName)组成。如果在同一台物理服务器上部署两个应用程序,它们的clientId岂不是相同,从而造成混乱?

为了避免这个问题,如果InstanceName为默认值DEFAULT,RocketMQ会在clientId中用当前进程ID代替InstanceName,这样不同进程之间就不会相互影响。但同一个进程中的不同消费者和不同生产者在启动时获取到的RmqClient实例都是同一个。

普通情况下,一个用到RocketMQ客户端的程序只要有一个RmqClient实例就够了。这时候无论创建一个还是多个Consumer或Producer,底层使用的都是同一个RmqClient实例。

例如创建一个PushConsumer来接收消息,且没有设置这个Consumer的InstanceName参数(通过对应的配置项进行设置),这时InstanceName的值是默认的“DEFAULT”。实际创建的RmqClient个数由上面ClientID的生成逻辑决定。

从ClientID的生成逻辑就可以看出,如果创建Consumer或者Producer的时候不手动指定InstanceName(且ClientIP、UnitName相同),进程中只会有一个RmqClient对象。

有些情况下只有一个RmqClient对象是不够的。比如一个程序需要连接两个RocketMQ集群,从一个集群读取消息,再发送到另一个集群,一个RmqClient对象无法支持这种场景。这种情况下一定要手动指定不同的InstanceName,底层才会创建两个RmqClient对象。

功能

首先来看一下RmqClient的Start函数,从Start函数的逻辑中能大致了解RmqClient的功能。

// Start launches the client's background maintenance goroutines. Because the
// work is guarded by c.once, only the first call has any effect.
//
// Each periodic task runs in its own goroutine wrapped by primitive.WithRecover
// and stops, logging a message, once c.done is closed:
//   - name server address refresh (only when NameServerAddrs is empty): once
//     synchronously, again after 10s, then every 2 minutes;
//   - topic route refresh: after 10ms, then every _PullNameServerInterval;
//   - offline-broker cleanup plus heartbeat to all brokers: after 1s, then
//     every _HeartbeatBrokerInterval;
//   - consumer offset persistence: after 10s, then every _PersistOffsetInterval;
//   - rebalance: every _RebalanceInterval, with no initial run.
func (c *rmqClient) Start() {
	c.once.Do(func() {
		if !c.option.Credentials.IsEmpty() {
			c.remoteClient.RegisterInterceptor(remote.ACLInterceptor(c.option.Credentials))
		}

		// schedule runs op in a recovered goroutine until c.done is closed.
		// When runInitially is true, op first runs once after initialDelay; it
		// then runs on every tick of interval. The initial wait also watches
		// c.done, so a client shut down during the delay does no further work.
		schedule := func(stopMsg string, runInitially bool, initialDelay, interval time.Duration, op func()) {
			go primitive.WithRecover(func() {
				logStop := func() {
					rlog.Info(stopMsg, map[string]interface{}{
						"clientID": c.ClientID(),
					})
				}

				if runInitially {
					delay := time.NewTimer(initialDelay)
					select {
					case <-delay.C:
						op()
					case <-c.done:
						delay.Stop()
						logStop()
						return
					}
				}

				ticker := time.NewTicker(interval)
				defer ticker.Stop()
				for {
					select {
					case <-ticker.C:
						op()
					case <-c.done:
						logStop()
						return
					}
				}
			})
		}

		// fetchNameServerAddr: only needed when no addresses were configured.
		if len(c.option.NameServerAddrs) == 0 {
			updateNameServer := func() {
				c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
			}
			// Resolve once synchronously before the periodic refresh starts.
			updateNameServer()
			schedule("The RMQClient stopping update name server domain info.", true,
				10*time.Second, 2*time.Minute, updateNameServer)
		}

		// schedule update route info
		schedule("The RMQClient stopping update topic route info.", true,
			10*time.Millisecond, _PullNameServerInterval, func() {
				c.UpdateTopicRouteInfo()
			})

		// clean offline brokers, then heartbeat to all brokers
		schedule("The RMQClient stopping clean off line broker and heart beat", true,
			time.Second, _HeartbeatBrokerInterval, func() {
				c.namesrvs.cleanOfflineBroker()
				c.SendHeartbeatToAllBrokerWithLock()
			})

		// schedule persist offset
		schedule("The RMQClient stopping persist offset", true,
			10*time.Second, _PersistOffsetInterval, func() {
				c.consumerMap.Range(func(key, value interface{}) bool {
					consumer := value.(InnerConsumer)
					if err := consumer.PersistConsumerOffset(); err != nil {
						rlog.Error("persist offset failed", map[string]interface{}{
							rlog.LogKeyUnderlayError: err,
						})
					}
					return true
				})
			})

		// schedule rebalance (no initial run, unlike the tasks above)
		schedule("The RMQClient stopping do rebalance", false,
			0, _RebalanceInterval, func() {
				c.RebalanceImmediately()
			})
	})
}

RmqClient会定时执行如下几个操作:获取NameServer地址(仅在未配置NameServerAddrs时)、更新Topic路由信息、清理离线的Broker并向所有Broker发送心跳、持久化消费者的Offset,以及定时触发Rebalance。