点击上方蓝字关注我们
一、问题起源
高并发请求 NameNode 会遇到什么样的问题?
现在大家都明白每次请求 NameNode 修改一条元数据(比如说申请上传一个文件,那么就需要在内存目录树中加入一个文件),都要写一条 edits log,包括两个步骤:
写入本地磁盘。
通过网络传输给 JournalNodes 集群。
但是如果并发请求,看见会设计到线程安全的问题!!!
NameNode 在写 edits log 时,必须保证每条 edits log 都有一个全局顺序递增的 transactionId(简称为 txid),这样才可以标识出来一条一条的 edits log 的先后顺序。
那么如何保证每条 edits log 的 txid 都是递增的?
回答:当然是加锁吖!!!
那么问题来了!!!
如果每次都是在一个加锁的代码块里,生成 txid,然后写磁盘文件 edits log,网络请求写入 journalnodes 一条 edits log,会咋样?
试想下结果:NN(NameNode)用多线程接收多个客户端的请求,接着修改完内存的元数据后,排着队写edits log!!!
写磁盘、网络传输非常耗费性能!如果每秒并发很高过来,处理性能是扛不住的。如同老太太坐轮椅。
二、双缓冲机制+分段锁的源码篇
2.1 简单介绍
首先是因为虽然元数据是首先写入内存的,但是你要知道元数据在内存中并不是安全的,所以 namenode 要将元数据刷新到磁盘里面;
但是 namenode 并不是直接写入磁盘的,而是采用双缓冲机制,先将数据写入到内存中,然后在从内存中写入到磁盘里面
模型大概长这个样子:
大概的步骤简单概述一下:
1、首先将元数据写入内存(bufCurrent)中
2、当满足一定条件的时候,我们会将两个内存进行交换
3、我们将 bufCurrent 里面的数据交换到 bufReady 里面,然后 bufCurrent 里面为空,继续接收写入内存的数据
bufReady 里面保存的是写入内存里面的数据,然后偷偷刷到磁盘,刷完后清空内存
4、然后周而复始,bufCurrent 永远去接收数据,然后会把数据传递给 bufReady,bufReady 在继续偷偷刷磁盘
那么通过这个这种双缓冲机制,就将原本写磁盘的操作变成了写内存操作。从而大大提高了效率。
2.2 源码-hadoop 是如何实现双缓冲+分段锁的
public void logSync() {
long syncStart = 0;//用来记录事务的最大ID
//获取当前线程ID
long mytxid = myTransactionId.get().txid;
//默认不同步(意思是第二块内存还没有数据,所以不需要刷磁盘,既不需要做同步操作)
boolean sync = false;
try {
EditLogOutputStream logStream = null;//EditLog的输出流
synchronized (this) {//TODO 分段锁开始
try {
printStatistics(false);//打印静态信息
/**
* 如果当前工作的线程> 最大事务ID && 是同步状态的,那么说明当前线程正处于刷盘状态。
* 说明此事正处于刷盘状态,则等待1s
* */
while (mytxid > synctxid && isSyncRunning) {
try {
wait(1000);
} catch (InterruptedException ie) {
}
}
//
// If this transaction was already flushed, then nothing to do
//如果当前的线程ID < 当前处理事务的最大ID,则说明当前线程的任务已经被其他线程完成了,什么也不用做了
if (mytxid <= synctxid) {
numTransactionsBatchedInSync++;
if (metrics != null) {
// Metrics is non-null only when used inside name node
metrics.incrTransactionsBatchedInSync();
}
return;
}
//此事开启同步状态,开始刷盘
syncStart = txid;
isSyncRunning = true;//开启同步
sync = true;
//TODO 双缓冲区,交换数据
try {
if (journalSet.isEmpty()) {
throw new IOException("No journals available to flush");
}
//双缓冲区交换数据
editLogStream.setReadyToFlush();
} catch (IOException e) {
final String msg =
"Could not sync enough journals to persistent storage " +
"due to " + e.getMessage() + ". " +
"Unsynced transactions: " + (txid - synctxid);
LOG.fatal(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanup(LOG, journalSet);
}
terminate(1, msg);
}
} finally {
// 防止RuntimeException阻止其他日志编辑写入
doneWithAutoSyncScheduling();
}
//editLogStream may become null,
//so store a local variable for flush.
logStream = editLogStream;
}//TODO 分段锁结束
// do the sync
long start = monotonicNow();
try {
if (logStream != null) {
//TODO 将缓冲区数据刷到磁盘(没有上锁)
logStream.flush();///tmp/hadoop-angel/dfs/name/current
}
} catch (IOException ex) {
synchronized (this) {
final String msg =
"Could not sync enough journals to persistent storage. "
+ "Unsynced transactions: " + (txid - synctxid);
LOG.fatal(msg, new Exception());
synchronized(journalSetLock) {
IOUtils.cleanup(LOG, journalSet);
}
//TODO
terminate(1, msg);
}
}
long elapsed = monotonicNow() - start;
if (metrics != null) { // Metrics non-null only when used inside name node
metrics.addSync(elapsed);
}
} finally {
// 持久化完毕之后,第二块内存空了,然后我们在修改下标志位,告诉程序现在没有做刷磁盘操作了
synchronized (this) {//TODO 分段锁开始
if (sync) {
synctxid = syncStart;
isSyncRunning = false;
}
this.notifyAll();
}
}
}
三、总结
第一把锁,主要是判断isAutoSyncScheduled以及对isAutoSyncScheduled的赋值,这个主要是说明bufCurrent和bufReady开始交换内存了。
第二把锁,主要是判断isSyncRunning以及对isSyncRunning和isAutoSyncScheduled的赋值。isSyncRunning是用来判断是否在写磁盘,isAutoSyncScheduled用来判断是否在交换内存,如果在交换,就不能写入bufCurrent,如果在写磁盘,那就不能写磁盘。
第三把锁,赋值isSyncRunning,说明磁盘写入完成。
这期间最耗时的操作并没有加锁,其他内存操作的加锁,但是速度比较快,采用在这种分段加锁的方式和双缓冲机制,大大提高了性能。
大数据交流社群请加以下微信申请:
往期推荐