rocketmq的消息发送
文章目录
前言
RocketMQ支持3种消息发送方式:同步(sync)、异步(async)、单向 (oneway)
- 同步:发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果。
- 异步:发送者向MQ执行发送消息API时,指定消息发送成功后的回掉函数,然后调用消息发送API后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
- 单向:消息发送者向MQ执行发送消息API时,直接返回,不等待消息服务器的结果,也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上。
RocketMQ消息发送需要考虑以下几个问题:
- 消息队列如何进行负载?
- 消息发送如何实现高可用?
- 批量消息发送如何实现一致性?
认识消息
|
|
Message的基础属性主要包括消息所属主题topic、消息Flag(RocketMQ不做处理)、扩展属性、消息体。
Message扩展属性主要包含下面几个。
- tag:消息TAG,用于消息过滤。
- keys: Message索引键,多个用空格隔开, RocketMQ可以根据这些key快速检索到消息。
- waitStoreMsgOK:消息发送时是否等消息存储完成后再返回。
- delayTimeLevel:消息延迟级别,用于定时消息或消息重试。
这些扩展属性存储在Message的properties中。
消息发送的基本流程
消息发送流程主要的步骤:验证消息、查找路由、消息发送(包含异常处理机制).
消息长度验证
消息发送之前,首先确保生产者处于运行状态,然后验证消息是否符合相应的规范,具体的规范要求是主题名称、消息体不能为空、消息长度不能等于0且默认不能超过允许发送消息的最大长度4M (maxMessageSize=10241024 4)。
查找主题路由信息
消息发送之前,首先需要获取主题的路由信息,只有获取了这些信息我们才知道消息要发送到具体的Broker节点。
tryToFindTopicPublishInfo是查找主题的路由信息的方法。如果生产者中缓存了topic的路由信息,如果该路由信息中包含了消息队列,则直接返回该路由信息,如果没有缓存或没有包含消息队列,则向NameServer查询该topic的路由信息。如果最终未找到路由信息,则抛出异常:无法找到主题相关路由信息异常。
循环遍历路由信息的QueueData信息,如果队列没有写权限,则继续遍历下一个QueueData ;根据brokerName找到brokerData信息,找不到或没有找到Master节点,则遍历下一个QueueData ;根据写队列个数,根据topic+序号创建MessageQueue,填充topicPublishInfo的List<QuueMessage>。完成消息发送的路由查找。
选择消息队列
根据路由信息选择消息队列,返回的消息队列按照broker、序号排序。举例说明,如果topicA在broker一a,broker一b.上分别创建了4个队列,那么返回的消息队列:[{“broker一Name”: ”broker一a ,”queueId” :0},{“brokerName”:”broker一a ,”queueId”:1},{“brokerName”:”broker一a”,”queueId" :2},{“brokerName”:”broker一a”,”queueId”:3},{“brokerName”:”broker一b”,” queueId”:0},{ “brokerName”:”broker一b”,”queueId”:1},{“brokerName”:”broker一b”,”queueId”:2}, {“brokerName” :” broker一b”,”queueld” :3}], 那RocketMQ如何选择消息队列呢?
首先消息发送端采用重试机制,由retryTimesWhenSendFailed指定同步方式重试次数,异步重试机制在收到消息发送结构后执行回调之前进行重试。由retryTimesWhenSendAsyncFailed指定,接下来就是循环执行,选择消息队列、发送消息,发送成功则返回,收到异常则重试。选择消息队列有两种方式。
- sendLatencyFultEnable=false,默认不启用Broker故障延迟机制。
- sendLatencyFaultEnable= 一true,启用Broker故障延迟机制。
故障延迟机制
首先在一次消息发送过程中,可能会多次执行选择消息队列这个方法,lastBrokerName就是上一次选择的执行发送消息失败的Broker。第一次执行消息队列选择时,lastBrokerName为null,此时直接用sendWhichQueue自增再获取值,与当前路由表中消息队列个数取模,返回该位置的MessageQueue(selectOneMessageQueue()方法),如果消息发送再失败的话,下次进行消息队列选择时规避上次MesageQueue所在的Broker,否则还是很有可能再次失败。
该算法在一次消息发送过程中能成功规避故障的Broker,但如果Broker宕机,由于路由算法中的消息队列是按Broker排序的,如果上一次根据路由算法选择的是宕机的Broker的第一个队列,那么随后的下次选择的是宕机Broker的第二个队列,消息发送很有可能会.失败,再次引发重试,带来不必要的性能损耗,那么有什么方法在一次消息发送失败后,暂时将该Broker排除在消息队列选择范围外呢?或许有朋友会问,Broker 不可用后,路由信息中为什么还会包含该Broker的路由信息呢?其实这不难解释:首先,NameServer检测Broker是否可用是有延迟的,最短为一次心跳检测间隔(10s);其次,NameServer不会检测到Broker宕机后马上推送消息给消息生产者,而是消息生产者每隔30s更新一次路由信息,所以消息生产者最快感知Broker最新的路由信息也需要30s。如果能引入一种机制,在Broker宕机期间,如果一次消息发送失败后,可以将该Broker暂时排除在消息队列的选择范围中。
- 根据对消息队列进行轮询获取一个消息队列。
- 验证该消息队列是否可用,latencyFaultTolerance.isAvailable(mq.getBrokerName)是关键。
- 如果返回的MessageQueue可用,移除latencyFaultTolerance关于该topic条目,表明该Broker故障已经恢复。
消息发送
- 根据MessageQueue获取Broker的网络地址。如果MQClientInstance的brokerAddrTable未缓存该Broker的信息,则从NameServer主动更新一下topic的路由信息。如果路由更新后还是找不到Broker信息,则抛出MQClientException,提示Broker不存在。
- 为消息分配全局唯一ID,如果消息体默认超过4K(compressMsgBodyOverHowmuch),会对消息体采用zip压缩,并设置消息的系统标记为MessageSysFlag.COMPRESSED FLAG。如果是事务Prepared消息,则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED TYPE。
- 如果注册了消息发送钩子函数,则执行消息发送之前的增强逻辑。
- 构建消息发送请求包。主要包含如下重要信息:生产者组、主题名称、默认创建主题Key、该主题在单个Broker默认队列数、队列ID(队列序号)、消息系统标记(MessageSysFlag)、消息发送时间、消息标记(RocketMQ对消息中的flag不做任何处理,供应用程序使用)、消息扩展属性、消息重试次数、是否是批量消息等。
- 根据消息发送方式,同步、异步、单向方式进行网络传输。
返回状态
消息发送的返回状态有如下四种:FLUSH_DISK TIMEOUT、FLUSH_SLAVE_TIMEOUT、SLAVE NOT_ AVAILABLE、SENDOK,不同状态在不同的刷盘策略和同步策略的配置下含义是不同的。
- FLUSH DISK_TIMEOUT :表示没有在规定时间内完成刷盘(需要Broker的刷盘策略被设置成SYNC_FLUSH才会报这个错误)。
- FLUSH_SLAVE_TIMEOUT:表示在主备方式下,并且Broker被设置成SYNC_MASTER方式,没有在设定时间内完成主从同步。
- SLAVE_NOT_AVAILABLE :这个状态产生的场景和FLUSH_SLAVE_TIMEOUT类似,表示在主备方式下,并且Broker被设置成SYNC_MASTER,但是没有找到被配置成Slave的Broker。
- SEND_OK:表示发送成功,发送成功的具体含义,比如消息是否已经被存储到磁盘?消息是否被同步到了Slave 上?消息在Slave上是否被写人磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK。
同步发送
主要流程是:创建一个DefaultMQProducer对象,设置好GroupName和NameServer地址后启动,然后把待发送的消息拼装成Message对象,使用Producer来发送
- 检查消息发送是否合理,这里完成了以下几件事情。
- 检查该Broker是否有写权限。
- 检查该Topic是否可以进行消息发送。主要针对默认主题,默认主题不能发送消息,仅仅供路由查找。
- 在NameServer端存储主题的配置信息,默认路径: ${ROCKET_ HOME}/store/config/topic.json。下面是主题存储信息。order:是否是顺序消息;perm:权限码;read lQueueNums :读队列数量; writeQueueNums :写队列数量; to topicNa e:主题名称;topicSysFlag : topic Flag,当前版本暂为保留; topicFilterType :主题过滤方式,当前版本.仅支持SINGLE TAG。
- 检查队列,如果队列不合法,返回错误码。
- 如果消息重试次数超过允许的最大重试次数,消息将进人到DLD延迟队列。
- 调用DefaultMessageStore#putMessage进行消息存储。
|
|
异步发送
消息异步发送是指消息生产者调用发送的API后,无须阻塞等待消息服务器返回本次消息发送结果,只需要提供一个回调函数,供消息发送客户端在收到响应结果回调。异步方式相比同步方式,消息发送端的发送性能会显著提高,但为了保护消息服务器的负载压力,RocketMQ对消息发送的异步消息进行了并发控制,通过参数clientAsyncSemaphoreValue来控制,默认为65535。 异步消息发送虽然也可以通过retryTimesWhenSendAsyncFailed属性来控制消息重试次数,但是重试的调用入口是在收到服务端响应包时进行的,如果出现网络异常、网络超时等将不会重试。
|
|
单向发送
单向发送是指消息生产者调用消息发送的API后,无须等待消息服务器返回本次消息发送结果,并且无须提供回调函数,表示消息发送压根就不关心本次消息发送是否成功,其实现原理与异步消息发送相同,只是消息发送客户端在收到响应结果后什么都不做而已,并且没有重试机制。
|
|
批量发送
批量消息发送是将同一主题的多条消息一起打包发送到消息服务端,减少网络调用次数,提高网络传输效率。当然,并不是在同一批次中发送的消息数量越多性能就越好,其判断依据是单条消息的长度,如果单条消息内容比较长,则打包多条消息发送会影响其他线程发送消息的响应时间,并且单批次消息发送总长度不能超过maxMessageSize。批量消息发送要解决的是如何将这些消息编码以便服务端能够正确解码出每条消息的消息内容。
单条消息发送时,消息体的内容将保存在body中。批量消息发送,需要将多条消息体的内容存储在body中,如何存储方便服务端正确解析出每条消息呢?
RocketMQ采取的方式是,对单条消息内容使用固定格式进行存储.
接下来梳理一下批量消息发送的核心流程。
首先在消息发送端,调用batch方法,将一批消息封装成MessageBatch对象。MessageBatch继承自Message 对象,MessageBatch 内部持有List<Message> messages。 这样的话,批量消息发送与单条消息发送的处理流程完全一样。MessageBatch只需要将该集合中的每条消息的消息体body聚合成一个byte[] 数值,在消息服务端能够从该byte[]数值中正确解析出消息即可。
在消息发送端将会按照上述结构进行解码,然后整个发送流程与单个消息发送没什么差异,就不一一介绍了.
|
|
发送队列自选择
消息发送默认根据主题的路由信息(主题消息队列)进行负载均衡,负载均衡机制为轮询策略。例如现在有这样一个场景,订单的状态变更消息发送到特定主题,为了避免消息消费者同时消费同一订单的不同状态的变更消息,我们应该使用顺序消息。为了提高消息消费的并发度,如果我们能根据某种负载算法,相同订单的不同消息能统一发到同一个消息消费队列上,则可以避免引人分布式锁,RocketMQ 在消息发送时提供了消息队列选择器MessageQueueSelector。
如何提高发送能力
发送一条消息出去要经过三步,一是客户端发送请求到服务器,二是服务器处理该请求,三是服务器向客户端返回应答,一次消息的发送耗时是上述三个步骤的总和。在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用,可以采用Oneway方式发送,Oneway 方式只发送请求不等待应答,即将数据写人客户端的Socket缓冲区就返回,不等待对方返回结果,用这种方式发送消息的耗时可以缩短到微秒级。
另一种提高发送速度的方法是增加Producer的并发量,使用多个Producer同时发送,我们不用担心多Producer同时写会降低消息写磁盘的效率,RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写人DirectMem中,然后异步地将连续一段无空 洞的数据刷人文件系统当中。顺序写CommitLog可让RocketMQ无论在HDD还是SSD磁盘情况下都能保持较高的写入性能。
文章作者 Forz
上次更新 2020-04-21