
RocketMQ HA: High Availability

不修边幅的创客 2020-10-13



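When RocketMQ is deployed as a cluster, it supports multiple Masters and multiple Slaves. In this mode, the broker configuration lets master and slave replicate either asynchronously or synchronously to keep their data consistent, which is how high availability is achieved.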

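As the names suggest, with asynchronous replication a small number of messages can be lost if the Master goes down or its disk fails, but performance is better. With synchronous replication, messages are not delayed when the Master goes down, and both service availability and data availability are very high; however, throughput is somewhat lower than with asynchronous replication (roughly 10% lower), and the RT of sending a single message is slightly higher. Currently, after the master fails, the slave cannot automatically switch over to become the master, but the messages already on the slave can still be consumed.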

Below, we walk through asynchronous and synchronous replication at the code level.




1

Metadata synchronization
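Master and slave brokers have to synchronize more than just messages; they also need to synchronize metadata. When the Master goes down, consumers continue consuming from the Slave, so the Slave must hold exactly the same topic, routing and related information as the Master.
When a Slave starts, it actively pulls metadata from the Master: topic configuration, consumer offsets, delay-queue offsets and subscription-group configuration. Message data is replicated on top of that.
Let's look at what the broker does on startup. When the broker process starts, it creates a BrokerController object and calls its initialize() method; the relevant part of that method is: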

// If this broker is a Slave
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
    if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
        this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
        this.updateMasterHAServerAddrPeriodically = false;
    } else {
        this.updateMasterHAServerAddrPeriodically = true;
    }
    // Schedule a periodic task on the single-threaded scheduled executor
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.slaveSynchronize.syncAll();
            } catch (Throwable e) {
                log.error("ScheduledTask syncAll slave exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
} else { // If this broker is a Master
    // Periodically print how far the Slave lags behind the Master
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.printMasterAndSlaveDiff();
            } catch (Throwable e) {
                log.error("schedule printMasterAndSlaveDiff error.", e);
            }
        }
    }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}



When the broker's role is SLAVE, the scheduled task calls slaveSynchronize.syncAll() every 60 seconds (after an initial delay of 10 seconds) to pull the Master's metadata:
public void syncAll() {
    // Sync topic configuration
    this.syncTopicConfig();
    // Sync consumer offsets
    this.syncConsumerOffset();
    // Sync delay-queue (scheduled message) offsets
    this.syncDelayOffset();
    // Sync subscription-group configuration
    this.syncSubscriptionGroupConfig();
}



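a. Topic configuration sync

SlaveSynchronize.syncTopicConfig() asks the Master broker for the contents of topics.json. Roughly:

1) Send a request with code GET_ALL_TOPIC_CONFIG to the Master; the Master returns all of its topic configurations to the Slave.

2) Compare the DataVersion of the Master's and the Slave's topic configuration. If they differ, overwrite the Slave's topic configuration with the Master's, persist it, and update the Slave's DataVersion.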
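The DataVersion check in step 2 can be sketched as follows. This is a simplified illustration, not RocketMQ's code: DataVersionSketch and SlaveTopicTableSketch are hypothetical classes, a topic configuration is reduced to a string, and persisting topics.json is simulated with a counter.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for RocketMQ's DataVersion: a timestamp plus a counter.
final class DataVersionSketch {
    final long timestamp;
    final long counter;

    DataVersionSketch(long timestamp, long counter) {
        this.timestamp = timestamp;
        this.counter = counter;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof DataVersionSketch)) {
            return false;
        }
        DataVersionSketch other = (DataVersionSketch) o;
        return timestamp == other.timestamp && counter == other.counter;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, counter);
    }
}

// Hypothetical slave-side topic table (topic name -> config string).
final class SlaveTopicTableSketch {
    final Map<String, String> topicConfigTable = new HashMap<>();
    DataVersionSketch dataVersion = new DataVersionSketch(0, 0);
    int persistCount = 0; // simulated writes of topics.json

    /** Replaces the local copy only when the master's DataVersion differs; returns true if it did. */
    boolean syncFrom(Map<String, String> masterTable, DataVersionSketch masterVersion) {
        if (dataVersion.equals(masterVersion)) {
            return false; // versions match: the slave is already up to date
        }
        topicConfigTable.clear();
        topicConfigTable.putAll(masterTable);
        dataVersion = masterVersion;
        persistCount++; // stands in for persisting topics.json
        return true;
    }
}
```

Comparing versions first means that an unchanged topic table costs one request every 60 seconds but no disk write.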

b. Consumer offset sync

SlaveSynchronize.syncConsumerOffset() asks the Master broker for the contents of consumerOffset.json. Roughly:

1) Send GET_ALL_CONSUMER_OFFSET to the Master; the Master returns all consumer offsets to the Slave.

2) Update the Slave's ConsumerOffsetManager.offsetTable and persist it.

c. Delay offset sync

SlaveSynchronize.syncDelayOffset() asks the Master broker for the contents of delayOffset.json:

Send GET_ALL_DELAY_OFFSET to the Master, which returns all delay offsets to the Slave; the Slave writes the received payload directly to its delayOffset.json file.
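Because the slave does not interpret the delay offsets, this step amounts to writing the master's payload to disk. A minimal sketch, assuming a hypothetical DelayOffsetSyncSketch helper built on java.nio.file (RocketMQ uses its own file utilities for this, which are not reproduced here):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: store the master's delayOffset payload as-is, without parsing it.
final class DelayOffsetSyncSketch {
    static void persist(String delayOffsetJson, Path target) {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try {
            // Write to a temp file first, then move it over the target, which narrows
            // the window in which a partially written file could be seen.
            Files.write(tmp, delayOffsetJson.getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static String read(Path target) {
        try {
            return new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```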

d. Subscription group sync

SlaveSynchronize.syncSubscriptionGroupConfig() asks the Master broker for the contents of subscriptionGroup.json. Roughly:

1) Send GET_ALL_SUBSCRIPTIONGROUP_CONFIG to the Master; the Master returns all SubscriptionGroup configurations to the Slave.

2) Update the Slave's SubscriptionGroupManager.subscriptionGroupTable and persist it.


2

High availability: asynchronous replication

First, a figure gives an overview of the asynchronous replication flow between master and slave brokers:



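When deploying a broker, broker.properties accepts a brokerRole property with three possible values (ASYNC_MASTER, SYNC_MASTER and SLAVE); the default is ASYNC_MASTER, as shown below:


Since this section covers asynchronous replication, the master broker's brokerRole must be set to ASYNC_MASTER (note: on the slave broker, brokerRole must be set to SLAVE).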
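A small sketch of how the three brokerRole values map to the behaviour discussed in this article. BrokerRoleSketch is a hypothetical enum, not RocketMQ's BrokerRole; it only encodes the rules stated in the text.

```java
// Hypothetical enum encoding the role-dependent behaviour described in this article.
enum BrokerRoleSketch {
    ASYNC_MASTER, SYNC_MASTER, SLAVE;

    /** Only SYNC_MASTER blocks the producer until the slave has the message (section 3). */
    boolean waitsForSlaveAck() {
        return this == SYNC_MASTER;
    }

    /** The slave skips scheduleMessageService (see DefaultMessageStore.start()). */
    boolean runsScheduleMessageService() {
        return this != SLAVE;
    }

    /** Only the slave periodically pulls metadata from the master (section 1). */
    boolean syncsMetadataFromMaster() {
        return this == SLAVE;
    }

    /** Parses the brokerRole property; ASYNC_MASTER is the default when it is unset. */
    static BrokerRoleSketch fromProperty(String value) {
        return value == null || value.isEmpty() ? ASYNC_MASTER : valueOf(value.trim());
    }
}
```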

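When a broker starts, whether it is a master or a slave, it runs DefaultMessageStore.start(), which in turn calls HAService.start(). The core logic for replicating messages between master and slave lives in HAService. The code is: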
/**
 * Start the store service
 */

public void start() throws Exception {
    // Already started in the constructor.
    // this.indexService.start();
    // Already started in the constructor.
    // this.dispatchMessageService.start();
    this.flushConsumeQueueService.start();
    this.commitLog.start();
    this.storeStatsService.start();

    // The slave does not start scheduleMessageService, to avoid concurrent modification of the consume queues
    if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
        this.scheduleMessageService.start();
    }

    if (this.reputMessageService != null) {
        this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
        this.reputMessageService.start();
    }

    // Start the master/slave replication service
    this.haService.start();

    this.createTempFile();
    this.addScheduleTask();
    this.shutdown = false;
}



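HAService.start() starts three services at once:
a. AcceptSocketService: a service thread that accepts connections from slave brokers, receiving their requests to pull messages and sending messages to them.
b. GroupTransferService: a service thread used to check replication progress in synchronous replication mode; it is covered in detail in the next section on synchronous replication.
c. HAClient: a service thread that is mainly used on the slave broker. It reports the slave's current maximum message offset to the master and receives the messages the master sends back.
The code: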
public void start() {
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    this.groupTransferService.start();
    this.haClient.start();
}



AcceptSocketService.beginAccept() opens a ServerSocketChannel that listens for pull requests from slave brokers:
public void beginAccept() {
    try {
        this.serverSocketChannel = ServerSocketChannel.open();
        this.selector = RemotingUtil.openSelector();
        this.serverSocketChannel.socket().setReuseAddress(true);
        this.serverSocketChannel.socket().bind(this.socketAddressListen);
        this.serverSocketChannel.configureBlocking(false);
        this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
    }
    catch (Exception e) {
        log.error("beginAccept exception", e);
    }
}



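Calling AcceptSocketService.start() starts the service thread, which executes its run() method. run() listens for connection (OP_ACCEPT) events from slave brokers, wraps each accepted SocketChannel in an HAConnection object, and calls that object's start() method. HAConnection.start() in turn starts the ReadSocketService and WriteSocketService service threads, which handle the slave's OP_READ and OP_WRITE events respectively. The relevant code: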

@Override
public void run() 
{
    log.info(this.getServiceName() + " service started");

    while (!this.isStoped()) {
        try {
            this.selector.select(1000);
            Set<SelectionKey> selected = this.selector.selectedKeys();
            if (selected != null) {
                for (SelectionKey k : selected) {
                    // Accept a connection from a slave broker
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                        SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
                        if (sc != null) {
                            HAService.log.info("HAService receive new connection, "
                                    + sc.socket().getRemoteSocketAddress());

                            try {
                                // Wrap the SocketChannel in an HAConnection, which handles read and write events itself
                                HAConnection conn = new HAConnection(HAService.this, sc);
                                // Start the read and write services
                                conn.start();
                                HAService.this.addConnection(conn);
                            }
                            catch (Exception e) {
                                log.error("new HAConnection exception", e);
                                sc.close();
                            }
                        }
                    }
                    else {
                        log.warn("Unexpected ops in select " + k.readyOps());
                    }
                }

                selected.clear();
            }

        }
        catch (Exception e) {
            log.error(this.getServiceName() + " service has exception.", e);
        }
    }

    log.error(this.getServiceName() + " service end");
}

/**
 * Master-to-slave data protocol: <Phy Offset> <Body Size> <Body Data><br>
 * Slave-to-master data protocol: <Phy Offset>
 */


public void start() {
    this.readSocketService.start();
    this.writeSocketService.start();
}



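HAConnection.ReadSocketService is a service thread that reads the slave broker's heartbeat. The heartbeat is just 8 bytes: the slave's maximum physical offset. After parsing it, the service first stores it in HAConnection.slaveAckOffset; then, if HAConnection.slaveRequestOffset is still negative (it is initialized to -1 at startup), it also stores the value there; finally it calls HAService.notifyTransferSome(long slaveAckOffset). That method updates HAService.push2SlaveMaxOffset if slaveAckOffset is larger than it, and calls GroupTransferService.notifyTransferSome() to wake up the GroupTransferService thread. In synchronous replication mode, the caller waiting on a send relies on this service to track master/slave replication progress (covered in detail in section 3).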
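HAService.notifyTransferSome() itself is not listed in this article. A sketch of the monotonic update it performs, assuming a compare-and-set loop on push2SlaveMaxOffset; the class name and the wakeup counter (standing in for the GroupTransferService notification) are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: keep push2SlaveMaxOffset at the largest offset any slave has acknowledged.
final class AckOffsetTrackerSketch {
    final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    // Counts how often waiters would be woken; stands in for groupTransferService.notifyTransferSome()
    final AtomicInteger wakeups = new AtomicInteger();

    void notifyTransferSome(long slaveAckOffset) {
        for (long value = push2SlaveMaxOffset.get(); slaveAckOffset > value; ) {
            if (push2SlaveMaxOffset.compareAndSet(value, slaveAckOffset)) {
                wakeups.incrementAndGet(); // wake waiters only when the offset actually advanced
                break;
            }
            value = push2SlaveMaxOffset.get(); // another connection won the race: re-read and retry
        }
    }
}
```

A stale or duplicate acknowledgement never moves the offset backwards and never wakes the waiting thread.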

while (!this.isStoped()) {
    try {
        this.selector.select(1000);
        // Handle the offset reported by the slave broker
        boolean ok = this.processReadEvent();
        if (!ok) {
            HAConnection.log.error("processReadEvent error");
            break;
        }

        // Check the heartbeat interval; force a disconnect if it has expired
        long interval =
                HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
                        - this.lastReadTimestamp;
        if (interval > HAConnection.this.haService.getDefaultMessageStore()
            .getMessageStoreConfig().getHaHousekeepingInterval()) {
            log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr
                    + "] expired, " + interval);
            break;
        }
    }
    catch (Exception e) {
        HAConnection.log.error(this.getServiceName() + " service has exception.", e);
        break;
    }
}

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;

    if (!this.byteBufferRead.hasRemaining()) {
        this.byteBufferRead.flip();
        this.processPostion = 0;
    }

    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                this.lastReadTimestamp =
                        HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                // Receive the offset reported by the Slave
                if ((this.byteBufferRead.position() - this.processPostion) >= 8) {
                    int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                    long readOffset = this.byteBufferRead.getLong(pos - 8);
                    this.processPostion = pos;

                    // Handle the Slave's request
                    HAConnection.this.slaveAckOffset = readOffset;
                    if (HAConnection.this.slaveRequestOffset < 0) {
                        // The max offset reported by the slave becomes slaveRequestOffset; WriteSocketService uses it when sending data to the slave
                        HAConnection.this.slaveRequestOffset = readOffset;
                        log.info("slave[" + HAConnection.this.clientAddr + "] request offset "
                                + readOffset);
                    }

                    // Notify the front-end thread (used by synchronous replication)
                    HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                }
            }
            else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            }
            else {
                log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                return false;
            }
        }
        catch (IOException e) {
            log.error("processReadEvent exception", e);
            return false;
        }
    }

    return true;
}
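The 8-byte alignment step in processReadEvent() can be tried in isolation. This sketch assumes, as the code above does, that the buffer holds back-to-back 8-byte offsets starting at position 0, so if several reports arrive at once only the newest complete one is used. SlaveOffsetReaderSketch is a hypothetical name, and the buffer reset done by flip() when the buffer is full is omitted.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the master-side parsing of slave offset reports.
final class SlaveOffsetReaderSketch {
    private final ByteBuffer buffer = ByteBuffer.allocate(1024); // no wrap-around handling
    private int processPosition = 0;

    /** Appends bytes as if read from the socket; returns the newest complete offset, or -1. */
    long onBytes(byte[] chunk) {
        buffer.put(chunk);
        if (buffer.position() - processPosition >= 8) {
            // Round down to a multiple of 8 so a partially received long is ignored
            int pos = buffer.position() - (buffer.position() % 8);
            long offset = buffer.getLong(pos - 8); // absolute read; position is unchanged
            processPosition = pos;
            return offset;
        }
        return -1;
    }
}
```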



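At this point ReadSocketService has read the offset reported by the slave broker and stored it in slaveRequestOffset. WriteSocketService reads that variable, works out the next offset to transfer from, reads the messages and sends them to the slave broker: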

while (!this.isStoped()) {
    try {
        this.selector.select(1000);

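        // If HAConnection.slaveRequestOffset is still -1, the connection has just been set up and the slave's max offset has not arrived yet;
        // sleep 10 ms and check slaveRequestOffset again. Once it is no longer -1, the slave's max offset has been received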
        if (-1 == HAConnection.this.slaveRequestOffset) {
            Thread.sleep(10);
            continue;
        }

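        // First transfer: work out where to start.
        // If the Slave has no local data, it requests offset 0, and the master starts sending from its last physical commitlog file.
        // nextTransferFromWhere == -1 means nothing has been transferred yet, so the start offset must be computed
        if (-1 == this.nextTransferFromWhere) {
            // If slaveRequestOffset == 0, the slave has no commitlog data yet, so only the master's last file is sent:
            // nextTransferFromWhere = maxOffset - maxOffset % commitLogFileSize (1 GB by default), i.e. the start offset of the last file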
            if (0 == HAConnection.this.slaveRequestOffset) {
                long masterOffset =
                        HAConnection.this.haService.getDefaultMessageStore().getCommitLog()
                            .getMaxOffset();
                masterOffset =
                        masterOffset
                                - (masterOffset % HAConnection.this.haService
                                    .getDefaultMessageStore().getMessageStoreConfig()
                                    .getMapedFileSizeCommitLog());

                if (masterOffset < 0) {
                    masterOffset = 0;
                }

                this.nextTransferFromWhere = masterOffset;
            }
            else {
                // Otherwise use slaveRequestOffset as nextTransferFromWhere, i.e. start from the max offset the slave reported
                this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
            }

            log.info("master transfer data from " + this.nextTransferFromWhere + " to slave["
                    + HAConnection.this.clientAddr + "], and slave request "
                    + HAConnection.this.slaveRequestOffset);
        }

        // Send a heartbeat to the slave: 12 bytes, an 8-byte start offset followed by 4 zero bytes. If it is sent completely, continue; otherwise start the loop over
        if (this.lastWriteOver) {
            // If nothing has been sent for a while, try sending a heartbeat
            long interval =
                    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now()
                            - this.lastWriteTimestamp;

            if (interval > HAConnection.this.haService.getDefaultMessageStore()
                .getMessageStoreConfig().getHaSendHeartbeatInterval()) {
                // Send a heartbeat to the Slave
                // Build Header
                this.byteBufferHeader.position(0);
                this.byteBufferHeader.limit(HEADER_SIZE);
                this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                this.byteBufferHeader.putInt(0);
                this.byteBufferHeader.flip();

                this.lastWriteOver = this.transferData();
                if (!this.lastWriteOver)
                    continue;
            }
        }
        // Continue the unfinished transfer
        else {
            this.lastWriteOver = this.transferData();
            if (!this.lastWriteOver)
                continue;
        }

        // Transfer data.
        // selectResult is assigned to this.selectMapedBufferResult, which is also cleaned up if an exception occurs.
        // Read from the commitlog starting at nextTransferFromWhere via DefaultMessageStore.getCommitLogData(); if there is no data, wait up to 100 ms and loop again
        SelectMapedBufferResult selectResult =
                HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(
                    this.nextTransferFromWhere);
        if (selectResult != null) {
            int size = selectResult.getSize();
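            // If commitlog data was found, cap its size: each transfer sends at most haTransferBatchSize bytes (32 KB by default), so only the first 32 KB of a larger result is sent.
            // Frame layout: a 12-byte header (8-byte start offset + 4-byte body size), sent first, followed by the body data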
            if (size > HAConnection.this.haService.getDefaultMessageStore()
                .getMessageStoreConfig().getHaTransferBatchSize()) {
                size =
                        HAConnection.this.haService.getDefaultMessageStore()
                            .getMessageStoreConfig().getHaTransferBatchSize();
            }

            long thisOffset = this.nextTransferFromWhere;
            this.nextTransferFromWhere += size;

            selectResult.getByteBuffer().limit(size);
            this.selectMapedBufferResult = selectResult;

            // Build Header
            this.byteBufferHeader.position(0);
            this.byteBufferHeader.limit(HEADER_SIZE);
            this.byteBufferHeader.putLong(thisOffset);
            this.byteBufferHeader.putInt(size);
            this.byteBufferHeader.flip();

            this.lastWriteOver = this.transferData();
        }
        else {
            // No data: wait to be notified
            HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
        }
    }
    catch (Exception e) {
        // Any exception usually means a network error: the connection must be closed and its resources released
        HAConnection.log.error(this.getServiceName() + " service has exception.", e);
        break;
    }
}
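The start-offset rule at the top of this loop can be isolated as a pure function. A sketch only: TransferStartSketch.firstTransferOffset is a hypothetical name, and commitLogFileSize stands for mapedFileSizeCommitLog (1 GB by default).

```java
// Hypothetical sketch of how nextTransferFromWhere is chosen on the first transfer.
final class TransferStartSketch {
    static long firstTransferOffset(long slaveRequestOffset, long masterMaxOffset, long commitLogFileSize) {
        if (slaveRequestOffset != 0) {
            return slaveRequestOffset; // resume exactly where the slave's commitlog ends
        }
        // Empty slave: start at the beginning of the master's last commitlog file
        long start = masterMaxOffset - (masterMaxOffset % commitLogFileSize);
        return Math.max(start, 0);
    }
}
```

The consequence of this design is that a brand-new slave receives only the master's newest commitlog file, not its full history.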



transferData() writes the header and body to the slave broker with this.socketChannel.write():

/**
 * Returns whether the transfer has completed
 */

private boolean transferData() throws Exception {
    int writeSizeZeroTimes = 0;
    // Write Header
    while (this.byteBufferHeader.hasRemaining()) {
        int writeSize = this.socketChannel.write(this.byteBufferHeader);
        if (writeSize > 0) {
            writeSizeZeroTimes = 0;
            this.lastWriteTimestamp =
                    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
        }
        else if (writeSize == 0) {
            if (++writeSizeZeroTimes >= 3) {
                break;
            }
        }
        else {
            throw new Exception("ha master write header error < 0");
        }
    }

    if (null == this.selectMapedBufferResult) {
        return !this.byteBufferHeader.hasRemaining();
    }

    writeSizeZeroTimes = 0;

    // Write Body
    if (!this.byteBufferHeader.hasRemaining()) {
        while (this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
            int writeSize = this.socketChannel.write(this.selectMapedBufferResult.getByteBuffer());
            if (writeSize > 0) {
                writeSizeZeroTimes = 0;
                this.lastWriteTimestamp =
                        HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
            }
            else if (writeSize == 0) {
                if (++writeSizeZeroTimes >= 3) {
                    break;
                }
            }
            else {
                throw new Exception("ha master write body error < 0");
            }
        }
    }

    boolean result =
            !this.byteBufferHeader.hasRemaining()
                    && !this.selectMapedBufferResult.getByteBuffer().hasRemaining();

    if (!this.selectMapedBufferResult.getByteBuffer().hasRemaining()) {
        this.selectMapedBufferResult.release();
        this.selectMapedBufferResult = null;
    }

    return result;
}
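The frame that transferData() writes can be modelled in a few lines. Unlike the real code, which writes the header and a memory-mapped body slice separately, this hypothetical HaFrameSketch copies both into one heap buffer for clarity; batchSize plays the role of haTransferBatchSize.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of one master-to-slave frame: 8-byte start offset, 4-byte body size, body.
final class HaFrameSketch {
    static final int HEADER_SIZE = 8 + 4;

    /** Builds one frame, truncating the body to batchSize bytes. */
    static ByteBuffer encode(long startOffset, byte[] available, int batchSize) {
        int size = Math.min(available.length, batchSize);
        ByteBuffer frame = ByteBuffer.allocate(HEADER_SIZE + size);
        frame.putLong(startOffset).putInt(size).put(available, 0, size);
        frame.flip(); // ready to be written to a channel
        return frame;
    }

    /** A heartbeat is a header whose body size is 0, with no body. */
    static ByteBuffer heartbeat(long nextTransferFromWhere) {
        return encode(nextTransferFromWhere, new byte[0], 0);
    }
}
```

After sending a frame, the master advances nextTransferFromWhere by the body size actually sent, so the next frame's start offset depends on the cap.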



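So much for how the master broker handles the slave broker's requests. How does the slave broker send those requests, and how does it process what the master sends back? Recall that HAService.start() also starts the HAClient service thread via this.haClient.start(); the slave's requests originate in its run() method: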

while (!this.isStoped()) {
    try {
        // Only a slave has a master address (obtained from the name server); if there is one, connect to the master
        if (this.connectMaster()) {
            // Report the max physical offset first, or as a periodic heartbeat
            if (this.isTimeToReportOffset()) {
                // Report the slave broker's max offset to the master broker
                boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                if (!result) {
                    this.closeMaster();
                }
            }

            // Wait for the response
            this.selector.select(1000);

            // Receive data
            boolean ok = this.processReadEvent();
            if (!ok) {
                this.closeMaster();
            }

            // Report the max physical offset whenever the local commitlog has grown
            if (!reportSlaveMaxOffsetPlus()) {
                continue;
            }

            // Check the master's reverse heartbeat
            long interval =
                    HAService.this.getDefaultMessageStore().getSystemClock().now()
                            - this.lastWriteTimestamp;
            if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig()
                .getHaHousekeepingInterval()) {
                log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress
                        + "] expired, " + interval);
                this.closeMaster();
                log.warn("HAClient, master not response some time, so close connection");
            }
        }
        else {
            this.waitForRunning(1000 * 5);
        }
    }
    catch (Exception e) {
        log.warn(this.getServiceName() + " service has exception. ", e);
        this.waitForRunning(1000 * 5);
    }
}

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        // On a master node addr is null, so no connection to a master broker is made
        String addr = this.masterAddress.get();
        if (addr != null) {

            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }

        // Fetch the current max offset again on every connect
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}

private boolean reportSlaveMaxOffset(final long maxOffset) {
    this.reportOffset.position(0);
    this.reportOffset.limit(8);
    this.reportOffset.putLong(maxOffset);
    this.reportOffset.position(0);
    this.reportOffset.limit(8);

    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            // Report the slave's max offset to the master, so the master knows where to start transferring messages
            this.socketChannel.write(this.reportOffset);
        }
        catch (IOException e) {
            log.error(this.getServiceName()
                    + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    return !this.reportOffset.hasRemaining();
}
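reportSlaveMaxOffsetPlus(), used in the HAClient loop above but not listed, reports only when the local commitlog has grown. A simplified sketch, assuming the local max offset is available through a LongSupplier and that sent reports are captured in a list instead of a socket; SlaveReporterSketch and reportIfAdvanced() are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of the slave's "report only when the local commitlog grew" rule.
final class SlaveReporterSketch {
    private final LongSupplier localMaxPhyOffset; // stands in for getMaxPhyOffset()
    private long currentReportedOffset;
    final List<byte[]> sent = new ArrayList<>(); // stands in for socketChannel.write()

    SlaveReporterSketch(LongSupplier localMaxPhyOffset) {
        this.localMaxPhyOffset = localMaxPhyOffset;
        // Like connectMaster(): start from the local max offset at connect time
        this.currentReportedOffset = localMaxPhyOffset.getAsLong();
    }

    /** Sends a bare 8-byte report only if the local commitlog grew; returns true if one was sent. */
    boolean reportIfAdvanced() {
        long current = localMaxPhyOffset.getAsLong();
        if (current <= currentReportedOffset) {
            return false;
        }
        currentReportedOffset = current;
        sent.add(ByteBuffer.allocate(8).putLong(current).array());
        return true;
    }
}
```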

private boolean processReadEvent() {
    int readSizeZeroTimes = 0;
    while (this.byteBufferRead.hasRemaining()) {
        try {
            // Read the master broker's data from the SocketChannel, parsing as it goes, until HAClient.byteBufferRead has no free space left
            // (position < limit means there is room). byteBufferRead is allocated when HAClient is created, with ReadMaxBufferSize = 4 MB.
            // If three consecutive reads return no data, leave the loop.
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
                readSizeZeroTimes = 0;
                // Process the data sent by the master broker
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            }
            else if (readSize == 0) {
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            }
            else {
                // TODO ERROR
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        }
        catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }
    return true;
}

private boolean dispatchReadRequest() {
    final int MSG_HEADER_SIZE = 8 + 4; // phyoffset + size
    int readSocketPos = this.byteBufferRead.position();

    while (true) {
        // HAClient.dispatchPosition marks where parsing of byteBufferRead resumes; it starts at 0.
        // byteBufferRead.position is the end of the data received from the SocketChannel (saved in readSocketPos); compare position - dispatchPosition with 12, the header length:
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        // At least 12 bytes means a complete header (heartbeat or data) from the master broker is available
        if (diff >= MSG_HEADER_SIZE) {
            // Read from byteBufferRead at dispatchPosition (initially 0): the first 8 bytes are masterPhyOffset, the master's start offset for this frame, and the next 4 bytes are bodySize
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

            // Get the slave's max physical offset; if it is non-zero and differs from masterPhyOffset, return false
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

            // A serious error has occurred
            if (slavePhyOffset != 0) {
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                            + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }

            // If position - dispatchPosition >= 12 + bodySize, a complete frame is available: read bodySize bytes starting at dispatchPosition + 12
            // A full request can be assembled
            if (diff >= (MSG_HEADER_SIZE + bodySize)) {
                byte[] bodyData = new byte[bodySize];
                this.byteBufferRead.position(this.dispatchPosition + MSG_HEADER_SIZE);
                this.byteBufferRead.get(bodyData);

                // TODO: whether the result needs handling; not handled for now
                // Write the data with DefaultMessageStore.appendToCommitLog(long startOffset, byte[] data)
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                // Restore byteBufferRead.position to readSocketPos and advance dispatchPosition by 12 + bodySize
                this.byteBufferRead.position(readSocketPos);
                this.dispatchPosition += MSG_HEADER_SIZE + bodySize;

                // If the slave's max physical offset is now larger than the last reported one (HAClient.currentReportedOffset), update currentReportedOffset and report the new offset to the master
                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }

                continue;
            }
        }

        // If no complete frame remains and byteBufferRead has no free space left (position >= limit), compact the buffer with HAClient.reallocateByteBuffer()
        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}
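To see how the slave reassembles frames that arrive in arbitrary pieces, here is a self-contained sketch of the same parsing rules. SlaveFrameDecoderSketch is hypothetical: appending to the commitlog is replaced by a list, and buffer compaction (reallocateByteBuffer()) is left out, so it only handles up to 64 KB of input in total.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the slave-side frame parsing in dispatchReadRequest().
final class SlaveFrameDecoderSketch {
    static final int MSG_HEADER_SIZE = 8 + 4; // phyOffset + bodySize
    private final ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
    private int dispatchPosition = 0;
    long slaveMaxPhyOffset; // stands in for the slave commitlog's max offset
    final List<byte[]> appended = new ArrayList<>(); // stands in for appendToCommitLog()

    SlaveFrameDecoderSketch(long slaveMaxPhyOffset) {
        this.slaveMaxPhyOffset = slaveMaxPhyOffset;
    }

    /** Feeds bytes read from the socket; returns false if a frame does not continue the slave's commitlog. */
    boolean onBytes(byte[] chunk) {
        buffer.put(chunk);
        while (true) {
            int diff = buffer.position() - dispatchPosition;
            if (diff < MSG_HEADER_SIZE) {
                return true; // header incomplete: wait for more bytes
            }
            long masterPhyOffset = buffer.getLong(dispatchPosition);
            int bodySize = buffer.getInt(dispatchPosition + 8);
            if (slaveMaxPhyOffset != 0 && slaveMaxPhyOffset != masterPhyOffset) {
                return false; // master and slave disagree on where the slave's data ends
            }
            if (diff < MSG_HEADER_SIZE + bodySize) {
                return true; // body incomplete: wait for more bytes
            }
            byte[] body = new byte[bodySize];
            for (int i = 0; i < bodySize; i++) {
                body[i] = buffer.get(dispatchPosition + MSG_HEADER_SIZE + i);
            }
            if (bodySize > 0) { // a heartbeat frame carries no body
                appended.add(body);
                slaveMaxPhyOffset = masterPhyOffset + bodySize;
            }
            dispatchPosition += MSG_HEADER_SIZE + bodySize;
        }
    }
}
```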



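I've piled up a lot of code here (I'm lazy). The key parts are explained in the comments I wrote while reading it, so please work through the code for the details; I won't repeat them here, because I'm lazy... That completes the flow of the slave broker requesting data from the master and the master processing and responding. In normal operation the slave broker keeps pulling the master's incremental data, and it does so asynchronously. That is what asynchronous replication means. Its drawback: if the master broker goes down and cannot be recovered before the slave has caught up, data is lost. The next section shows how a message is acknowledged only after both the master and the slave broker have received it, before the next one is produced. This ensures that no acknowledged message is lost, at the cost of performance.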


3

High availability: synchronous replication

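Synchronous replication means the producer is notified, and can send its next message, only after the message has been committed on both the master and the slave broker. As shown in the figure below, it adds GroupTransferService on top of asynchronous replication to ensure that the message just committed has also been committed on the slave broker.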


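With synchronous replication, brokerRole must be set to SYNC_MASTER, as mentioned in section 2. Now let's see what happens after the master broker has processed a message. Again we start from CommitLog.putMessage(msg):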

// Synchronous write double
// In synchronous replication mode, block until the current message has been replicated to the slave broker
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
    HAService service = this.defaultMessageStore.getHaService();
    if (msg.isWaitStoreMsgOK()) {
        // Determine whether to wait
        // The slave counts as OK only if the master/slave offset gap is less than 1024 * 1024 * 256 bytes (256 MB)
        if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
            if (null == request) {
                request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            }
            service.putRequest(request);

            // Wake up all threads waiting in HAService
            service.getWaitNotifyObject().wakeupAll();

            // Wait at most syncFlushTimeout (5 s by default) and return either way; on failure, report a slave timeout to the producer
            boolean flushOK =
            // TODO
                    request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig()
                        .getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do sync transfer other node, wait return, but failed, topic: "
                        + msg.getTopic() + " tags: " + msg.getTags() + " client address: "
                        + msg.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }
        }
        // Slave problem
        else {
            // Tell the producer, slave not available
            putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
        }
    }
}
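The health check guarding the wait can be sketched as below. The 256 MB limit comes from the comment above; the extra requirement that at least one slave connection exists is an assumption about HAService.isSlaveOK(), and SlaveHealthSketch is a hypothetical class.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the "is the slave healthy enough to wait for" check used by SYNC_MASTER.
final class SlaveHealthSketch {
    static final long HA_SLAVE_FALLBEHIND_MAX = 1024L * 1024 * 256; // 256 MB
    final AtomicInteger connectionCount = new AtomicInteger();
    final AtomicLong push2SlaveMaxOffset = new AtomicLong();

    /** masterPutWhere is the end offset of the message just written on the master. */
    boolean isSlaveOK(long masterPutWhere) {
        return connectionCount.get() > 0 // assumption: at least one slave must be connected
            && masterPutWhere - push2SlaveMaxOffset.get() < HA_SLAVE_FALLBEHIND_MAX;
    }
}
```

When this check fails the producer gets SLAVE_NOT_AVAILABLE immediately instead of waiting for a slave that is too far behind to catch up within the timeout.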


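After the message has been flushed on the master broker, the broker checks whether its brokerRole is SYNC_MASTER; if so, it waits until the slave broker has the current message. How does that work? The message is first wrapped in a GroupCommitRequest object, which HAService.putRequest(request) hands to GroupTransferService.putRequest(request); it finally ends up in GroupTransferService's requestsWrite list. The waiting thread is then woken up, because when there is nothing to replicate the thread is in the wait state. GroupTransferService's run() method checks each request in the list to see whether its offset has been replicated. If it has, the producer can go on to the next message; otherwise the producer gets FLUSH_SLAVE_TIMEOUT (SLAVE_NOT_AVAILABLE is returned earlier, when isSlaveOK() fails). Within the time limit (5 s), each request must complete whether or not its message reached the slave, so that it does not block the rest of the flow.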

public void putRequest(final GroupCommitRequest request) {
    this.groupTransferService.putRequest(request);
}


// Asynchronous notification
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();


public void putRequest(final GroupCommitRequest request) {
    synchronized (this) {
        this.requestsWrite.add(request);
        if (!this.hasNotified) {
            this.hasNotified = true;
            this.notify();

            // TODO: two threads need to be notified here: 1. GroupTransferService
            // 2. WriteSocketService
            // After putRequest is called, WriteSocketService has already been notified
        }
    }
}
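GroupTransferService keeps two lists so that producers and the service thread do not contend on the same list while requests are being checked. A simplified sketch of that hand-off, assuming the swap happens right after the service thread wakes up; RequestSwapSketch is hypothetical and drops the hasNotified flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the requestsWrite / requestsRead double buffer.
final class RequestSwapSketch<T> {
    private List<T> requestsWrite = new ArrayList<>();
    private List<T> requestsRead = new ArrayList<>();

    /** Producer side: append under the lock and wake the service thread. */
    synchronized void putRequest(T request) {
        requestsWrite.add(request);
        notify();
    }

    /** Service side: swap the lists, then scan the returned batch without holding the lock. */
    synchronized List<T> swapRequests() {
        List<T> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
        return requestsRead;
    }

    /** Service side: called once the batch has been handled, like requestsRead.clear(). */
    void clearRead() {
        requestsRead.clear();
    }
}
```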

public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStoped()) {
        try {
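            // Blocks until notified (interval 0 = no timeout); on wake-up, onWaitEnd() swaps requestsWrite and requestsRead
            this.waitForRunning(0);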
            this.doWaitTransfer();
        }
        catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

private void doWaitTransfer() {
    if (!this.requestsRead.isEmpty()) {
        for (GroupCommitRequest req : this.requestsRead) {
            boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
            for (int i = 0; !transferOK && i < 5; i++) {
                // Wait up to 1 s
                this.notifyTransferObject.waitForRunning(1000);
                // Retry up to 5 times: once the offset acknowledged by the slave is >= the end offset of this message, the message is on the slave and we can continue
                transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
            }

            if (!transferOK) {
                log.warn("transfer message to slave timeout, " + req.getNextOffset());
            }
            // After about 5 s, success or not, do not block CommitLog.putMessage()
            req.wakeupCustomer(transferOK);
        }

        this.requestsRead.clear();
    }
}
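The retry loop in doWaitTransfer() amounts to waiting on a monitor until the acknowledged offset passes the message's end offset, or giving up after a bounded number of waits. A sketch with plain Object.wait/notifyAll in place of RocketMQ's WaitNotifyObject; WaitTransferSketch and its parameters are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a bounded wait for the slave acknowledgement.
final class WaitTransferSketch {
    final AtomicLong push2SlaveMaxOffset = new AtomicLong();
    private final Object notifyTransferObject = new Object();

    /** Called when a slave acknowledges a new offset (compare notifyTransferSome()). */
    void onSlaveAck(long offset) {
        push2SlaveMaxOffset.accumulateAndGet(offset, Math::max);
        synchronized (notifyTransferObject) {
            notifyTransferObject.notifyAll();
        }
    }

    /** Waits up to attempts * waitMillis for the slave to reach nextOffset; returns the outcome either way. */
    boolean waitTransfer(long nextOffset, int attempts, long waitMillis) {
        boolean transferOK = push2SlaveMaxOffset.get() >= nextOffset;
        for (int i = 0; !transferOK && i < attempts; i++) {
            synchronized (notifyTransferObject) {
                // Re-check under the lock so an ack that just arrived is not missed
                if (push2SlaveMaxOffset.get() < nextOffset) {
                    try {
                        notifyTransferObject.wait(waitMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            }
            transferOK = push2SlaveMaxOffset.get() >= nextOffset;
        }
        return transferOK;
    }
}
```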



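Each GroupCommitRequest holds a CountDownLatch(1). wakeupCustomer() records the result and calls countDown() whether or not the transfer succeeded, so the request can never block the code that follows it. waitForFlush(timeout) waits at most timeout milliseconds and returns the final result.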


public void wakeupCustomer(final boolean flushOK) {
    this.flushOK = flushOK;
    this.countDownLatch.countDown();
}


public boolean waitForFlush(long timeout) {
    try {
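        // await() returns false on timeout; either way, return the recorded result,
        // so a timeout (flushOK still false) and wakeupCustomer(false) both yield false
        this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
        return this.flushOK;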
    }
    catch (InterruptedException e) {
        e.printStackTrace();
        return false;
    }
}
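A self-contained version of this request/latch hand-off, in which waitForFlush() returns flushOK so that both a timeout and an explicit failure report false. CommitRequestSketch is a hypothetical name; it sketches the pattern rather than reproducing RocketMQ's class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one latch per message, counted down exactly once by the service thread.
final class CommitRequestSketch {
    private final long nextOffset;
    private final CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    CommitRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    /** Service thread: record the outcome, then release the waiting producer thread. */
    void wakeupCustomer(boolean flushOK) {
        this.flushOK = flushOK;
        this.countDownLatch.countDown();
    }

    /** Producer thread: the transfer result, or false if none arrived within the timeout. */
    boolean waitForFlush(long timeoutMillis) {
        try {
            countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return flushOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```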


Many of the details are in the code comments; you'll need to spend some time chewing through the code yourself... because I'm too lazy!



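This article is reposted from 不修边幅的创客. If you believe it infringes your rights, please email contact@modb.pro with supporting evidence; once verified, 墨天轮 will remove the content immediately.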
