5、store的具体操作【续】复制
// 当前store的版本.
func (s *store) Version() int {
return s.CurrentVersion
}// 当前store的index.
// 通过使用store的world lock【读锁】锁定当前store的 防止读取当前store的index出现数据安全问题
func (s *store) Index() uint64 {
s.worldLock.RLock()
defer s.worldLock.RUnlock()
return s.CurrentIndex
}// Get操作
// 当recursive=true时,即将获取指定的node下面所有的内容 否则只获取当前node内容(不包括子node内容)
// 当stored=true,将按照key的自然排序输出node的内容
func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
var err *etcdErr.Error // 定义etcd的 error
get操作时为了防止内容读取过程中出现变更 通过使用读取锁store的world lock,来锁定当前store复制
s.worldLock.RLock()
defer s.worldLock.RUnlock()defer func() {
// 读取成功的操作复制
if err == nil {
// 变更stats的内容 增加成次数复制
s.Stats.Inc(GetSuccess)
if recursive { // 若是recurise=true
reportReadSuccess(GetRecursive)
} else {
reportReadSuccess(Get)
}
return
}// 读取失败
增长fail次数
s.Stats.Inc(GetFail)if recursive {
reportReadFailure(GetRecursive)
} else {
reportReadFailure(Get)
}
}()
n, err := s.internalGet(nodePath)
if err != nil {
return nil, err
}
若是get操作成功 则需返回一个event
e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
e.EtcdIndex = s.CurrentIndex
e.Node.loadInternalNode(n, recursive, sorted, s.clock)
return e, nil
}
// 辅助类: 获取指定的nodepath对应的node
func (s *store) internalGet(nodePath string) (*node, *etcdErr.Error) {
// 得到当前node的文件path形式内容
nodePath = path.Clean(path.Join("/", nodePath))
根据指定node获取指定name是否在当前的node的子node中存在
walkFunc := func(parent *node, name string) (*node, *etcdErr.Error) {
当前给定的父node类型不为directory 是不能够进行添加子node
if !parent.IsDir() {
err := etcdErr.NewError(etcdErr.EcodeNotDir, parent.Path, s.CurrentIndex)
return nil, err
}// 当前给定的父node=directory 则判断当前父node下面的子node是否存在对应符合要求的node
// 存在则直接返回对应的node和nil【没有error出现】child, ok := parent.Children[name]
if ok {
return child, nil
}
当不存在对应的node时 直接返回对应的error: Ecode key not found
return nil, etcdErr.NewError(etcdErr.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
}// 若是key-value,则直接返回对应的root对应的node
// 若是directory,则需要迭代node path中的每个node 直到找到最后一个nodef, err := s.walk(nodePath, walkFunc)
if err != nil { 出现error 只需要返回对应的error内容 无合适node返回
return nil, err
}
return f, nil 返回符合要求的node path对应的最后一个node的内容 即为查询所需的内容
}// 辅助方法:walk 遍历所有nodepath并在每个directory上应用walkfunc
func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *etcdErr.Error)) (*node, *etcdErr.Error) {
拆分指定node path
components := strings.Split(nodePath, "/")
当前store的对应root
curr := s.Root
var err *etcdErr.Error
遍历node path
for i := 1; i < len(components); i++ {
if len(components[i]) == 0 { // 忽略空字符串 代表当前的nodepath只有root目录 不含有子node
return curr, nil
}
// 迭代获取node path中最后一个node
curr, err = walkFunc(curr, components[i])
if err != nil {
return nil, err
}
}
return curr, nil
}
// 在node path新增node,同时创建出来的node默认是没有ttl
// 若是node已经存在node path中,则创建失败 返回error
// 若是node path中的任意一个node是file 则创建失败 返回error
func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) {
var err *etcdErr.Error
// 该操作属于安全的
s.worldLock.Lock()
defer s.worldLock.Unlock()
defer func() {
if err == nil { // 创建成功
s.Stats.Inc(CreateSuccess) 变更stats 记录create success记录
reportWriteSuccess(Create) 记录写成功
return
}
s.Stats.Inc(CreateFail) // 失败 变更stats中的create fail
reportWriteFailure(Create) 记录写失败
}()
e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create)
if err != nil {
return nil, err
}
e.EtcdIndex = s.CurrentIndex
s.WatcherHub.notify(e)
return e, nil}
// 辅助方法:内部执行create
func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
expireTime time.Time, action string) (*Event, *etcdErr.Error) {
// 获取store当前的index及下一个新的index【默认在当前index+1】currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
// 当unique=true 在当前node path追加唯一项
if unique { // append unique item under the node path
nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10))
}
nodePath = path.Clean(path.Join("/", nodePath))
// 防止用户改变"/"if s.readonlySet.Contains(nodePath) {
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", currIndex)
}
// Assume expire times that are way in the past areThis can occur when the time is serialized to JS
if expireTime.Before(minExpireTime) {
expireTime = Permanent
}
dirName, nodeName := path.Split(nodePath)
遍历nodePath,创建dirs并获得最后一个目录节点
d, err := s.walk(dirName, s.checkDir)
创建dir失败
if err != nil {
s.Stats.Inc(SetFail)
reportWriteFailure(action)
err.Index = currIndex
return nil, err
}
// 目录创建成功时 生成对应的event
e := newEvent(action, nodePath, nextIndex, nextIndex)
eNode := e.Node
n, _ := d.GetChild(nodeName)
// force will try to replace an existing file
if n != nil {
if replace { 是否替换文件
if n.IsDir() { 类型=directory时 不能完成替换原有文件的 直接返回etcd error
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)}
e.PrevNode = n.Repr(false, false, s.clock)
n.Remove(false, false, nil)
} else {
return nil, etcdErr.NewError(etcdErr.EcodeNodeExist, nodePath, currIndex)
}
}
if !dir { // 创建文件 key-value
// copy the value for safety
valueCopy := value
eNode.Value = &valueCopy
n = newKV(s, nodePath, value, nextIndex, d, expireTime)
} else { // 创建文档结构 directory
eNode.Dir = true
n = newDir(s, nodePath, nextIndex, d, expireTime)
}
// we are sure d is a directory and does not have the children with name n.Name
d.Add(n)
// node with TTL
if !n.IsPermanent() {
s.ttlKeyHeap.push(n)
eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
}
s.CurrentIndex = nextIndex
return e, nil
}// 辅助方法: Repr方法
// 关联内部node与其对应的node extern
// 1、默认创建NodeExtern实例
// 2、根据node是否需要迭代子node path以及排序输出不同外部呈现的nodeextern
// a、当node类型是directory时 指定不需要循环处理其子node 则直接输出NodeExtern(包括key、dir、modifiedindex、createindex、expire、ttl)
// b、需要循环处理其子node
// b1、获取到当前node的所有子node// b2、将child nodes中的hidden node剔除 迭代执行repr筛选
// b3、针对b2中符合要求node结合sort进行是否排序输出
// 最终内部node与外部展示node extern关联完成
func (n *node) Repr(recursive, sorted bool, clock clockwork.Clock) *NodeExtern {
if n.IsDir() { directory
node := &NodeExtern{
Key: n.Path, node path
Dir: true, // directory
ModifiedIndex: n.ModifiedIndex, // 修改index
CreatedIndex: n.CreatedIndex, 创建index}
node.Expiration, node.TTL = n.expirationAndTTL(clock)
if !recursive { 不需要循环迭代 则直接返回node
return node
}若是需要循环迭代
1、获取该node下的所有子node
2、根据child node的大小生成对应的nodes集合
children, _ := n.List()
node.Nodes = make(NodeExterns, len(children))
不使用children node的slice directly的内容
// 隐藏的node需要跳过的 i := 0
for _, child := range children {
if child.IsHidden() { // 过滤到hidden node
continue
}
// 迭代关联符合要求的node
node.Nodes[i] = child.Repr(recursive, sorted, clock)
i++
}
// 清除原有的hidden node
node.Nodes = node.Nodes[:i]
if sorted { node排序
sort.Sort(node.Nodes)
}
// 返回结果
return node
}
// since n.Value could be changed later, so we need to copy the value out
value := n.Value
node := &NodeExtern{
Key: n.Path,
Value: &value,
ModifiedIndex: n.ModifiedIndex,
CreatedIndex: n.CreatedIndex,
}
node.Expiration, node.TTL = n.expirationAndTTL(clock)
return node}
辅助方法:获取node的expire和ttl
func (n *node) expirationAndTTL(clock clockwork.Clock) (*time.Time, int64) {if !n.IsPermanent() {
// 非永久性node 需要计算ttl: ceiling((expireTime - timeNow) nanosecondsPerSecond)
* compute ttl as:
// 若是node的expire time与clock.now相比的时间差%time.Second != 0 则ttl++
ceiling( (expireTime - timeNow) nanosecondsPerSecond )
which ranges from 1..n
rather than as:
( (expireTime - timeNow) nanosecondsPerSecond ) + 1
which ranges 1..n+1
*/
ttlN := n.ExpireTime.Sub(clock.Now())
ttl := ttlN time.Second
if (ttlN % time.Second) > 0 {
ttl++
}
t := n.ExpireTime.UTC()
return &t, int64(ttl) 返回expire time 和 ttl
}
return nil, 0 // 默认返回
}
// 创建或替换node path中的node
// store的操作行为 都属于一个event
func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) {
var err *etcdErr.Error 定义error
针对store的操作 需要锁定对应的world 保证安全性
s.worldLock.Lock()
defer s.worldLock.Unlock()
defer func() { 根据当前操作时是否触发error 变更对应的stats和write的成败与否
if err == nil {
s.Stats.Inc(SetSuccess)
reportWriteSuccess(Set)
return
}
s.Stats.Inc(SetFail)
reportWriteFailure(Set)
}()
// 获取node上一个value
n, getErr := s.internalGet(nodePath)
if getErr != nil && getErr.ErrorCode != etcdErr.EcodeKeyNotFound {
err = getErr
return nil, err
}
if expireOpts.Refresh {
if getErr != nil {
err = getErr
return nil, err
} else {
value = n.Value
}
}
// 设置新值
e, err := s.internalCreate(nodePath, dir, value, false, true, expireOpts.ExpireTime, Set)
if err != nil {
return nil, err
}
e.EtcdIndex = s.CurrentIndex
// event关联前一个nodeif getErr == nil {
prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
prev.Node.loadInternalNode(n, false, false, s.clock)
e.PrevNode = prev.Node
}
if !expireOpts.Refresh { 是否refresh
s.WatcherHub.notify(e)
} else { 第一次设置refresh 并添加watcher hub便于后续的watch
e.SetRefresh()
s.WatcherHub.add(e)
}
return e, nil
}// returns user-readable cause of failed comparison
// 返回用户可读比较失败原因【一般比较index和value】
func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string {
switch which {
case CompareIndexNotMatch: // index不匹配
return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex)
case CompareValueNotMatch: // value不匹配
return fmt.Sprintf("[%v != %v]", prevValue, n.Value)
default: // 默认
return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
}
}
//
func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
value string, expireOpts TTLOptionSet) (*Event, error) {
var err *etcdErr.Error
s.worldLock.Lock()
defer s.worldLock.Unlock()
defer func() {
if err == nil {
s.Stats.Inc(CompareAndSwapSuccess)
reportWriteSuccess(CompareAndSwap)
return
}
s.Stats.Inc(CompareAndSwapFail)
reportWriteFailure(CompareAndSwap)
}()
// 获取node path最后一个node
nodePath = path.Clean(path.Join("/", nodePath))
// 将只读set中nodepath是否存在;注意是不允许用户改变路径 "/"
if s.readonlySet.Contains(nodePath) {
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
}
// 获取node对应的内容
n, err := s.internalGet(nodePath)
if err != nil {
return nil, err
}
if n.IsDir() { // 比较交换操作仅用于key-value 不能用于directory
err = etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
return nil, err
}// If both of the prevValue and prevIndex are given, we will test both of them.
将node与给定的index和value进行比较 只有在value和index比较都是ok的 结果才属于正常
否则输出fail的原因和error
if ok, which := n.Compare(prevValue, prevIndex); !ok {
cause := getCompareFailCause(n, which, prevValue, prevIndex)
err = etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
return nil, err
}
if expireOpts.Refresh {
value = n.Value
}
// update etcd index
s.CurrentIndex++
定义操作event
e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
e.EtcdIndex = s.CurrentIndex
e.PrevNode = n.Repr(false, false, s.clock)
eNode := e.Node
// 当successed时 需要变更value、currentIndex以及更新ttl
n.Write(value, s.CurrentIndex)
n.UpdateTTL(expireOpts.ExpireTime)
// 为了安全起见 需要备份value 后续操作也是针对备份value操作
valueCopy := value
eNode.Value = &valueCopy
eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
if !expireOpts.Refresh {
s.WatcherHub.notify(e)
} else {
e.SetRefresh()
s.WatcherHub.add(e)
}
return e, nil
}
// Delete删除给定路径上的节点.
// 若是对应的node类型=directory 需要对应的recursive=true方可执行delete操作.
func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
var err *etcdErr.Error
s.worldLock.Lock()
defer s.worldLock.Unlock()
defer func() { 记录删除成功、失败时的计数
if err == nil { 成功
s.Stats.Inc(DeleteSuccess)
reportWriteSuccess(Delete)
return
}
// 失败
s.Stats.Inc(DeleteFail)
reportWriteFailure(Delete)
}()
nodePath = path.Clean(path.Join("/", nodePath))
// 并不希望改变 "/"
if s.readonlySet.Contains(nodePath) {
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
}
//用于当node属于directory类型时
if recursive {
dir = true
}
// 获取node内容
n, err := s.internalGet(nodePath)
if err != nil { // 当对应的node不存在时, 返回error
return nil, err
}
// 最新的index = 当前index + 1 递增nextIndex := s.CurrentIndex + 1
// 操作evente := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
e.EtcdIndex = nextIndex
e.PrevNode = n.Repr(false, false, s.clock)
eNode := e.Node
if n.IsDir() {
eNode.Dir = true
}
callback := func(path string) { // notify函数
// 当删除时 通知notify当前node的watchers
s.WatcherHub.notifyWatchers(e, path, true)
}
// remove 并执行watch
err = n.Remove(dir, recursive, callback)
if err != nil {
return nil, err
}
// 更新index
s.CurrentIndex++
s.WatcherHub.notify(e)
return e, nil
}
// 可参考delete实现
func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
var err *etcdErr.Error
s.worldLock.Lock()
defer s.worldLock.Unlock()
defer func() {
if err == nil {
s.Stats.Inc(CompareAndDeleteSuccess)
reportWriteSuccess(CompareAndDelete)
return
}
s.Stats.Inc(CompareAndDeleteFail)
reportWriteFailure(CompareAndDelete)
}()
nodePath = path.Clean(path.Join("/", nodePath))
n, err := s.internalGet(nodePath)
if err != nil { // 当node不存在时, 返回 error
return nil, err
}
if n.IsDir() { // node类型不能为directory 只有key-value才能完成比较和交换
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
}
// 给定prevValue和prevIndex,将node与其进行比较.
if ok, which := n.Compare(prevValue, prevIndex); !ok {
cause := getCompareFailCause(n, which, prevValue, prevIndex)
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
}
// 更新index
s.CurrentIndex++
e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
e.EtcdIndex = s.CurrentIndex
e.PrevNode = n.Repr(false, false, s.clock)
callback := func(path string) { // notify 函数
s.WatcherHub.notifyWatchers(e, path, true)
}
err = n.Remove(false, false, callback)
if err != nil {
return nil, err
}
s.WatcherHub.notify(e)
return e, nil
}
// watch机制:主要用于监控node path对应的内容进行变更【create、add、delete、update】
func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
s.worldLock.RLock()
defer s.worldLock.RUnlock()
key = path.Clean(path.Join("/", key))
if sinceIndex == 0 {
sinceIndex = s.CurrentIndex + 1
}
//注: WatcherHub并不知道current index,需要人工提供的
w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
if err != nil {
return nil, err
}
return w, nil
}
// Update:更新node的ttl和value.// 当node=file时 可更新value和ttl
// 当node=directory时, 只能更新ttl
func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) {
var err *etcdErr.Error
s.worldLock.Lock()
defer s.worldLock.Unlock()
defer func() {
if err == nil {
s.Stats.Inc(UpdateSuccess)
reportWriteSuccess(Update)
return
}
s.Stats.Inc(UpdateFail)
reportWriteFailure(Update)
}()
nodePath = path.Clean(path.Join("/", nodePath))
if s.readonlySet.Contains(nodePath) {
return nil, etcdErr.NewError(etcdErr.EcodeRootROnly, "/", s.CurrentIndex)
}
// 当前index和接下来新建的index
currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
n, err := s.internalGet(nodePath)
if err != nil { // 当node不存在时返回error
return nil, err
}
if n.IsDir() && len(newValue) != 0 { 当node=directory且包含内容 其对应的value不能更新非空
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
}
if expireOpts.Refresh {
newValue = n.Value
}
// 操作event
e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
e.EtcdIndex = nextIndex
e.PrevNode = n.Repr(false, false, s.clock)
eNode := e.Node
n.Write(newValue, nextIndex)
if n.IsDir() {
eNode.Dir = true
} else {
// 安全copy值
newValueCopy := newValue
eNode.Value = &newValueCopy
}
// update ttl
n.UpdateTTL(expireOpts.ExpireTime)
eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
if !expireOpts.Refresh {
s.WatcherHub.notify(e)
} else {
e.SetRefresh()
s.WatcherHub.add(e)
}
s.CurrentIndex = nextIndex
return e, nil
}// 删除过期的key
func (s *store) DeleteExpiredKeys(cutoff time.Time) {
s.worldLock.Lock()
s.worldLock.Lock()
defer s.worldLock.Unlock()for {
ttlKeyHeap的top keynode := s.ttlKeyHeap.top()
// 直至对应node不存在或node对应的expire time还未失效
if node == nil || node.ExpireTime.After(cutoff) {
break
}
增加current indexs.CurrentIndex++
操作evente := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
e.EtcdIndex = s.CurrentIndex
e.PrevNode = node.Repr(false, false, s.clock)
if node.IsDir() {
e.Node.Dir = true
}
callback := func(path string) { // 通知watcher
/ 触发watch
s.WatcherHub.notifyWatchers(e, path, true)
}
s.ttlKeyHeap.pop()
node.Remove(true, true, callback)
reportExpiredKey()
s.Stats.Inc(ExpireCount)
s.WatcherHub.notify(e)
}
}
// checkDir:检查dirName是否是parent node下面的.
// node = directory, 将返回node的指针.
// node不存在, 将创建一个新的directory并将node的指针返回
// 若是node=file, 将返回error.
func (s *store) checkDir(parent *node, dirName string) (*node, *etcdErr.Error) {
node, ok := parent.Children[dirName] // 是否存在parent node下
if ok {
if node.IsDir() { // 存在且是directory 直接返回node指针
return node, nil
}
// 否则返回error
return nil, etcdErr.NewError(etcdErr.EcodeNotDir, node.Path, s.CurrentIndex)
}
// 若不存在parent node下 则创建对应的directory 并返回对应node的指针
n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent)
parent.Children[dirName] = n
return n, nil
}
// Save:保存存储系统的static状态.
// 不能保存watcher的state.
// 不能保存parent node的字段. 或会出现json包的循环依赖.
func (s *store) Save() ([]byte, error) {
b, err := json.Marshal(s.Clone())
if err != nil {
return nil, err
}
return b, nil
}
// 直接操作store 而非备份
func (s *store) SaveNoCopy() ([]byte, error) {
b, err := json.Marshal(s)
if err != nil {
return nil, err
}
return b, nil
}
// clone:通过新建store 获取对应的current index、root、watcherHub、stats、current version
func (s *store) Clone() Store {
s.worldLock.Lock()
clonedStore := newStore()
clonedStore.CurrentIndex = s.CurrentIndex
clonedStore.Root = s.Root.Clone()
clonedStore.WatcherHub = s.WatcherHub.clone()
clonedStore.Stats = s.Stats.clone()
clonedStore.CurrentVersion = s.CurrentVersion
s.worldLock.Unlock()
return clonedStore
}
// Recovery:从静态状态恢复存储系统
// 需要回复这些node的parent fields.
// 需要删除失效的node同时也需要额外创建routine来完成监控.
func (s *store) Recovery(state []byte) error {
s.worldLock.Lock()
defer s.worldLock.Unlock()
err := json.Unmarshal(state, s)
if err != nil {
return err
}
s.ttlKeyHeap = newTtlKeyHeap()
s.Root.recoverAndclean()
return nil
}复制
后记:本文主要为了补充Etcd源码剖析(一)中store的内容