暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Etcd源码剖析(二)

考拉苑 2018-08-27
180
5、store的具体操作【续】
复制
// 当前store的版本.
func (s *store) Version() int {
return s.CurrentVersion
}

// 当前store的index.

// 通过使用store的world lock【读锁】锁定当前store的 防止读取当前store的index出现数据安全问题

func (s *store) Index() uint64 {
s.worldLock.RLock()
defer s.worldLock.RUnlock()
return s.CurrentIndex
}


// Get操作

// 当recursive=true时,即将获取指定的node下面所有的内容 否则只获取当前node内容(不包括子node内容)

// 当stored=true,将按照key的自然排序输出node的内容

func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
var err *etcdErr.Error  // 定义etcd的 error
  get操作时为了防止内容读取过程中出现变更 通过使用读取锁store的world lock,来锁定当前store
复制
  s.worldLock.RLock()  
defer s.worldLock.RUnlock()

defer func() {

// 读取成功的操作
复制

if err == nil {

// 变更stats的内容 增加成次数
复制
         s.Stats.Inc(GetSuccess)
       
if recursive {  // 若是recurise=true
           reportReadSuccess(
GetRecursive)
        }
else {
           reportReadSuccess(
Get)
        }
       
return
     }

   // 读取失败

    增长fail次数

      s.Stats.Inc(GetFail)
     if recursive {
        reportReadFailure(
GetRecursive)
     }
else {
        reportReadFailure(
Get)
     }
  }()
 
  n
, err := s.internalGet(nodePath)
 
if err != nil {
     
return nil, err
  }
  若是get操作成功 则需返回一个event
  e := newEvent(
Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
  e.EtcdIndex = s.CurrentIndex
  e.Node.loadInternalNode(n
, recursive, sorted, s.clock)

 
return e, nil
}


// 辅助类: 获取指定的nodepath对应的node

func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {

 // 得到当前node的文件path形式内容

nodePath = path.Clean(path.Join("/", nodePath))
根据指定node获取指定name是否在当前的node的子node中存在
walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
当前给定的父node类型不为directory 是不能够进行添加子node
if !parent.IsDir() {
err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
return nil, err
}

   // 当前给定的父node=directory 则判断当前父node下面的子node是否存在对应符合要求的node

// 存在则直接返回对应的node和nil【没有error出现】
     child, ok := parent.Children[name]
if ok {
return child, nil
}
当不存在对应的node时 直接返回对应的error: Ecode key not found
return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
}

   // 若是key-value,则直接返回对应的root对应的node

// 若是directory,则需要迭代node path中的每个node 直到找到最后一个node
    f, err := s.walk(nodePath, walkFunc)

if err != nil { 出现error 只需要返回对应的error内容 无合适node返回
return nil, err
}
return f, nil 返回符合要求的node path对应的最后一个node的内容 即为查询所需的内容
}

// 辅助方法:walk 遍历所有nodepath并在每个directory上应用walkfunc

func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {

  拆分指定node path

  components := strings.Split(nodePath, "/")
  当前store的对应root
  curr := s.Root
 
var err *etcdErr.Error
  遍历node path
  for i := 1; i < len(components); i++ {
     
if len(components[i]) == 0 { // 忽略空字符串 代表当前的nodepath只有root目录 不含有子node
        return curr, nil
     }
     // 迭代获取node path中最后一个node
     curr
, err = walkFunc(curr, components[i])
     
if err != nil {
       
return nil, err
     }
  }

 
return curr, nil
}


// 在node path新增node,同时创建出来的node默认是没有ttl

// 若是node已经存在node path中,则创建失败 返回error
// 若是node path中的任意一个node是file 则创建失败 返回error
func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) {
 
var err *etcdErr.Error
   // 该操作属于安全的
  s.worldLock.Lock()
 
defer s.worldLock.Unlock()

 
defer func() {
     
if err == nil {  // 创建成功
        s.Stats.Inc(
CreateSuccess) 变更stats 记录create success记录
        reportWriteSuccess(
Create) 记录写成功
       
return
     }

     s.Stats.Inc(
CreateFail)  // 失败 变更stats中的create fail
     reportWriteFailure(
Create) 记录写失败
  }()

  e
, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create)
 
if err != nil {
     
return nil, err
  }

  e.EtcdIndex = s.CurrentIndex
  s.WatcherHub.notify(e)

 
return e, nil

}


// 辅助方法:内部执行create

func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
  expireTime time.Time, action string) (*Event, *etcdErr.Error) {
   // 获取store当前的index及下一个新的index【默认在当前index+1】

  currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1


   // 当unique=true 在当前node path追加唯一项
  if unique { // append unique item under the node path
     nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10))
  }

  nodePath = path.Clean(path.Join(
"/", nodePath))


    // 防止用户改变"/"
  if s.readonlySet.Contains(nodePath) {
     
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
  }

 
// Assume expire times that are way in the past are

  This can occur when the time is serialized to JS

  if expireTime.Before(minExpireTime) {
     expireTime = Permanent
  }

  dirName
, nodeName := path.Split(nodePath)


    遍历nodePath,创建dirs并获得最后一个目录节点
 
d, err := s.walk(dirName, s.checkDir)


  创建dir失败 

  if err != nil {
     s.Stats.Inc(
SetFail)
     reportWriteFailure(action)
     err.Index = currIndex
     
return nil, err
  }
   // 目录创建成功时 生成对应的event
  e := newEvent(action
, nodePath, nextIndex, nextIndex)
  eNode := e.Node

  n
, _ := d.GetChild(nodeName)

 
// force will try to replace an existing file
  if n != nil {
     
if replace { 是否替换文件
       
if n.IsDir() { 类型=directory时  不能完成替换原有文件的 直接返回etcd error
           
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)

        }


        e.PrevNode = n.Repr(
false, false, s.clock)

        n.Remove(
false, false, nil)
     }
else {
       
return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
     }
  }

 
if !dir { // 创建文件  key-value
     // copy the value for safety
     valueCopy := value
     eNode.Value = &valueCopy

     n = newKV(s
, nodePath, value, nextIndex, d, expireTime)

  }
else { // 创建文档结构 directory
     eNode.Dir = true

     n = newDir(s, nodePath, nextIndex, d, expireTime)
  }

 
// we are sure d is a directory and does not have the children with name n.Name
  d.Add(n)

 
// node with TTL
  if !n.IsPermanent() {
     s.ttlKeyHeap.push(n)

     eNode.Expiration
, eNode.TTL = n.expirationAndTTL(s.clock)
  }

  s.CurrentIndex = nextIndex

 
return e, nil
}

// 辅助方法: Repr方法

// 关联内部node与其对应的node extern

// 1、默认创建NodeExtern实例

// 2、根据node是否需要迭代子node path以及排序输出不同外部呈现的nodeextern

// a、当node类型是directory时 指定不需要循环处理其子node 则直接输出NodeExtern(包括key、dir、modifiedindex、createindex、expire、ttl)

// b、需要循环处理其子node
//   b1、获取到当前node的所有子node

//   b2、将child nodes中的hidden node剔除 迭代执行repr筛选

//   b3、针对b2中符合要求node结合sort进行是否排序输出

// 最终内部node与外部展示node extern关联完成

func (n *node) Repr(recursive, sorted bool, clock clockwork.Clock) *NodeExtern {
if n.IsDir() { directory
node := &NodeExtern{
Key: n.Path,         node path
        Dir: true,          // directory
        ModifiedIndex: n.ModifiedIndex,  // 修改index
        CreatedIndex: n.CreatedIndex,   创建index

     }


node.Expiration, node.TTL = n.expirationAndTTL(clock)

if !recursive { 不需要循环迭代 则直接返回node
return node
}

    若是需要循环迭代

    1、获取该node下的所有子node

    2、根据child node的大小生成对应的nodes集合

     
      children, _ := n.List()  
     node.Nodes =
make(NodeExterns, len(children))

 

      不使用children node的slice directly的内容

     // 隐藏的node需要跳过的      i := 0

     for _, child := range children {
            
if child.IsHidden() { // 过滤到hidden node
               continue
          }
            // 迭代关联符合要求的node
            node.Nodes[i] = child.Repr(recursive
, sorted, clock)

            i++
     }

     
// 清除原有的hidden node
     node.Nodes = node.Nodes[:i]
     
if sorted { node排序
        sort.Sort(node.Nodes)
     }
     // 返回结果
     
return node
  }

 
// since n.Value could be changed later, so we need to copy the value out
  value := n.Value
  node := &
NodeExtern{
     Key:           n.Path
,
     Value:         &value,
     ModifiedIndex: n.ModifiedIndex,
     CreatedIndex:  n.CreatedIndex,
  }
  node.Expiration
, node.TTL = n.expirationAndTTL(clock)
 
return node

}


辅助方法:获取node的expire和ttl

func (n *node) expirationAndTTL(clock clockwork.Clock) (*time.Time, int64) {

if !n.IsPermanent() {  

   // 非永久性node 需要计算ttl: ceiling((expireTime - timeNow) nanosecondsPerSecond)
   // 若是node的expire time与clock.now相比的时间差%time.Second != 0 则ttl++

    * compute ttl as:
        ceiling( (expireTime - timeNow) nanosecondsPerSecond )
        which ranges from 1..n
        rather than as:
        ( (expireTime - timeNow) nanosecondsPerSecond ) + 1
        which ranges 1..n+1
     */
     ttlN := n.ExpireTime.Sub(clock.Now())
ttl := ttlN time.Second
     if (ttlN % time.Second) > 0 {
ttl++
}
t := n.ExpireTime.UTC()
return &t, int64(ttl) 返回expire time 和 ttl
}
return nil, 0  // 默认返回
}



// 创建或替换node path中的node

// store的操作行为 都属于一个event 

func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) {
 
var err *etcdErr.Error 定义error


  针对store的操作 需要锁定对应的world 保证安全性

  s.worldLock.Lock()
 
defer s.worldLock.Unlock()

 
defer func() { 根据当前操作时是否触发error 变更对应的stats和write的成败与否
     
if err == nil {
        s.Stats.Inc(
SetSuccess)
        reportWriteSuccess(
Set)
       
return
     }

     s.Stats.Inc(
SetFail)
     reportWriteFailure(
Set)
  }()

 
// 获取node上一个value
  n, getErr := s.internalGet(nodePath)
 
if getErr != nil && getErr.ErrorCode != etcdErr.EcodeKeyNotFound {
     err = getErr
     
return nil, err
  }

 
if expireOpts.Refresh {
     
if getErr != nil {
        err = getErr
       
return nil, err
     }
else {
        value = n.Value
     }
  }

 
// 设置新值
  e, err := s.internalCreate(nodePath, dir, value, false, true, expireOpts.ExpireTime, Set)
 
if err != nil {
     
return nil, err
  }
  e.EtcdIndex = s.CurrentIndex


    // event关联前一个node
  if getErr == nil {
     prev := newEvent(
Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
     prev.Node.loadInternalNode(n
, false, false, s.clock)
     e.PrevNode = prev.Node
  }

 
if !expireOpts.Refresh { 是否refresh
     s.WatcherHub.notify(e)
  }
else {                 第一次设置refresh 并添加watcher hub便于后续的watch
     e.SetRefresh()
     s.WatcherHub.add(e)
  }

 
return e, nil
}

// returns user-readable cause of failed comparison

// 返回用户可读比较失败原因【一般比较index和value】

func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string {
 
switch which {
 
case CompareIndexNotMatch:  // index不匹配
     
return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex)
 
case CompareValueNotMatch:  // value不匹配
     
return fmt.Sprintf("[%v != %v]", prevValue, n.Value)
 
default:                    // 默认
     
return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
  }
}
//
func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
  value string, expireOpts TTLOptionSet) (*Event, error) {

 
var err *etcdErr.Error

  s.worldLock.Lock()
 
defer s.worldLock.Unlock()

 
defer func() {
     
if err == nil {
        s.Stats.Inc(
CompareAndSwapSuccess)
        reportWriteSuccess(
CompareAndSwap)
       
return
     }

     s.Stats.Inc(
CompareAndSwapFail)
     reportWriteFailure(
CompareAndSwap)
  }()
   // 获取node path最后一个node
  nodePath = path.Clean(path.Join(
"/", nodePath))
 
// 将只读set中nodepath是否存在;注意是不允许用户改变路径 "/"
  if s.readonlySet.Contains(nodePath) {
     
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  }
   // 获取node对应的内容
  n
, err := s.internalGet(nodePath)
 
if err != nil {
     
return nil, err
  }
 
if n.IsDir() { // 比较交换操作仅用于key-value 不能用于directory
     err = etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
     
return nil, err
  }

  // If both of the prevValue and prevIndex are given, we will test both of them.

  将node与给定的index和value进行比较 只有在value和index比较都是ok的  结果才属于正常 

  否则输出fail的原因和error 

  if ok, which := n.Compare(prevValue, prevIndex); !ok {
     cause := getCompareFailCause(n
, which, prevValue, prevIndex)
     err = etcdErr.NewError(etcdErr.
EcodeTestFailed, cause, s.CurrentIndex)
     
return nil, err
  }

 
if expireOpts.Refresh {
     value = n.Value
  }

 
// update etcd index
  s.CurrentIndex++


  定义操作event

  e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
  e.EtcdIndex = s.CurrentIndex
  e.PrevNode = n.Repr(
false, false, s.clock)
  eNode := e.Node

 
// 当successed时 需要变更value、currentIndex以及更新ttl
  n.Write(value, s.CurrentIndex)
  n.UpdateTTL(expireOpts.ExpireTime)

 
// 为了安全起见 需要备份value 后续操作也是针对备份value操作
  valueCopy := value
  eNode.Value = &valueCopy
  eNode.Expiration
, eNode.TTL = n.expirationAndTTL(s.clock)

 
if !expireOpts.Refresh {
     s.WatcherHub.notify(e)
  }
else {
     e.SetRefresh()
     s.WatcherHub.add(e)
  }

 
return e, nil
}

// Delete删除给定路径上的节点.
// 若是对应的node类型=directory 需要对应的recursive=true方可执行delete操作.
func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
 
var err *etcdErr.Error

  s.worldLock.Lock()
 
defer s.worldLock.Unlock()

 
defer func() { 记录删除成功、失败时的计数
     
if err == nil { 成功
        s.Stats.Inc(
DeleteSuccess)
        reportWriteSuccess(
Delete)
       
return
     }
         // 失败
     s.Stats.Inc(
DeleteFail)
     reportWriteFailure(
Delete)
  }()

  nodePath = path.Clean(path.Join(
"/", nodePath))
 
// 并不希望改变 "/"
  if s.readonlySet.Contains(nodePath) {
     
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  }

 
//用于当node属于directory类型时
  if recursive {
     dir =
true
  }
   // 获取node内容
  n
, err := s.internalGet(nodePath)
 
if err != nil { // 当对应的node不存在时, 返回error
     return nil, err
  }
   // 最新的index = 当前index + 1 递增

   nextIndex := s.CurrentIndex + 1

    // 操作event 
  e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
  e.EtcdIndex = nextIndex
  e.PrevNode = n.Repr(
false, false, s.clock)
  eNode := e.Node

 
if n.IsDir() {
     eNode.Dir =
true
  }

  callback :=
func(path string) { // notify函数
     // 当删除时  通知notify当前node的watchers
     s.WatcherHub.notifyWatchers(e, path, true)
  }
   // remove 并执行watch
  err = n.Remove(dir
, recursive, callback)
 
if err != nil {
     
return nil, err
  }

 
// 更新index
  s.CurrentIndex++
   
  s.WatcherHub.notify(e)

 
return e, nil
}
// 可参考delete实现
func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
 
var err *etcdErr.Error

  s.worldLock.Lock()
 
defer s.worldLock.Unlock()

 
defer func() {
     
if err == nil {
        s.Stats.Inc(
CompareAndDeleteSuccess)
        reportWriteSuccess(
CompareAndDelete)
       
return
     }

     s.Stats.Inc(
CompareAndDeleteFail)
     reportWriteFailure(
CompareAndDelete)
  }()

  nodePath = path.Clean(path.Join(
"/", nodePath))

  n
, err := s.internalGet(nodePath)
 
if err != nil { // 当node不存在时, 返回 error
     return nil, err
  }
 
if n.IsDir() { // node类型不能为directory 只有key-value才能完成比较和交换
     return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
  }

 
// 给定prevValue和prevIndex,将node与其进行比较.
  if ok, which := n.Compare(prevValue, prevIndex); !ok {
     cause := getCompareFailCause(n
, which, prevValue, prevIndex)
     
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
  }

 
// 更新index
  s.CurrentIndex++

  e := newEvent(
CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
  e.EtcdIndex = s.CurrentIndex
  e.PrevNode = n.Repr(
false, false, s.clock)

  callback :=
func(path string) { // notify 函数
     s.WatcherHub.notifyWatchers(e, path, true)
  }

  err = n.Remove(
false, false, callback)
 
if err != nil {
     
return nil, err
  }

  s.WatcherHub.notify(e)

 
return e, nil
}


// watch机制:主要用于监控node path对应的内容进行变更【create、add、delete、update】

func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
  s.worldLock.RLock()
 
defer s.worldLock.RUnlock()

  key = path.Clean(path.Join(
"/", key))
 
if sinceIndex == 0 {
     sinceIndex = s.CurrentIndex +
1
  }
 
//注: WatcherHub并不知道current index,需要人工提供的
  w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
 
if err != nil {
     
return nil, err
  }

 
return w, nil
}

// Update:更新node的ttl和value.

// 当node=file时 可更新value和ttl

// 当node=directory时, 只能更新ttl

func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) {
 
var err *etcdErr.Error

  s.worldLock.Lock()
 
defer s.worldLock.Unlock()

 
defer func() {
     
if err == nil {
        s.Stats.Inc(
UpdateSuccess)
        reportWriteSuccess(
Update)
       
return
     }

     s.Stats.Inc(
UpdateFail)
     reportWriteFailure(
Update)
  }()

  nodePath = path.Clean(path.Join(
"/", nodePath))
  if s.readonlySet.Contains(nodePath) {
     
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
  }
   // 当前index和接下来新建的index
  currIndex
, nextIndex := s.CurrentIndex, s.CurrentIndex+1

  n, err := s.internalGet(nodePath)
 
if err != nil { // 当node不存在时返回error
     return nil, err
  }
 
if n.IsDir() && len(newValue) != 0 { 当node=directory且包含内容 其对应的value不能更新非空
     return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
  }

 
if expireOpts.Refresh {
     newValue = n.Value
  }
   // 操作event
  e := newEvent(
Update, nodePath, nextIndex, n.CreatedIndex)
  e.EtcdIndex = nextIndex
  e.PrevNode = n.Repr(
false, false, s.clock)
  eNode := e.Node

  n.Write(newValue
, nextIndex)

 
if n.IsDir() {
     eNode.Dir =
true
  } else {
     
// 安全copy值
     newValueCopy := newValue
     eNode.Value = &newValueCopy
  }

 
// update ttl
  n.UpdateTTL(expireOpts.ExpireTime)

  eNode.Expiration
, eNode.TTL = n.expirationAndTTL(s.clock)

 
if !expireOpts.Refresh {
     s.WatcherHub.notify(e)
  }
else {
     e.SetRefresh()
     s.WatcherHub.add(e)
  }

  s.CurrentIndex = nextIndex

 
return e, nil
}

// 删除过期的key

func (s *store) DeleteExpiredKeys(cutoff time.Time) {
  s.worldLock.Lock()
 

  s.worldLock.Lock()
 
defer s.worldLock.Unlock()

  for {
      ttlKeyHeap的top key
     

     node := s.ttlKeyHeap.top()

     // 直至对应node不存在或node对应的expire time还未失效

     if node == nil || node.ExpireTime.After(cutoff) {

        break
     }
        增加current index

      s.CurrentIndex++

          操作event
     e := newEvent(
Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
     e.EtcdIndex = s.CurrentIndex
     e.PrevNode = node.Repr(
false, false, s.clock)
     
if node.IsDir() {
        e.Node.Dir =
true
     }

     callback :=
func(path string) { // 通知watcher
        / 触发watch
        s.WatcherHub.notifyWatchers(e, path, true)
     }

     s.ttlKeyHeap.pop()
     node.Remove(
true, true, callback)

     reportExpiredKey()
     s.Stats.Inc(
ExpireCount)

     s.WatcherHub.notify(e)
  }

}

// checkDir:检查dirName是否是parent node下面的.
// node = directory, 将返回node的指针.
// node不存在, 将创建一个新的directory并将node的指针返回
// 若是node=file, 将返回error.
func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
  node
, ok := parent.Children[dirName] // 是否存在parent node下

 
if ok {
     
if node.IsDir() {  // 存在且是directory 直接返回node指针
       
return node, nil
     }
        // 否则返回error
     
return nil, etcdErr.NewError(etcdErr.EcodeNotDir, node.Path, s.CurrentIndex)
  }
   // 若不存在parent node下 则创建对应的directory 并返回对应node的指针
  n := newDir(s
, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent)

  parent.Children[dirName] = n

 
return n, nil
}

// Save:保存存储系统的static状态.
// 不能保存watcher的state.
// 不能保存parent node的字段. 或会出现json包的循环依赖.
func (s *store) Save() ([]byte, error) {
  b
, err := json.Marshal(s.Clone())
 
if err != nil {
     
return nil, err
  }

 
return b, nil
}
// 直接操作store 而非备份
func (s *store) SaveNoCopy() ([]byte, error) {
  b
, err := json.Marshal(s)
 
if err != nil {
     
return nil, err
  }

 
return b, nil
}
// clone:通过新建store 获取对应的current index、root、watcherHub、stats、current version
func (s *store) Clone() Store {
  s.worldLock.Lock()

  clonedStore := newStore()
  clonedStore.CurrentIndex = s.CurrentIndex
  clonedStore.Root = s.Root.Clone()
  clonedStore.WatcherHub = s.WatcherHub.clone()
  clonedStore.Stats = s.Stats.clone()
  clonedStore.CurrentVersion = s.CurrentVersion

  s.worldLock.Unlock()
 
return clonedStore
}

// Recovery:从静态状态恢复存储系统
// 需要回复这些node的parent fields.
// 需要删除失效的node同时也需要额外创建routine来完成监控.
func (s *store) Recovery(state []byte) error {
  s.worldLock.Lock()
 
defer s.worldLock.Unlock()
  err := json.Unmarshal(state
, s)

 
if err != nil {
     
return err
  }

  s.ttlKeyHeap = newTtlKeyHeap()

  s.Root.recoverAndclean()
 
return nil
}
复制


后记:本文主要为了补充Etcd源码剖析(一)中store的内容






文章转载自考拉苑,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论