暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RocketMQ中消息的发送

程序猿研习社 2021-09-06
681

RocketMQ中NameServer的设计的结尾我们提到:NameServer需要等Broker失效至少120s才能将该Broker从路由表中移除掉,那如果在Broker故障期间,消息生产者 Producer根据主题获取到的路由信息包含已经看机的 Broker,会导致消息发送失败,那这种情况怎么办,岂不是消息发送不是高可用的?今天我们带着这个问题来看一下RocketMQ发送消息时都做了些什么,问题答案就会出来...


RocketMQ 支持 3 种消息发送方式 :

  1. 同步(sync)

  2. 异步(async)

  3. 单向(oneway)

同步 : 发送者向MQ发送消息后 ,同步等待, 直到消息服务器返回发送结果 。

异步 : 发送者向MQ发送消息时,指定消息发送成功后的回调函数,然后调用消息发送API后,立即返回,消息发送者线程不阻塞 ,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行 。

单向:消息发送者向MQ执行发送消息API时,直接返回,不等待消息服务器的结果, 也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上 。


消息发送基本流程

  1. 验证消息

  2. 查找路由

  3. 消息发送

发送消息的方法如下图,我们直接跟踪代码

Valicators.checkMessage方法为验证消息(基本校验)

tryToFindTopicPublishInfo,首先从本地找,找不到从NameServer找

TopicPulishInfo里面是关于Topic和队列的信息,如下图

selectOneMessageQueue为选择一个消息队列,这里就是文章开头问题的答案

sendLatencyFaultEnable属性代表是否开启规避发送失败的规避策略,也叫故障延迟

这里我们返回再看一眼sendDefaultImpl方法,在消息发送失败时,会调用updateFaultItem方法

进入updateFaultItem方法看一下,实际上做的类似一个我们小时候上学时的“罚站”操作,例如:老师挨个找每位小朋友回答问题,到你了没回答上来,老师说:你去后面站着去,五分钟之后我再考你!😝

这个方法是把不好使的Broker放入一个HashMap中,如果之前就不好使,那就更新罚站时间,如果之前没罚站过,则直接添加;

看到这里我们再次回到上面的selectOneMessageQueue方法

首先,Topic对应的队列是例如下面的结构:

如下红框内的代码,在某个Broker不好使并且“罚站”时间没到的时候,会在循环内“规避”掉该无效的Broker所属的所有MessageQueue


队列选择好之后,会调用sendKernelImpl进行消息的发送,发送消息过程很清晰

  1. 找到Broker的网络地址

  2. 验证一下消息大小是否需要进行压缩

  3. 判断是否注册了钩子函数(消息发送前后执行)

发送消息会根据模式来执行switch中的语句

  • 异步,则指定回调函数并直接返回

  • 同步,等待结果返回并不指定回调函数

  • 单向,啥都不做


总结:

闭眼睛想想,其实就三步完成发消息

  1. 验证

  2. 找队列

  3. 发消息

貌似也没想象中的复杂

消息发送高可用主要通过两个手段 : 重试与Broker规避(罚站)。Broker规避就是在一次消息发送过程中发现错误,在某一时间段内,消息生产者不会选择该 Broker(消息服务器)上的消息队列,提高发送消息的成功率。

文章转载自程序猿研习社,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论