暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

大规模集群下Hadoop NameNode如何承载每秒上千次的高并发访问

数据基石 2021-09-24
512


点击上方蓝字关注我们


一、问题起源

高并发请求 NameNode 会遇到什么样的问题?

现在大家都明白每次请求 NameNode 修改一条元数据(比如说申请上传一个文件,那么就需要在内存目录树中加入一个文件),都要写一条 edits log,包括两个步骤:

  • 写入本地磁盘。

  • 通过网络传输给 JournalNodes 集群。

但是如果并发请求,看见会设计到线程安全的问题!!!

NameNode 在写 edits log 时,必须保证每条 edits log 都有一个全局顺序递增的 transactionId(简称为 txid),这样才可以标识出来一条一条的 edits log 的先后顺序。

那么如何保证每条 edits log 的 txid 都是递增的?

回答:当然是加锁吖!!!

那么问题来了!!!

如果每次都是在一个加锁的代码块里,生成 txid,然后写磁盘文件 edits log,网络请求写入 journalnodes 一条 edits log,会咋样?

试想下结果:NN(NameNode)用多线程接收多个客户端的请求,接着修改完内存的元数据后,排着队写edits log!!

写磁盘、网络传输非常耗费性能!如果每秒并发很高过来,处理性能是扛不住的。如同老太太坐轮椅。

二、双缓冲机制+分段锁的源码篇

2.1 简单介绍

首先是因为虽然元数据是首先写入内存的,但是你要知道元数据在内存中并不是安全的,所以 namenode 要将元数据刷新到磁盘里面;

但是 namenode 并不是直接写入磁盘的,而是采用双缓冲机制,先将数据写入到内存中,然后在从内存中写入到磁盘里面

模型大概长这个样子:

大概的步骤简单概述一下:

1、首先将元数据写入内存(bufCurrent)中

2、当满足一定条件的时候,我们会将两个内存进行交换

3、我们将 bufCurrent 里面的数据交换到 bufReady 里面,然后 bufCurrent 里面为空,继续接收写入内存的数据

bufReady 里面保存的是写入内存里面的数据,然后偷偷刷到磁盘,刷完后清空内存

4、然后周而复始,bufCurrent 永远去接收数据,然后会把数据传递给 bufReady,bufReady 在继续偷偷刷磁盘

那么通过这个这种双缓冲机制,就将原本写磁盘的操作变成了写内存操作。从而大大提高了效率。

2.2 源码-hadoop 是如何实现双缓冲+分段锁的

public void logSync() {
    long syncStart = 0;//用来记录事务的最大ID

    //获取当前线程ID
    long mytxid = myTransactionId.get().txid;
    //默认不同步(意思是第二块内存还没有数据,所以不需要刷磁盘,既不需要做同步操作)
    boolean sync = false;
    try {
      EditLogOutputStream logStream = null;//EditLog的输出流
      synchronized (this) {//TODO 分段锁开始
        try {
          printStatistics(false);//打印静态信息

          /**
           * 如果当前工作的线程> 最大事务ID && 是同步状态的,那么说明当前线程正处于刷盘状态。
           * 说明此事正处于刷盘状态,则等待1s
           * */
          while (mytxid > synctxid && isSyncRunning) {
            try {
              wait(1000);
            } catch (InterruptedException ie) {
            }
          }

          //
          // If this transaction was already flushed, then nothing to do
          //如果当前的线程ID < 当前处理事务的最大ID,则说明当前线程的任务已经被其他线程完成了,什么也不用做了
          if (mytxid <= synctxid) {
            numTransactionsBatchedInSync++;
            if (metrics != null) {
              // Metrics is non-null only when used inside name node
              metrics.incrTransactionsBatchedInSync();
            }
            return;
          }

          //此事开启同步状态,开始刷盘
          syncStart = txid;
          isSyncRunning = true;//开启同步
          sync = true;

          //TODO 双缓冲区,交换数据
          try {
            if (journalSet.isEmpty()) {
              throw new IOException("No journals available to flush");
            }
            //双缓冲区交换数据
            editLogStream.setReadyToFlush();
          } catch (IOException e) {
            final String msg =
                    "Could not sync enough journals to persistent storage " +
                            "due to " + e.getMessage() + ". " +
                            "Unsynced transactions: " + (txid - synctxid);
            LOG.fatal(msg, new Exception());
            synchronized(journalSetLock) {
              IOUtils.cleanup(LOG, journalSet);
            }
            terminate(1, msg);
          }
        } finally {
          // 防止RuntimeException阻止其他日志编辑写入
          doneWithAutoSyncScheduling();
        }
        //editLogStream may become null,
        //so store a local variable for flush.
        logStream = editLogStream;
      }//TODO 分段锁结束

      // do the sync
      long start = monotonicNow();
      try {
        if (logStream != null) {
          //TODO 将缓冲区数据刷到磁盘(没有上锁)
          logStream.flush();///tmp/hadoop-angel/dfs/name/current
        }
      } catch (IOException ex) {
        synchronized (this) {
          final String msg =
                  "Could not sync enough journals to persistent storage. "
                          + "Unsynced transactions: " + (txid - synctxid);
          LOG.fatal(msg, new Exception());
          synchronized(journalSetLock) {
            IOUtils.cleanup(LOG, journalSet);
          }
          //TODO
          terminate(1, msg);
        }
      }
      long elapsed = monotonicNow() - start;

      if (metrics != null) { // Metrics non-null only when used inside name node
        metrics.addSync(elapsed);
      }
    } finally {
      // 持久化完毕之后,第二块内存空了,然后我们在修改下标志位,告诉程序现在没有做刷磁盘操作了
      synchronized (this) {//TODO 分段锁开始
        if (sync) {
          synctxid = syncStart;
          isSyncRunning = false;
        }
        this.notifyAll();
      }
    }
  }

三、总结

第一把锁,主要是判断isAutoSyncScheduled以及对isAutoSyncScheduled的赋值,这个主要是说明bufCurrent和bufReady开始交换内存了。

第二把锁,主要是判断isSyncRunning以及对isSyncRunning和isAutoSyncScheduled的赋值。isSyncRunning是用来判断是否在写磁盘,isAutoSyncScheduled用来判断是否在交换内存,如果在交换,就不能写入bufCurrent,如果在写磁盘,那就不能写磁盘。

第三把锁,赋值isSyncRunning,说明磁盘写入完成。
这期间最耗时的操作并没有加锁,其他内存操作的加锁,但是速度比较快,采用在这种分段加锁的方式和双缓冲机制,大大提高了性能。


大数据交流社群请加以下微信申请:




往期推荐

【Kafka SASL/SCRAM动态认证集群部署

【万字长文】详解Flink作业提交流程

服务器之间挂载共享磁盘目录

Kafka SASL集群部署

ElasticSearch核心知识讲解


文章转载自数据基石,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论