在这样的场景下,NameNode 必然会面临高并发的访问,与此同时,为了保证元数据的安全可靠,不能仅仅保存在内存中,而是需要刷写到磁盘。那么,NameNode 如何在高并发的场景下既保证高性能地处理各种元数据操作,又可以将元数据持久化到磁盘来保证安全?
这里 NameNode 巧妙地使用了 双缓冲 机制。示意图如下:
在分析 双缓冲 机制前,首先了解下 NameNode 是如何对元数据进行持久化的。集群启动时,NN 会将磁盘存储的元数据信息(FsImage 文件)加载到内存,之后所有对元数据的操作都会记录其操作日志,并将这些日志持久化到 EditLog 文件中。后续就可以在 FsImage 的基础上执行 EditLog 中的操作来恢复集群元数据。
如果每次对元数据的操作都进行同步刷盘,性能会很低。这里 NameNode 会先把操作日志写入写缓冲(上图中的Current-Buffer),由于是纯内存操作,速度很快,当日志量达到一定阈值时,进行批量刷盘。为了解决刷盘过程中无法继续执行写操作的问题,NameNode 采用了双缓冲机制,也就是刷盘之前将 Sync-Buffer 和 Current-Buffer 进行交换,Current-Buffer 继续执行写操作,Sync-Buffer 进行刷盘,互不影响,刷盘完成后清空 Sync-Buffer,等待下次交换。
同时,为了提高并发处理能力,采用了分段加锁的方式,对于刷盘这种耗时的操作是不加锁的,其余操作由于是纯内存操作,即使加了锁速度也很快。这样既保证了高性能地处理并发访问,又可以将元数据持久化到磁盘来保证数据安全性。
NameNode 写入缓冲并进行缓冲交换和刷盘的具体逻辑在 FSEditLog.logSync() 方法中,这里采用伪代码对其原理进行说明:
1. 将操作日志封装成 EditLog 类。该类有两个基本属性:
txid 表示操作的事务id
context 表示操作日志的内容
/**
* 定义日志类型,每条操作日志就是一个 EditLog 对象
*/
class EditLog{
private long txid ;
private String context ;
public EditLog(long txid, String context) {
this.txid = txid;
this.context = context;
}
//get 和 set 方法
@Override
public String toString() {
return "EditLog{" +
"txid=" + txid +
", context='" + context + '\'' +
'}';
}
}
复制
class DoubleBuffer{
//执行日志写入操作的缓冲
private LinkedList<EditLog> currentBufffer = new LinkedList<EditLog>();
//执行日志同步刷盘的缓冲
private LinkedList<EditLog> syncBufffer = new LinkedList<EditLog>();
//get 和 set 方法
/**
* 向currentBuffer缓冲中写入操作日志
* @param log
*/
public void write(EditLog log){
currentBufffer.add(log);
}
/**
* 返回同步缓冲中的最大txid
* @return
*/
public long getLastTxid(){
return syncBufffer.getLast().getTxid();
}
/**
* 交换缓冲
*/
public void setReadyToFlush(){
LinkedList<EditLog> tmp = currentBufffer;
currentBufffer=syncBufffer;
syncBufffer=tmp;
}
/**
* 将同步缓冲中的数据刷盘,这里象征性地打印一下
*/
public void flush(){
for (EditLog editLog : syncBufffer) {
System.out.println("存入操作日志:"+editLog);
}
//清空缓冲
syncBufffer.clear();
}
}
复制
logEdit:该方法对外部开发,用于写入操作日志 logSync:在 logEdit 方法中会调用该方法,用于缓冲交换及日志刷盘
public class DoubleBufferDemo {
private long txid = 0L;
//双缓冲操作对象
private DoubleBuffer buffer = new DoubleBuffer();
//标记当前是否正在往磁盘里面刷写数据
private volatile Boolean isSyncRunning = false;
//标记是否有线程正在等待同步
private volatile Boolean isWaitSync = false;
//初始化刷写的最大事务id
private volatile Long syncMaxTxid = 0L;
//存储每个线程内部的事务id
private ThreadLocal<Long> localTxid=new ThreadLocal<Long>();
...
/**
* 写入操作日志
* @param content
*/
public void logEdit(String context){
//加锁
synchronized (this){
//线程1:txid=1
//线程2:txid=2
//线程3:txid=3
txid++;
//线程1:localTxid=1
//线程2:localTxid=2
//线程3:localTxid=3
localTxid.set(txid);
EditLog editLog = new EditLog(txid, content);
//将操作日志写入缓冲
buffer.write(editLog);
}//释放锁
logSync();
}
/**
* 将缓冲中的数据刷盘
*/
public void logSync(){
synchronized (this){
//当第一个线程执行到这里,isSyncRunning 为false,不执行分支内的逻辑
if(isSyncRunning){
//线程二:2
//线程三:3
long txid = localTxid.get();
//当线程2和线程3进来时,由于txid<=syncMaxTxid(也就是3),说明线程1进行刷盘时,sync缓冲中
//已经包含了线程2和线程3的操作日志,所以不需要线程2和线程3再进行刷盘,方法直接返回
//当线程4进来时,该条件不满足,接着往下执行
if(txid <= syncMaxTxid){
return;
}
//线程4执行到这里,isWaitSync=false,继续向下执行
//线程5执行到这里,isWaitSync=true,方法返回
if(isWaitSync){
//线程5直接返回
return;
}
//线程4将isWaitSync赋值为true
isWaitSync = true;
//线程4执行到这里,isSyncRunning=true,执行里面的逻辑
while(isSyncRunning){
try {
//线程4就在这儿等:退出等待有两种情况
//1)等待时间到了 2)被人唤醒了
wait(2000);//wait操作会释放锁
}catch (Exception e){
e.printStackTrace();
}
}
//线程4结束等待,更新isWaitSync=false
isWaitSync = false;
}
//进行内存交换
//当第一个线程执行缓冲交换时,假设 currentBuffer //中已经完成了线程1,线程2
//中已经完成了线程1,线程2线程3 的操作日志写入
buffer.setReadyToFlush();
//如果同步缓冲中存在操作日志,获取其最大的事务id
//这里由于假设线程1-3已经完成了日志写入,所以第一次获取到的最大事务id就是3
if(buffer.getSyncBufffer().size() > 0) {
syncMaxTxid = buffer.getLastTxid();
}
//更新标识,表明正在往磁盘中同步数据
isSyncRunning = true;
} //线程1 释放锁
/**
* 分段加锁
* 线程1 刷写数据
* 刷盘相对较慢
*/
//这里耗时的刷盘操作并未加锁
buffer.flush();
//重新加锁
synchronized (this) {
//线程1刷写磁盘完成,将isSyncRunning 更新为false
isSyncRunning = false;
//唤醒等待线程。
notifyAll();
}
}
}
复制
线程1 调用 logEdit 方法,获取锁并将操作日志写入currentBuffer,之后释放锁并调用 logSync 方法
线程1 释放锁后,线程2,线程3依次获取锁,将操作日志写入 currentBuffer
线程1 执行 logSync 方法,首先会获取锁,由于 isSyncRunning 初始值为 false,if 条件不满足,直接跳过该分支执行缓冲交换(注意此时交换后的 syncBuffer 中包含线程2 和线程3 的操作日志),获取 syncBuffer 中的最大事务syncMaxTxid=3,然后将 isSyncRunning 标识更新为 true ,表示有线程在进行刷盘操作
线程1 释放锁并执行刷盘操作,该操作耗时相对较长
线程2 执行 logSync 方法,此时 isSyncRunning=true,进入if 分支,由于线程2的事务id=2,小于等于 syncMaxTxid,线程2直接返回
线程3 执行 logSync 方法,和线程2 同理,线程3直接返回
由于线程1 的刷盘操作耗时较长,此时线程4,线程5,线程6依次调用 logEdit 方法将操作日志写入 currentBuffer(线程4 写入前已经进行了缓冲交换)
线程4 执行 logSync 方法,获取锁后进入 if 分支,由于 isWaitSync=false,线程4 不会进入 if(isWaitSync) 分支,之后将 isWaitSync 更新为 true,由于此时 isSyncRunning=true,线程4 调用 wait 方法进行等待并释放锁
线程5 调用 logSync 方法,获取锁后进入 if 分支,由于 isWaitSync=true,线程5 进入if(isWaitSync) 分支后直接返回
线程6 调用 logSync 方法,获取锁后进入if 分支,和线程5 同理直接返回
线程4 由于调用了 wait 方法,只能等待时间到达或者被其它线程唤醒
假设此时线程1 执行刷盘操作完毕,再次获取锁后将标记 isSyncRunning 置为 false,表示刷盘操作已完成,并调用 notifyAll 方法唤醒等待的线程
线程4 被线程1 唤醒,将标识 isWaitSync 置为 false,表示已经没有线程等待刷盘了,之后进行缓冲交换,继续执行上面的流程(此时线程7,线程8等已经将操作日志写入 currentBuffer)
NameNode 通过双缓冲的机制,将写内存和刷写磁盘的操作进行分离,互不影响。 NameNode 通过分段加锁来减小锁的粒度,即使在高并发场景下,加锁操作都是纯内存操作,性能很高。而对于相对耗时的刷盘操作,由于该操作未加锁,某个线程刷盘时并不影响其它线程将操作日志写入写缓冲。