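In a clustered deployment, RocketMQ supports multiple masters and multiple slaves. In this mode, the broker configuration decides whether data is replicated from master to slave asynchronously or synchronously. Replication keeps the slaves consistent with their master and so provides high availability.
As the names suggest, with asynchronous replication a small number of messages may be lost if the master goes down or its disk is damaged, but performance is better. With synchronous replication there is no message delay when the master goes down, and both service availability and data availability are very high, but performance is slightly lower than with asynchronous replication (roughly 10% lower) and the RT of sending a single message is slightly higher. At the time of writing, a slave cannot automatically take over as master after the master goes down, but the messages on it can still be consumed.
Below, we look at asynchronous and synchronous replication at the code level.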
1
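Metadata Synchronization
BrokerController schedules a different periodic task depending on the broker's role: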
// If this broker is a slave
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    // Use the configured HA master address if there is one; otherwise update it periodically when registering with the NameServer
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }
    // Schedule syncAll() on the single-threaded scheduled executor: first run after 10 s, then every 60 s
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else { // If this broker is a master
    // Periodically log the offset gap between master and slave
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.printMasterAndSlaveDiff();
            } catch (Throwable e) {
                log.error("schedule printMasterAndSlaveDiff error.", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
On a slave, this task calls SlaveSynchronize.syncAll(), which pulls four kinds of metadata from the master:
public void syncAll() {
    // Sync topic configuration
    this.syncTopicConfig();
    // Sync consumer offsets
    this.syncConsumerOffset();
    // Sync delay (scheduled message) offsets
    this.syncDelayOffset();
    // Sync subscription group configuration
    this.syncSubscriptionGroupConfig();
}
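a. Topic configuration sync
SlaveSynchronize.syncTopicConfig() asks the master to synchronize topics.json. The steps are roughly:
1) Send a GET_ALL_TOPIC_CONFIG request to the master, which returns all topic configurations to the slave;
2) Compare the DataVersion of the master's and the slave's topic information. If they differ, replace the slave's topic table with the one returned by the master, persist it, and update the slave's DataVersion.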
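To make step 2 concrete, here is a minimal sketch of the DataVersion check under a simplified model. `TopicDataVersion` reduces RocketMQ's DataVersion to a (timestamp, counter) pair, and `SlaveTopicStore` with its `persistCount` is a hypothetical stand-in for the slave's topic table and the write of topics.json. It is not the actual SlaveSynchronize code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified DataVersion: a (timestamp, counter) pair compared by value.
final class TopicDataVersion {
    final long timestamp;
    final long counter;

    TopicDataVersion(long timestamp, long counter) {
        this.timestamp = timestamp;
        this.counter = counter;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicDataVersion)) return false;
        TopicDataVersion v = (TopicDataVersion) o;
        return timestamp == v.timestamp && counter == v.counter;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, counter);
    }
}

// Hypothetical slave-side topic store; persistCount counts simulated writes of topics.json.
final class SlaveTopicStore {
    final Map<String, String> topicTable = new HashMap<>();
    TopicDataVersion dataVersion = new TopicDataVersion(0, 0);
    int persistCount = 0;

    // Apply the master's snapshot only when its DataVersion differs from ours.
    // Returns true if the local table was replaced and persisted.
    boolean syncFrom(Map<String, String> masterTopics, TopicDataVersion masterVersion) {
        if (masterVersion.equals(dataVersion)) {
            return false; // already in sync: skip the rewrite
        }
        topicTable.clear();
        topicTable.putAll(masterTopics);
        dataVersion = masterVersion; // adopt the master's version
        persistCount++;              // stands in for persisting topics.json
        return true;
    }
}
```

Skipping the write when the versions match means the slave does not rewrite topics.json every 60 seconds when nothing has changed on the master.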
b. Consumer offset sync
SlaveSynchronize.syncConsumerOffset() asks the master to synchronize consumerOffset.json. The steps are roughly:
1) Send a GET_ALL_CONSUMER_OFFSET request to the master, which returns all consumer offsets to the slave;
2) Update the slave's ConsumerOffsetManager.offsetTable and persist it.
c. Delay offset sync
SlaveSynchronize.syncDelayOffset() asks the master to synchronize delayOffset.json:
Send a GET_ALL_DELAY_OFFSET request to the master, which returns all delay offsets to the slave; the slave writes what it receives directly to the physical file delayOffset.json.
d. Subscription group sync
SlaveSynchronize.syncSubscriptionGroupConfig() asks the master to synchronize subscriptionGroup.json. The steps are roughly:
1) Send a GET_ALL_SUBSCRIPTIONGROUP_CONFIG request to the master, which returns all subscription group configurations to the slave;
2) Update the slave's SubscriptionGroupManager.subscriptionGroupTable and persist it.
2
High Availability: Asynchronous Replication
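First, the figure below gives an overview of the asynchronous replication flow between master and slave brokers:
When a broker is deployed, its broker.properties may set the property brokerRole, which has three possible values (ASYNC_MASTER, SYNC_MASTER and SLAVE) and defaults to ASYNC_MASTER, as shown in the figure below:
Since this section covers asynchronous replication, the master's brokerRole must be set to ASYNC_MASTER (note: on a slave, brokerRole must be set to SLAVE).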
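The three roles can be summarised in a few lines of code. The enum below is a hypothetical mirror of the brokerRole values, not RocketMQ's BrokerRole class; `fromProperties`, `waitsForSlaveAck` and `pullsFromMaster` are illustrative helpers that encode only what this article describes: the ASYNC_MASTER default, a SYNC_MASTER waiting for the slave, and a SLAVE pulling from its master.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Locale;
import java.util.Properties;

// Hypothetical mirror of the brokerRole values accepted in broker.properties.
enum BrokerRoleSketch {
    ASYNC_MASTER, SYNC_MASTER, SLAVE;

    // Reads brokerRole from broker.properties text; ASYNC_MASTER when the key is absent.
    static BrokerRoleSketch fromProperties(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("brokerRole", "ASYNC_MASTER");
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    // Only a SYNC_MASTER makes the write path wait for the slave.
    boolean waitsForSlaveAck() {
        return this == SYNC_MASTER;
    }

    // Only a SLAVE connects to a master and pulls data and metadata from it.
    boolean pullsFromMaster() {
        return this == SLAVE;
    }
}
```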
/**
 * Start the store service
 */
public void start() throws Exception {
    // Already started in the constructor.
    // this.indexService.start();
    // Already started in the constructor.
    // this.dispatchMessageService.start();
    this.flushConsumeQueueService.start();
    this.commitLog.start();
    this.storeStatsService.start();
    // Do not start scheduleMessageService on a slave, to avoid concurrent modification of the consume queues
    if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
        this.scheduleMessageService.start();
    }
    if (this.reputMessageService != null) {
        this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
        this.reputMessageService.start();
    }
    // Start the master/slave replication (HA) service
    this.haService.start();
    this.createTempFile();
    this.addScheduleTask();
    this.shutdown = false;
}
The haService.start() call above starts HAService, which in turn starts the master-side AcceptSocketService and GroupTransferService as well as the slave-side HAClient:
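public void start() {
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    this.groupTransferService.start();
    this.haClient.start();
}
AcceptSocketService.beginAccept() opens a non-blocking ServerSocketChannel on the HA listen address and registers it with a selector for OP_ACCEPT: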
public void beginAccept() {
    try {
        this.serverSocketChannel = ServerSocketChannel.open();
        this.selector = RemotingUtil.openSelector();
        this.serverSocketChannel.socket().setReuseAddress(true);
        this.serverSocketChannel.socket().bind(this.socketAddressListen);
        this.serverSocketChannel.configureBlocking(false);
        this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
    }
    catch (Exception e) {
        log.error("beginAccept exception", e);
    }
}
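Calling AcceptSocketService.start() starts the service thread, which executes its run() method. run() listens for connection requests (OP_ACCEPT) from slave brokers, wraps each accepted SocketChannel in an HAConnection object and calls that object's start() method. start() launches the ReadSocketService and WriteSocketService threads, which handle OP_READ and OP_WRITE events for that slave respectively. The relevant code is as follows: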
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStoped()) {
        try {
            this.selector.select(1000);
            Set<SelectionKey> selected = this.selector.selectedKeys();
            if (selected != null) {
                for (SelectionKey k : selected) {
                    // Accept a connection request from a slave broker
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                        if (sc != null) {
                            HAService.log.info("HAService receive new connection, "
                                + sc.socket().getRemoteSocketAddress());
                            try {
                                // Wrap the SocketChannel in an HAConnection, which handles reads and writes on its own
                                HAConnection conn = new HAConnection(HAService.this, sc);
                                // Start its read and write services
                                conn.start();
                                HAService.this.addConnection(conn);
                            }
                            catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    }
                    else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }
                selected.clear();
            }
        }
        catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }
    log.error(this.getServiceName() + " service end");
}
/**
 * Protocol for data sent to the slave: <Phy Offset> <Body Size> <Body Data><br>
 * Protocol for data received from the slave: <Phy Offset>
 */
public void start() {
    this.readSocketService.start();
    this.writeSocketService.start();
}
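The accept-then-hand-off pattern in AcceptSocketService is plain Java NIO. Here is a minimal sketch, assuming a non-blocking ServerSocketChannel bound to an ephemeral localhost port; `AcceptLoopSketch` and its `onAccept` callback are hypothetical, and the callback stands in for creating and starting an HAConnection.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical accept loop in the style of AcceptSocketService.
final class AcceptLoopSketch implements AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;

    AcceptLoopSketch() throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.socket().setReuseAddress(true);
        server.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
        server.configureBlocking(false);                   // required before register()
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    InetSocketAddress address() throws IOException {
        return (InetSocketAddress) server.getLocalAddress();
    }

    // One loop iteration: wait up to timeoutMs, accept every pending connection
    // and hand it to onAccept. Returns how many connections were accepted.
    int pollOnce(long timeoutMs, Consumer<SocketChannel> onAccept) throws IOException {
        int accepted = 0;
        selector.select(timeoutMs);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove(); // same effect as selected.clear() in the listing
            if (key.isAcceptable()) {
                SocketChannel sc = ((ServerSocketChannel) key.channel()).accept();
                if (sc != null) {
                    onAccept.accept(sc); // stands in for new HAConnection(...).start()
                    accepted++;
                }
            }
        }
        return accepted;
    }

    @Override
    public void close() throws IOException {
        selector.close();
        server.close();
    }
}
```

As in the listing, the selector thread only accepts; all reading and writing for a connection happens on that connection's own threads.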
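The HAConnection.ReadSocketService thread reads heartbeat messages from the slave. Each message is 8 bytes long and holds the slave's maximum physical offset. After parsing the value, the service (1) assigns it to HAConnection.slaveAckOffset; (2) if HAConnection.slaveRequestOffset is negative (it is initialized to -1 at startup), assigns it to that variable as well; and (3) calls HAService.notifyTransferSome(long slaveAckOffset). If slaveAckOffset is greater than HAService.push2SlaveMaxOffset, that method updates push2SlaveMaxOffset and calls GroupTransferService.notifyTransferSome() to wake the GroupTransferService thread. In synchronous replication mode, the front-end (message-writing) thread relies on this service to track replication progress (covered in detail in Section 3).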
while (!this.isStoped()) {
    try {
        this.selector.select(1000);
        // Handle the offsets reported by the slave
        boolean ok = this.processReadEvent();
        if (!ok) {
            HAConnection.log.error("processReadEvent error");
            break;
        }
        // Check the heartbeat interval; force-close the connection if it is exceeded
        long interval =
            HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
                - this.lastReadTimestamp;
        if (interval > HAConnection.this.haService.getDefaultMessageStore()
            .getMessageStoreConfig().getHaHousekeepingInterval()) {
            log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr
                + "] expired, " + interval);
            break;
        }
    }
    catch (Exception e) {
        HAConnection.log.error(this.getServiceName() + " service has exception.", e);
        break;
    }
}
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPostion = 0;
    }
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                this.lastReadTimestamp =
                    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // Receive the offset reported by the slave
                if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
                    // Only the last complete 8-byte value counts
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPostion = pos;
                    // Handle the slave's request
                    HAConnection.this.slaveAckOffset = readOffset;
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        // Store the max offset reported by the slave in slaveRequestOffset; WriteSocketService uses it when sending data to the slave
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset "
                            + readOffset);
                    }
                    // Notify front-end threads (used by synchronous replication)
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            }
            else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            }
            else {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        }
        catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }
    return true;
}
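The offset parsing in processReadEvent() rounds the buffer position down to a multiple of 8 and reads the long just before it, so a partially received long waits for the next read and, when several reports are queued, only the newest is used. The sketch below isolates that arithmetic together with the "only move forward" update of push2SlaveMaxOffset described above. `SlaveAckReader` is hypothetical, buffer recycling is omitted, and the compare-and-set loop is one way to write a thread-safe monotonic update, not a copy of HAService.notifyTransferSome().

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the master's read side for slave offset reports.
final class SlaveAckReader {
    private final ByteBuffer buffer = ByteBuffer.allocate(1024); // recycling omitted
    private int processPosition = 0;
    final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    long slaveAckOffset = -1;

    // Simulates bytes arriving from the socket; a chunk may end mid-long.
    void receive(byte[] chunk) {
        buffer.put(chunk);
        if (buffer.position() - processPosition >= 8) {
            // Round down to the last complete 8-byte boundary and read the long before it.
            int pos = buffer.position() - (buffer.position() % 8);
            slaveAckOffset = buffer.getLong(pos - 8);
            processPosition = pos;
            raiseMax(slaveAckOffset);
        }
    }

    // Only ever move push2SlaveMaxOffset forward, even with concurrent callers.
    private void raiseMax(long offset) {
        for (long cur = push2SlaveMaxOffset.get(); offset > cur; cur = push2SlaveMaxOffset.get()) {
            if (push2SlaveMaxOffset.compareAndSet(cur, offset)) {
                break; // this is where GroupTransferService would be woken up
            }
        }
    }
}
```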
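At this point ReadSocketService has read the offset sent by the slave and stored it in slaveRequestOffset. WriteSocketService reads this variable, works out the next offset to send from, reads the messages starting there and sends them to the slave.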
while (!this.isStoped()) {
    try {
        this.selector.select(1000);
        // If HAConnection.slaveRequestOffset is -1, the connection has just started and the slave's max offset has not arrived yet:
        // sleep 10 ms and check again on the next pass. Once it is no longer -1, the slave's max offset has been received.
        if (-1 == HAConnection.this.slaveRequestOffset) {
            Thread.sleep(10);
            continue;
        }
        // First transfer: work out where to start
        // If the slave has no local data, it requests offset 0 and the master starts sending from its last commit log file
        // If WriteSocketService.nextTransferFromWhere is -1 (just started), compute the offset to start reading from
        if (-1 == this.nextTransferFromWhere) {
            // If HAConnection.slaveRequestOffset is 0, the slave has no commit log data yet, so only the last file is synced to it:
            // nextTransferFromWhere = maxOffset - maxOffset % 1G (the commit log file size), i.e. the start offset of the last file
            if (0 == HAConnection.this.slaveRequestOffset) {
                long masterOffset =
                    HAConnection.this.haService.getDefaultMessageStore().getCommitLog()
                        .getMaxOffset();
                masterOffset =
                    masterOffset
                        - (masterOffset % HAConnection.this.haService
                            .getDefaultMessageStore().getMessageStoreConfig()
                            .getMapedFileSizeCommitLog());
                if (masterOffset < 0) {
                    masterOffset = 0;
                }
                this.nextTransferFromWhere = masterOffset;
            }
            else {
                // Otherwise start reading from the max offset reported by the slave
                this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
            }
            log.info("master transfer data from " + this.nextTransferFromWhere + " to slave["
                + HAConnection.this.clientAddr + "], and slave request "
                + HAConnection.this.slaveRequestOffset);
        }
        // Heartbeat to the slave: 12 bytes, the first 8 are the offset to sync from and the last 4 are 0.
        // If it is not fully sent, go back to the top of the loop; otherwise continue below.
        if (this.lastWriteOver) {
            // If nothing has been sent for a while, try sending a heartbeat
            long interval =
                HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
                    - this.lastWriteTimestamp;
            if (interval > HAConnection.this.haService.getDefaultMessageStore()
                .getMessageStoreConfig().getHaSendHeartbeatInterval()) {
                // Send a heartbeat to the slave
                // Build Header
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(HEADER_SIZE);
                this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                this.byteBufferHeader.putInt(0);
                this.byteBufferHeader.flip();
                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }
        }
        // The previous transfer is not finished yet: continue it
        else {
            this.lastWriteOver = this.transferData();
            if (!this.lastWriteOver)
                continue;
        }
        // Transfer data.
        // selectResult is assigned to this.selectMapedBufferResult, which is also cleaned up if an exception occurs.
        // Read commit log data starting at nextTransferFromWhere via DefaultMessageStore.getCommitLogData(); if none is available, wait up to 100 ms and loop again.
        SelectMapedBufferResult selectResult =
            HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(
                this.nextTransferFromWhere);
        if (selectResult != null) {
            int size = selectResult.getSize();
            // Each sync sends at most 32K (haTransferBatchSize); anything larger is cut to the first 32K. Message layout:
            // a 12-byte header (8-byte start offset + 4-byte data size), sent first, followed by the data itself
            if (size > HAConnection.this.haService.getDefaultMessageStore()
                .getMessageStoreConfig().getHaTransferBatchSize()) {
                size =
                    HAConnection.this.haService.getDefaultMessageStore()
                        .getMessageStoreConfig().getHaTransferBatchSize();
            }
            long thisOffset = this.nextTransferFromWhere;
            this.nextTransferFromWhere += size;
            selectResult.getByteBuffer().limit(size);
            this.selectMapedBufferResult = selectResult;
            // Build Header
            this.byteBufferHeader.position(0);
            this.byteBufferHeader.limit(HEADER_SIZE);
            this.byteBufferHeader.putLong(thisOffset);
            this.byteBufferHeader.putInt(size);
            this.byteBufferHeader.flip();
            this.lastWriteOver = this.transferData();
        }
        else {
            // No data: wait for a notification (at most 100 ms)
            HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
        }
    }
    catch (Exception e) {
        // Any exception here is usually a network error: the connection must be closed and resources released
        HAConnection.log.error(this.getServiceName() + " service has exception.", e);
        break;
    }
}
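The offset arithmetic in WriteSocketService is easy to check in isolation. Here is a sketch assuming the 1G commit log file size and 32K batch size mentioned in the comments above; `TransferPlanner` and its methods are hypothetical names, not RocketMQ APIs.

```java
import java.nio.ByteBuffer;

// Hypothetical helpers mirroring WriteSocketService's offset and header logic.
final class TransferPlanner {
    static final int HEADER_SIZE = 8 + 4; // phyOffset (long) + bodySize (int)

    // First transfer: a slave with an empty commit log (request offset 0) starts
    // at the beginning of the master's last commit log file.
    static long firstTransferOffset(long slaveRequestOffset, long masterMaxOffset, long fileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset;
        }
        long start = masterMaxOffset - (masterMaxOffset % fileSize);
        return Math.max(start, 0);
    }

    // Each batch carries at most batchSize bytes.
    static int batchSize(int available, int batchSize) {
        return Math.min(available, batchSize);
    }

    // 12-byte frame header, ready to write; bodySize 0 marks a heartbeat.
    static ByteBuffer header(long offset, int bodySize) {
        ByteBuffer header = ByteBuffer.allocate(HEADER_SIZE);
        header.putLong(offset);
        header.putInt(bodySize);
        header.flip();
        return header;
    }
}
```

Starting a fresh slave at the last file rather than at offset 0 means it does not have to copy the master's entire history before it can follow new writes.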
In this.transferData(), the data is sent to the slave with this.socketChannel.write().
/**
 * Returns whether the transfer has completed
 */
private boolean transferData() throws Exception {
    int writeSizeZeroTimes = 0;
    // Write Header
    while (this.byteBufferHeader.hasRemaining()) {
        int writeSize = this.socketChannel.write(this.byteBufferHeader);
        if (writeSize > 0) {
            writeSizeZeroTimes = 0;
            this.lastWriteTimestamp =
                HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        }
        else if (writeSize == 0) {
            if (++writeSizeZeroTimes >= 3) {
                break;
            }
        }
        else {
            throw new Exception("ha master write header error < 0");
        }
    }
    if (null == this.selectMapedBufferResult) {
        return !this.byteBufferHeader.hasRemaining();
    }
    writeSizeZeroTimes = 0;
    // Write Body
    if (!this.byteBufferHeader.hasRemaining()) {
        while (this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
            int writeSize = this.socketChannel.write(this.selectMapedBufferResult.getByteBuffer());
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp =
                    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            }
            else if (writeSize == 0) {
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            }
            else {
                throw new Exception("ha master write body error < 0");
            }
        }
    }
    boolean result =
        !this.byteBufferHeader.hasRemaining()
            && !this.selectMapedBufferResult.getByteBuffer().hasRemaining();
    if (!this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
        this.selectMapedBufferResult.release();
        this.selectMapedBufferResult = null;
    }
    return result;
}
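transferData() has to cope with a non-blocking socket that accepts only part of the data, which is why it returns a completion flag that the loop keeps in lastWriteOver and retries on a later pass. Below is a sketch of the same header-then-body logic written against any WritableByteChannel. `FrameWriter` is a hypothetical name, and `ThrottledChannel` is a test double that accepts only `budget` bytes, simulating a full socket send buffer; the give-up-after-three-zero-writes rule mirrors the listing.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical header-then-body writer in the style of transferData().
final class FrameWriter {
    // Returns true only when both header and body are fully written.
    static boolean transfer(WritableByteChannel ch, ByteBuffer header, ByteBuffer body) throws IOException {
        if (!drain(ch, header)) {
            return false; // header incomplete: do not start the body yet
        }
        return body == null || drain(ch, body);
    }

    private static boolean drain(WritableByteChannel ch, ByteBuffer buf) throws IOException {
        int zeroWrites = 0;
        while (buf.hasRemaining()) {
            int n = ch.write(buf);
            if (n > 0) {
                zeroWrites = 0;
            } else if (n == 0) {
                if (++zeroWrites >= 3) {
                    break; // send buffer full: retry on a later pass
                }
            } else {
                throw new IOException("write returned < 0");
            }
        }
        return !buf.hasRemaining();
    }
}

// Test double: accepts at most `budget` bytes in total, then returns 0 (buffer full).
final class ThrottledChannel implements WritableByteChannel {
    final ByteArrayOutputStream received = new ByteArrayOutputStream();
    int budget;

    ThrottledChannel(int budget) {
        this.budget = budget;
    }

    @Override
    public int write(ByteBuffer src) {
        int n = Math.min(budget, src.remaining());
        for (int i = 0; i < n; i++) {
            received.write(src.get());
        }
        budget -= n;
        return n;
    }

    @Override
    public boolean isOpen() {
        return true;
    }

    @Override
    public void close() {
    }
}
```

The second call to transfer() picks up exactly where the first stopped, because each ByteBuffer's position records how much has already been written.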
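So much for how the master handles requests from the slave. How does the slave send its requests, and how does it handle what the master sends back? Recall that HAService.start() also starts the HAClient service thread via this.haClient.start(); the slave's requests originate in its run() method: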
while (!this.isStoped()) {
    try {
        // Connect to the master; only a slave has a master HA address (configured, or obtained via the NameServer)
        if (this.connectMaster()) {
            // Report the max physical offset first, or periodically as a heartbeat
            if (this.isTimeToReportOffset()) {
                // Report the slave's max offset to the master
                boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                if (!result) {
                    this.closeMaster();
                }
            }
            // Wait for a response
            this.selector.select(1000);
            // Receive data
            boolean ok = this.processReadEvent();
            if (!ok) {
                this.closeMaster();
            }
            // Report the max physical offset whenever local data has changed
            if (!reportSlaveMaxOffsetPlus()) {
                continue;
            }
            // Check the master's reverse heartbeat
            long interval =
                HAService.this.getDefaultMessageStore().getSystemClock().now()
                    - this.lastWriteTimestamp;
            if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                .getHaHousekeepingInterval()) {
                log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                    + "] expired, " + interval);
                this.closeMaster();
                log.warn("HAClient, master not response some time, so close connection");
            }
        }
        else {
            this.waitForRunning(1000 * 5);
        }
    }
    catch (Exception e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.waitForRunning(1000 * 5);
    }
}
private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        // On a master, addr is null, so no connection to a master is made
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        // Re-read the max offset on every connection attempt
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}
private boolean reportSlaveMaxOffset(final long maxOffset) {
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            // Report this slave's max offset to the master, so the master knows where to start sending
            this.socketChannel.write(this.reportOffset);
        }
        catch (IOException e) {
            log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }
    return !this.reportOffset.hasRemaining();
}
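On the slave, a report is simply the commit log's max physical offset written as an 8-byte big-endian long, and reportSlaveMaxOffsetPlus() (called in the HAClient loop but not listed here) only sends a new report when that offset has grown. A sketch of both, assuming a ByteArrayOutputStream in place of the socket; `SlaveReporter` and `reportIfGrown` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical model of the slave's offset reporting.
final class SlaveReporter {
    final ByteArrayOutputStream wire = new ByteArrayOutputStream(); // stands in for the socket
    long currentReportedOffset;
    int reports = 0;

    SlaveReporter(long initialMaxOffset) {
        this.currentReportedOffset = initialMaxOffset;
    }

    // Encode maxOffset as 8 big-endian bytes (ByteBuffer's default order) and "send" it.
    void reportSlaveMaxOffset(long maxOffset) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(maxOffset);
        wire.write(buf.array(), 0, 8);
        reports++;
    }

    // Report only when the local commit log has grown since the last report.
    void reportIfGrown(long localMaxPhyOffset) {
        if (localMaxPhyOffset > currentReportedOffset) {
            currentReportedOffset = localMaxPhyOffset;
            reportSlaveMaxOffset(currentReportedOffset);
        }
    }
}
```

Because each report doubles as an acknowledgement, the master learns how far the slave has got without any separate ack message.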
private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // Read the master's data from the SocketChannel, reading and parsing in a loop until HAClient.byteBufferRead has no free space left
            // (position < limit means there is space). byteBufferRead is allocated when HAClient is created, with ReadMaxBufferSize = 4 MB.
            // If three consecutive reads return no data, break out of the loop.
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                readSizeZeroTimes = 0;
                // Process the data sent by the master
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            }
            else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            }
            else {
                // TODO ERROR
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        }
        catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }
    return true;
}
private boolean dispatchReadRequest() {
    final int MSG_HEADER_SIZE = 8 + 4; // phyoffset + size
    int readSocketPos = this.byteBufferRead.position();
    while (true) {
        // HAClient.dispatchPosition marks where parsing of byteBufferRead starts; it is initially 0.
        // byteBufferRead.position() is the end of the data received from the SocketChannel (saved above in readSocketPos);
        // position - dispatchPosition is compared against the 12-byte header size:
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        // At least one complete 12-byte header has arrived (a heartbeat or the header of a data frame)
        if (diff >= MSG_HEADER_SIZE) {
            // Starting at dispatchPosition, the first 8 bytes are the master's start physical offset masterPhyOffset and the next 4 bytes are the body size bodySize
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
            // Get the slave's max physical offset; if it is non-zero and differs from masterPhyOffset, return false
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            // Serious error: master and slave offsets have diverged
            if (slavePhyOffset != 0) {
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                        + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }
            // If position - dispatchPosition >= 12 + bodySize, a whole frame has arrived: read bodySize bytes starting at dispatchPosition + 12
            // Enough data for one complete request
            if (diff >= (MSG_HEADER_SIZE + bodySize)) {
                byte[] bodyData = new byte[bodySize];
                this.byteBufferRead.position(this.dispatchPosition + MSG_HEADER_SIZE);
                this.byteBufferRead.get(bodyData);
                // TODO Whether the result needs handling; ignored for now
                // Append the data with DefaultMessageStore.appendToCommitLog(long startOffset, byte[] data)
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
                // Restore byteBufferRead's position to readSocketPos and advance dispatchPosition by 12 + bodySize
                this.byteBufferRead.position(readSocketPos);
                this.dispatchPosition += MSG_HEADER_SIZE + bodySize;
                // If the slave's max physical offset now exceeds the last reported one (HAClient.currentReportedOffset), update it and report the new offset to the master
                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }
                continue;
            }
        }
        // Not enough data for a complete header or frame: if byteBufferRead has no free space left (position >= limit), call HAClient.reallocateByteBuffer() to compact it
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }
        break;
    }
    return true;
}
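The framing logic of dispatchReadRequest() is the heart of the slave side: parse a 12-byte header, check the offset against the end of the local log, and append the body only once it has fully arrived, leaving incomplete frames in the buffer. The sketch below reproduces that over in-memory bytes. `FrameDispatcher` is hypothetical, a ByteArrayOutputStream stands in for the slave's commit log, the `maxPhyOffset` field plays the role of getMaxPhyOffset(), and unlike the listing it compacts the buffer after every call instead of calling reallocateByteBuffer() only when the buffer is full.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical slave-side frame parser modeled on dispatchReadRequest().
final class FrameDispatcher {
    static final int HEADER = 8 + 4; // phyOffset + bodySize

    private final ByteBuffer pending = ByteBuffer.allocate(64 * 1024);
    final ByteArrayOutputStream commitLog = new ByteArrayOutputStream();
    long maxPhyOffset = 0; // end of the local log; 0 means empty

    // Feed bytes read from the socket. Returns false if the master's offset does
    // not continue the local log (the "serious error" case in the listing).
    boolean onBytes(byte[] chunk) {
        pending.put(chunk);
        pending.flip(); // switch to reading everything received so far
        try {
            while (pending.remaining() >= HEADER) {
                int start = pending.position();
                long masterPhyOffset = pending.getLong(start);
                int bodySize = pending.getInt(start + 8);
                if (maxPhyOffset != 0 && maxPhyOffset != masterPhyOffset) {
                    return false;
                }
                if (pending.remaining() < HEADER + bodySize) {
                    break; // body not complete yet: wait for more bytes
                }
                byte[] body = new byte[bodySize];
                pending.position(start + HEADER);
                pending.get(body);
                if (bodySize > 0) {
                    if (maxPhyOffset == 0) {
                        maxPhyOffset = masterPhyOffset; // an empty slave adopts the master's start offset
                    }
                    commitLog.write(body, 0, bodySize);
                    maxPhyOffset += bodySize;
                }
            }
            return true;
        } finally {
            pending.compact(); // keep unparsed bytes and return to write mode
        }
    }
}
```

Keeping incomplete frames in the buffer is what lets the slave cope with TCP delivering a 32K batch in arbitrary pieces.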
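That's a lot of code dumped here (I'm lazy), but the key parts are annotated in the comments, which I wrote while reading the source, so please read through the code for the details; I won't go over them again. With that, the flow in which the slave requests data and the master handles and answers the request is essentially complete. Normally the slave keeps pulling incremental data from the master, asynchronously, and that is what asynchronous replication means. Its drawback: if the master goes down unrecoverably before the slave has caught up, data is lost. The next section shows how a write is acknowledged only after both master and slave have received the message, which prevents this kind of loss at the cost of performance.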
3
High Availability: Synchronous Replication
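Synchronous replication means the producer's send is acknowledged only after the message has been stored on both master and slave. As shown in the figure below, it builds on asynchronous replication by adding GroupTransferService, which makes sure the current message has reached the slave before the write returns.
For synchronous replication, the brokerRole property must be set to SYNC_MASTER, as mentioned in Section 2. Now let's see what happens after the master has handled a message; once again the story starts from CommitLog.putMessage(msg).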
// Synchronous write double
// In synchronous replication mode, block until the current message has been replicated to the slave
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
    HAService service = this.defaultMessageStore.getHaService();
    if (msg.isWaitStoreMsgOK()) {
        // Determine whether to wait
        // The slave is considered OK if one is connected and it lags the master by less than 1024 * 1024 * 256 bytes (256 MB)
        if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
            if (null == request) {
                request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            }
            service.putRequest(request);
            // Wake up all threads waiting in HAService
            service.getWaitNotifyObject().wakeupAll();
            // Wait at most syncFlushTimeout (5 s by default) and return either way; on failure, report a slave timeout to the producer
            boolean flushOK =
                // TODO
                request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig()
                    .getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do sync transfer other node, wait return, but failed, topic: "
                    + msg.getTopic() + " tags: " + msg.getTags() + " client address: "
                    + msg.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }
        }
        // Slave problem
        else {
            // Tell the producer, slave not available
            putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
        }
    }
}
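The isSlaveOK() condition in the comment above can be sketched as follows: the slave counts as available only if at least one HA connection exists and it is less than 256 MB (the haSlaveFallbehindMax setting) behind the offset just written. Names mirror the article, but `SlaveHealth` is a hypothetical model, not HAService itself.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the "is the slave usable?" check on the SYNC_MASTER write path.
final class SlaveHealth {
    static final long HA_SLAVE_FALLBEHIND_MAX = 1024L * 1024 * 256; // 256 MB

    final AtomicInteger connectionCount = new AtomicInteger();
    final AtomicLong push2SlaveMaxOffset = new AtomicLong();

    // masterPutWhere = wroteOffset + wroteBytes of the message just appended.
    boolean isSlaveOK(long masterPutWhere) {
        return connectionCount.get() > 0
            && (masterPutWhere - push2SlaveMaxOffset.get()) < HA_SLAVE_FALLBEHIND_MAX;
    }
}
```

When this returns false, putMessage does not wait at all and answers SLAVE_NOT_AVAILABLE, so a far-behind slave makes writes fail fast instead of timing out.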
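public void putRequest(final GroupCommitRequest request) {
    this.groupTransferService.putRequest(request);
}
HAService.putRequest() simply delegates to GroupTransferService, which appends the request to requestsWrite and notifies its own thread: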
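// Asynchronous notification
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();

public void putRequest(final GroupCommitRequest request) {
    synchronized (this) {
        this.requestsWrite.add(request);
        if (!this.hasNotified) {
            this.hasNotified = true;
            this.notify();
            // TODO Two threads need to be notified here: 1. GroupTransferService
            // 2. WriteSocketService
            // WriteSocketService has already been notified after putRequest is called
        }
    }
}
GroupTransferService.run() blocks in waitForRunning(0) until putRequest() notifies it, then calls doWaitTransfer(). Not shown here: when the wait ends, the ServiceThread base class calls onWaitEnd(), which GroupTransferService overrides to swap requestsWrite and requestsRead, so doWaitTransfer() iterates over the swapped-in list while new requests accumulate in the other one.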
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStoped()) {
        try {
            this.waitForRunning(0);
            this.doWaitTransfer();
        }
        catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
private void doWaitTransfer() {
    if (!this.requestsRead.isEmpty()) {
        for (GroupCommitRequest req : this.requestsRead) {
            boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
            for (int i = 0; !transferOK && i < 5; i++) {
                // Wait up to 1 s
                this.notifyTransferObject.waitForRunning(1000);
                // Retry up to 5 times: once the offset acknowledged by the slave (push2SlaveMaxOffset) reaches this message's next offset,
                // the message is on the slave and the request can proceed
                transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
            }
            if (!transferOK) {
                log.warn("transfer message to slave timeout, " + req.getNextOffset());
            }
            // After at most about 5 s (5 x 1 s), wake the caller whether or not the transfer succeeded, so CommitLog.putMessage() is never blocked
            req.wakeupCustomer(transferOK);
        }
        this.requestsRead.clear();
    }
}
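The listings above form a producer/consumer handoff: putMessage() threads add requests, and GroupTransferService resolves each one as soon as push2SlaveMaxOffset reaches its next offset, or gives up after a bounded number of waits. The sketch below models that handoff with plain Java monitors. `GroupTransferSketch` and `Request` are hypothetical; the swap of the two lists before each pass is modeled on the requestsWrite/requestsRead pair (the swap itself does not appear in the listings), and the fixed 5 x 1 s wait is made configurable so the example runs quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of GroupTransferService's request handling; not RocketMQ code.
final class GroupTransferSketch {
    // A pending write: the offset the slave must reach, plus the final outcome.
    static final class Request {
        final long nextOffset;
        volatile Boolean result; // null until the transfer thread decides

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    final AtomicLong push2SlaveMaxOffset = new AtomicLong();
    private final Object ackMonitor = new Object();
    private List<Request> requestsWrite = new ArrayList<>();
    private List<Request> requestsRead = new ArrayList<>();

    // Producer side (putRequest): only ever touches requestsWrite.
    synchronized void putRequest(Request request) {
        requestsWrite.add(request);
    }

    // Slave ack path (notifyTransferSome): raise the max offset and wake the waiter.
    void onSlaveAck(long offset) {
        push2SlaveMaxOffset.accumulateAndGet(offset, Math::max);
        synchronized (ackMonitor) {
            ackMonitor.notifyAll();
        }
    }

    // Swap the lists so producers can keep adding while this thread iterates.
    private synchronized void swapRequests() {
        List<Request> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // One doWaitTransfer pass: each request gets up to `retries` waits of `pollMs`.
    // An ack landing between the check and wait() is only seen after pollMs,
    // which is one reason the wait is bounded rather than indefinite.
    void doWaitTransfer(int retries, long pollMs) throws InterruptedException {
        swapRequests();
        for (Request req : requestsRead) {
            boolean ok = push2SlaveMaxOffset.get() >= req.nextOffset;
            for (int i = 0; !ok && i < retries; i++) {
                synchronized (ackMonitor) {
                    ackMonitor.wait(pollMs);
                }
                ok = push2SlaveMaxOffset.get() >= req.nextOffset;
            }
            req.result = ok; // like wakeupCustomer(ok): resolve on success and on failure
        }
        requestsRead.clear();
    }
}
```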
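Each GroupCommitRequest holds a CountDownLatch(1). wakeupCustomer() records the transfer result in flushOK and calls countDown() whether or not the transfer succeeded, so a request can never block the caller indefinitely. waitForFlush(timeout) waits on the latch with a timeout and returns the final result.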
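public void wakeupCustomer(final boolean flushOK) {
    this.flushOK = flushOK;
    this.countDownLatch.countDown();
}

public boolean waitForFlush(long timeout) {
    try {
        boolean result = this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        return result || this.flushOK;
    }
    catch (InterruptedException e) {
        e.printStackTrace();
        return false;
    }
}
Note that, as quoted, waitForFlush() returns result || this.flushOK. Because wakeupCustomer() counts the latch down even when the transfer failed, a call to wakeupCustomer(false) that arrives before the timeout makes result true, and the failed transfer is reported to putMessage() as a success; returning flushOK after the await avoids this.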
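The latch handshake itself fits in a few lines. `CommitRequestSketch` below is a hypothetical stand-in for GroupCommitRequest, not RocketMQ code. Unlike the listing, its waitForFlush() returns true only when the latch was released and flushOK is true, so a request resolved with wakeupCustomer(false) reports failure.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for GroupCommitRequest's wait/notify handshake.
final class CommitRequestSketch {
    private final long nextOffset;
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    CommitRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    // Transfer thread: record the outcome first, then release the waiting writer thread.
    void wakeupCustomer(boolean ok) {
        this.flushOK = ok;
        latch.countDown();
    }

    // Writer thread: false on timeout, on interrupt, or when the transfer failed.
    boolean waitForFlush(long timeoutMs) {
        try {
            boolean released = latch.await(timeoutMs, TimeUnit.MILLISECONDS);
            return released && flushOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

Writing flushOK before countDown() matters: the latch establishes a happens-before edge, so the thread returning from await() is guaranteed to see the recorded outcome.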