在RocketMQ中NameServer的设计的结尾我们提到:NameServer需要等Broker失效至少120s才能将该Broker从路由表中移除掉,那如果在Broker故障期间,消息生产者 Producer根据主题获取到的路由信息包含已经看机的 Broker,会导致消息发送失败,那这种情况怎么办,岂不是消息发送不是高可用的?今天我们带着这个问题来看一下RocketMQ发送消息时都做了些什么,问题答案就会出来...
RocketMQ 支持 3 种消息发送方式 :
同步(sync)
异步(async)
单向(oneway)
同步 : 发送者向MQ发送消息后 ,同步等待, 直到消息服务器返回发送结果 。
异步 : 发送者向MQ发送消息时,指定消息发送成功后的回调函数,然后调用消息发送API后,立即返回,消息发送者线程不阻塞 ,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行 。
单向:消息发送者向MQ执行发送消息API时,直接返回,不等待消息服务器的结果, 也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上 。
消息发送基本流程
验证消息
查找路由
消息发送
发送消息的方法如下图,我们直接跟踪代码
Valicators.checkMessage方法为验证消息(基本校验)
tryToFindTopicPublishInfo,首先从本地找,找不到从NameServer找
TopicPulishInfo里面是关于Topic和队列的信息,如下图
selectOneMessageQueue为选择一个消息队列,这里就是文章开头问题的答案
sendLatencyFaultEnable属性代表是否开启规避发送失败的规避策略,也叫故障延迟
这里我们返回再看一眼sendDefaultImpl方法,在消息发送失败时,会调用updateFaultItem方法
进入updateFaultItem方法看一下,实际上做的类似一个我们小时候上学时的“罚站”操作,例如:老师挨个找每位小朋友回答问题,到你了没回答上来,老师说:你去后面站着去,五分钟之后我再考你!😝
这个方法是把不好使的Broker放入一个HashMap中,如果之前就不好使,那就更新罚站时间,如果之前没罚站过,则直接添加;
看到这里我们再次回到上面的selectOneMessageQueue方法
首先,Topic对应的队列是例如下面的结构:
如下红框内的代码,在某个Broker不好使并且“罚站”时间没到的时候,会在循环内“规避”掉该无效的Broker所属的所有MessageQueue
队列选择好之后,会调用sendKernelImpl进行消息的发送,发送消息过程很清晰
找到Broker的网络地址
验证一下消息大小是否需要进行压缩
判断是否注册了钩子函数(消息发送前后执行)
发送消息会根据模式来执行switch中的语句
异步,则指定回调函数并直接返回
同步,等待结果返回并不指定回调函数
单向,啥都不做
总结:
闭眼睛想想,其实就三步完成发消息
验证
找队列
发消息
貌似也没想象中的复杂
消息发送高可用主要通过两个手段 : 重试与Broker规避(罚站)。Broker规避就是在一次消息发送过程中发现错误,在某一时间段内,消息生产者不会选择该 Broker(消息服务器)上的消息队列,提高发送消息的成功率。