暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

HDFS|NameNode 支撑超高并发的秘诀

大数据记事本 2021-05-09
1308
    HDFS 作为底层的分布式文件系统,需要与各种存储框架、计算框架或者应用程序进行交互,如架构在其之上的Hive、 HBase,编写的 Spark、Flink 应用程序以及自定义的一些应用都会去操作 HDFS ,也就是需要和 NameNode 进行交互,而一个集群中 Active 状态的 NameNode 只有一个(暂不考虑联邦场景)。

    在这样的场景下,NameNode 必然会面临高并发的访问,与此同时,为了保证元数据的安全可靠,不能仅仅保存在内存中,而是需要刷写到磁盘。那么,NameNode 如何在高并发的场景下既保证高性能地处理各种元数据操作,又可以将元数据持久化到磁盘来保证安全?

    这里 NameNode 巧妙地使用了 双缓冲 机制。示意图如下:

    在分析 双缓冲 机制前,首先了解下 NameNode 是如何对元数据进行持久化的。集群启动时,NN 会将磁盘存储的元数据信息(FsImage 文件)加载到内存,之后所有对元数据的操作都会记录其操作日志,并将这些日志持久化到 EditLog 文件中。后续就可以在 FsImage 的基础上执行 EditLog 中的操作来恢复集群元数据。

    如果每次对元数据的操作都进行同步刷盘,性能会很低。这里 NameNode 会先把操作日志写入写缓冲(上图中的Current-Buffer),由于是内存操作,速度很快,当日志量达到一定阈值时,进行批量刷盘。为了解决刷盘过程中无法继续执行写操作的问题,NameNode 采用了双缓冲机制,也就是刷盘之前将 Sync-Buffer 和 Current-Buffer 进行交换,Current-Buffer 继续执行写操作,Sync-Buffer 进行刷盘,互不影响,刷盘完成后清空 Sync-Buffer,等待下次

    同时,为了提高并发处理能力,采用了分段加锁的方式,对于刷盘这种耗时的操作是不加锁的,其余操作由于是纯内存操作,即使加了锁速度也很快。这样既保证了高性能地处理并发访问,又可以将元数据持久化到磁盘来保证数据安全性。

    NameNode 写入缓冲并进行缓冲交换和刷盘的具体逻辑在 FSEditLog.logSync() 方法中,这里采用伪代码对其原理进行说明:

1. 将操作日志封装成 EditLog 类。该类有两个基本属性:

  • txid 表示操作的事务id

  • context 表示操作日志的内容

    /**
    * 定义日志类型,每条操作日志就是一个 EditLog 对象
    */
    class EditLog{
    private long txid ;
    private String context ;


    public EditLog(long txid, String context) {
    this.txid = txid;
    this.context = context;
    }


        //get 和 set 方法
    @Override
    public String toString() {
    return "EditLog{" +
    "txid=" + txid +
    ", context='" + context + '\'' +
    '}';
    }
    }
    复制
    2. 定义双缓冲操作类。其内部包含两个缓冲 currentBuffer 和 syncBuffer ,并定义其写入、交换、刷盘以及获取 syncBuffer 最大事务id 的方法
      class DoubleBuffer{


      //执行日志写入操作的缓冲
      private LinkedList<EditLog> currentBufffer = new LinkedList<EditLog>();
      //执行日志同步刷盘的缓冲
      private LinkedList<EditLog> syncBufffer = new LinkedList<EditLog>();


      //get 和 set 方法


      /**
      * 向currentBuffer缓冲中写入操作日志
      * @param log
      */
      public void write(EditLog log){
      currentBufffer.add(log);
      }


      /**
      * 返回同步缓冲中的最大txid
      * @return
      */
      public long getLastTxid(){
      return syncBufffer.getLast().getTxid();
      }


      /**
      * 交换缓冲
      */
      public void setReadyToFlush(){
      LinkedList<EditLog> tmp = currentBufffer;
      currentBufffer=syncBufffer;
      syncBufffer=tmp;
      }


      /**
           * 将同步缓冲中的数据刷盘,这里象征性地打印一下
      */
      public void flush(){
      for (EditLog editLog : syncBufffer) {
      System.out.println("存入操作日志:"+editLog);
      }
      //清空缓冲
      syncBufffer.clear();
      }
      }
      复制
      3.定义具体的方法:
      • logEdit:该方法对外部开发,用于写入操作日志
      • logSync:在 logEdit 方法中会调用该方法,用于缓冲交换及日志刷盘
        public class DoubleBufferDemo {


        private long txid = 0L;
        //双缓冲操作对象
        private DoubleBuffer buffer = new DoubleBuffer();
        //标记当前是否正在往磁盘里面刷写数据
        private volatile Boolean isSyncRunning = false;
        //标记是否有线程正在等待同步
        private volatile Boolean isWaitSync = false;
        //初始化刷写的最大事务id
        private volatile Long syncMaxTxid = 0L;


        //存储每个线程内部的事务id
        private ThreadLocal<Long> localTxid=new ThreadLocal<Long>();
        ...


        /**
        * 写入操作日志
        * @param content
        */
        public void logEdit(String context){
        //加锁
        synchronized (this){
        //线程1:txid=1
        //线程2:txid=2
        //线程3:txid=3
        txid++;


                    //线程1:localTxid=1
        //线程2:localTxid=2
        //线程3:localTxid=3
        localTxid.set(txid);


        EditLog editLog = new EditLog(txid, content);


        //将操作日志写入缓冲
        buffer.write(editLog);
        }//释放锁




        logSync();
        }


        /**
             * 将缓冲中的数据刷盘
        */
        public void logSync(){
        synchronized (this){
        //当第一个线程执行到这里,isSyncRunning 为false,不执行分支内的逻辑
        if(isSyncRunning){
        //线程二:2
        //线程三:3
        long txid = localTxid.get();
        //当线程2和线程3进来时,由于txid<=syncMaxTxid(也就是3),说明线程1进行刷盘时,sync缓冲中
        //已经包含了线程2和线程3的操作日志,所以不需要线程2和线程3再进行刷盘,方法直接返回
        //当线程4进来时,该条件不满足,接着往下执行
        if(txid <= syncMaxTxid){
        return;
        }
        //线程4执行到这里,isWaitSync=false,继续向下执行
        //线程5执行到这里,isWaitSync=true,方法返回
        if(isWaitSync){
        //线程5直接返回
        return;
        }
        //线程4将isWaitSync赋值为true
        isWaitSync = true;
        //线程4执行到这里,isSyncRunning=true,执行里面的逻辑
        while(isSyncRunning){
        try {
        //线程4就在这儿等:退出等待有两种情况
        //1)等待时间到了 2)被人唤醒了
                                wait(2000);//wait操作会释放锁
        }catch (Exception e){
        e.printStackTrace();
        }
        }
        //线程4结束等待,更新isWaitSync=false
        isWaitSync = false;
        }


        //进行内存交换
                    //当第一个线程执行缓冲交换时,假设 currentBuffer             //中已经完成了线程1,线程2
        //中已经完成了线程1,线程2线程3 的操作日志写入
        buffer.setReadyToFlush();


        //如果同步缓冲中存在操作日志,获取其最大的事务id
        //这里由于假设线程1-3已经完成了日志写入,所以第一次获取到的最大事务id就是3
        if(buffer.getSyncBufffer().size() > 0) {
        syncMaxTxid = buffer.getLastTxid();
        }
        //更新标识,表明正在往磁盘中同步数据
        isSyncRunning = true;


        } //线程1 释放锁


        /**
        * 分段加锁
        * 线程1 刷写数据
        * 刷盘相对较慢
        */
        //这里耗时的刷盘操作并未加锁
        buffer.flush();


        //重新加锁
        synchronized (this) {
        //线程1刷写磁盘完成,将isSyncRunning 更新为false
        isSyncRunning = false;
        //唤醒等待线程。
        notifyAll();
        }
        }
        }
        复制
        下面通过两幅图对整个流程进行说明:

        • 线程1 调用 logEdit 方法,获取锁并将操作日志写入currentBuffer,之后释放锁并调用 logSync 方法

        • 线程1 释放锁后,线程2,线程3依次获取锁,将操作日志写入 currentBuffer

        • 线程1 执行 logSync 方法,首先会获取锁,由于 isSyncRunning 初始值为 false,if 条件不满足,直接跳过该分支执行缓冲交换(注意此时交换后的 syncBuffer 中包含线程2 和线程3 的操作日志),获取 syncBuffer 中的最大事务syncMaxTxid=3,然后将 isSyncRunning 标识更新为 true ,表示有线程在进行刷盘操作

        • 线程1 释放锁并执行刷盘操作,该操作耗时相对较长

        • 线程2 执行 logSync 方法,此时 isSyncRunning=true,进入if 分支,由于线程2的事务id=2,小于等于 syncMaxTxid,线程2直接返回

        • 线程3 执行 logSync 方法,和线程2 同理,线程3直接返回

        • 由于线程1 的刷盘操作耗时较长,此时线程4,线程5,线程6依次调用 logEdit 方法将操作日志写入 currentBuffer(线程4 写入前已经进行了缓冲交换)

        • 线程4 执行 logSync 方法,获取锁后进入 if 分支,由于 isWaitSync=false,线程4 不会进入 if(isWaitSync) 分支,之后将 isWaitSync 更新为 true,由于此时 isSyncRunning=true,线程4 调用 wait 方法进行等待并释放锁

        • 线程5 调用 logSync 方法,获取锁后进入 if 分支,由于 isWaitSync=true,线程5 进入if(isWaitSync) 分支后直接返回

        • 线程6 调用 logSync 方法,获取锁后进入if 分支,和线程5 同理直接返回

        • 线程4 由于调用了 wait 方法,只能等待时间到达或者其它线程唤醒

        • 假设此时线程1 执行刷盘操作完毕,再次获取锁后将标记 isSyncRunning 置为 false,表示刷盘操作已完成,并调用 notifyAll 方法唤醒等待的线程

        • 线程4 被线程1 唤醒,将标识 isWaitSync 置为 false,表示已经没有线程等待刷盘了,之后进行缓冲,继续执行上面的流程(此时线程7,线程8等已经将操作日志写入 currentBuffer)

        总结:
        • NameNode 通过双缓冲的机制,将写内存和刷写磁盘的操作进行分离,互不影响。
        • NameNode 通过分段加锁来减小锁的粒度,即使在高并发场景下,加锁操作都是纯内存操作,性能很高。而对于相对耗时的刷盘操作,由于该操作未加锁,某个线程刷盘时并不影响其它线程将操作日志写入写缓冲
        文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

        评论