
A Deep Dive into the Kubernetes Informer Mechanism

工程师站台 2021-10-13



client-go provides a variety of mechanisms for building custom controllers; they are all defined under https://github.com/kubernetes/client-go/tree/master/tools/cache

As the directory name suggests, what these mechanisms essentially implement is a local cache.


Kubernetes uses etcd as its persistent store, and the API server shields etcd behind an HTTP service. So how can clients guarantee real-time delivery, reliability, and ordering of messages without relying on additional middleware? The answer is the informer mechanism. Informers reduce the communication load that the various Kubernetes components place on etcd and the API server.

(https://github.com/kubernetes/client-go/tree/master/informers)


At this point an obvious question arises: how exactly does the informer reduce that load? The answer was already given in the opening: a local cache. The informer keeps an in-memory cache of objects through its indexer.
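
As a concrete illustration, here is a minimal sketch of reading pods from the informer's local cache through a lister instead of hitting the API server on every query. The kubeconfig location and the 10-minute resync period are arbitrary choices, and error handling is trimmed; this is a sketch, not production code.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumption: a kubeconfig exists at the default location (~/.kube/config).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// One factory per client; informers of the same type are shared.
	factory := informers.NewSharedInformerFactory(clientset, 10*time.Minute)
	podLister := factory.Core().V1().Pods().Lister() // registers the pod informer

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)            // start the registered informers
	factory.WaitForCacheSync(stopCh) // block until the initial List has primed the local cache

	// Reads come from the indexer (local cache), not from the API server.
	pods, _ := podLister.Pods("kube-system").List(labels.Everything())
	fmt.Println("pods in kube-system:", len(pods))
}
```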


The diagram below shows how a custom controller works together with client-go, and how client-go is implemented internally.




The diagram has two parts: the yellow icons are the pieces developers need to implement themselves, while everything else is already provided by client-go and can be used as-is.


The Kubernetes control plane makes heavy use of events and loosely coupled components. Other distributed systems often use remote procedure calls (RPC) to trigger behavior, but Kubernetes does not. Kubernetes controllers watch for operations on Kubernetes objects in the API server: add, update, and delete. When such an event occurs, the controller executes its business logic.


For example, launching pods through a deployment involves many controllers and other control-plane components working together:


1. The deployment controller (inside kube-controller-manager) notices, through a deployment informer, that a user has created a deployment. Following its business logic, it creates a replica set.

2. The replica set controller (also inside kube-controller-manager) notices, through a replica set informer, that a new replica set has been created, runs its business logic, and creates a pod object.

3. The scheduler (inside the kube-scheduler binary), which is also a controller, notices, through a pod informer, a pod with an empty `spec.nodeName` field. Following its business logic, it puts the pod into its scheduling queue.

4. Meanwhile another controller, the kubelet, notices the new pod through its pod informer. But the pod's `spec.nodeName` field is empty and therefore does not match the kubelet's node name, so it ignores the pod and goes back to sleep (until the next event).

5. The scheduler fills in the pod's `spec.nodeName` field and writes it back to the API server, thereby taking the pod out of its work queue and scheduling it onto a node with enough available resources.

6. The pod update event wakes the kubelet up again. This time it compares the pod's `spec.nodeName` with its own node name and finds a match, so it starts the pod's containers and reports that they are running by writing this information into the `pod status`, which is sent to the API server.

7. The replica set controller notices the updated pod but does nothing.

8. If the pod terminates, the kubelet notices it, fetches the pod object from the API server, sets the `pod status` to "terminated", and writes it back to the API server.

9. The replica set controller notices the terminated pod and decides that it must be replaced. It deletes the terminated pod on the API server and creates a new one.

10. And so on.


Many independent control loops communicate purely through changes to objects on the API server, and informers turn those changes into events.
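
A minimal, self-contained sketch of such a control loop follows. The in-cluster config and the 30-second resync period are illustrative choices; the point is that the loop's only input is the add/update/delete notifications delivered by a pod informer.

```go
package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Assumption: running inside a cluster; error handling trimmed.
	config, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()

	// The "business logic" is triggered purely by these notifications.
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			fmt.Println("pod added:", pod.Namespace+"/"+pod.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			pod := newObj.(*corev1.Pod)
			fmt.Println("pod updated:", pod.Namespace+"/"+pod.Name, "node:", pod.Spec.NodeName)
		},
		DeleteFunc: func(obj interface{}) {
			// obj may be a cache.DeletedFinalStateUnknown tombstone; a real controller must handle that.
			fmt.Println("pod deleted")
		},
	})

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
	<-stopCh // run until stopped
}
```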


Looking at the lowest layer of the implementation, `list-watch` is the unified asynchronous message-handling mechanism of Kubernetes. It provides real-time delivery, reliability, ordering, and good performance, and it lays a solid foundation for the declarative style of the API; it is an elegant way to communicate and the essence of the Kubernetes architecture.


A simple watch implementation is based on the following (a raw request sketch follows the list):


1. HTTP **chunked transfer encoding** (the response must use `Transfer-Encoding: chunked` so that the server can keep pushing messages to the client)

2. Long-lived connections (`Connection: Keep-Alive`; reusing the connection avoids the cost of repeated TCP handshakes and reduces the load on the server)

3. Multiplexing (HTTP/2 defines multiplexing, so a single connection can carry watches for multiple keys, reducing the number of connections the server has to hold)
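
To make the list concrete, here is a rough sketch of what a raw watch request looks like. It assumes `kubectl proxy` is exposing the API server on localhost:8001 so no authentication is needed; the namespace and resource are arbitrary.

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
)

func main() {
	// ?watch=true asks the API server to keep the response open and stream events.
	resp, err := http.Get("http://127.0.0.1:8001/api/v1/namespaces/default/pods?watch=true")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The server answers with Transfer-Encoding: chunked and writes one JSON
	// watch event ({"type":"ADDED","object":{...}}) per line, indefinitely.
	fmt.Println("Transfer-Encoding:", resp.TransferEncoding)

	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024) // pod objects can be large
	for scanner.Scan() {
		fmt.Println(scanner.Text()) // one serialized watch event per line
	}
}
```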


The API server is the first-level cache in Kubernetes (see the `--watch-cache`, `--default-watch-cache-size`, and `--watch-cache-size` flags).


The informer is Kubernetes' second-level cache.



### client-go components

The official sample-controller documentation explains each of the modules in the diagram above; the original text is reproduced below together with my own notes.


https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md



- Reflector: A reflector, which is defined in [type *Reflector* inside package *cache*](https://github.com/kubernetes/client-go/blob/master/tools/cache/reflector.go), watches the Kubernetes API for the specified resource type (kind). The function in which this is done is *ListAndWatch*. The watch could be for an in-built resource or it could be for a custom resource. When the reflector receives notification about existence of new resource instance through the watch API, it gets the newly created object using the corresponding listing API and puts it in the Delta Fifo queue inside the *watchHandler* function.

- Reflector: implemented by ListAndWatch; it can watch built-in resources as well as custom resources. When the reflector receives a notification about a resource change through the watch API, it fetches the corresponding object via the listing API and pushes it into the DeltaFIFO queue.


- Informer: An informer defined in the [base controller inside package *cache*](https://github.com/kubernetes/client-go/blob/master/tools/cache/controller.go) pops objects from the Delta Fifo queue. The function in which this is done is *processLoop*. The job of this base controller is to save the object for later retrieval, and to invoke our controller passing it the object.

- Informer: driven by processLoop; it (1) stores objects into the Indexer for caching and (2) invokes the controller's registered hooks.


- Indexer: An indexer provides indexing functionality over objects. It is defined in [type *Indexer* inside package *cache*](https://github.com/kubernetes/client-go/blob/master/tools/cache/index.go). A typical indexing use-case is to create an index based on object labels. Indexer can maintain indexes based on several indexing functions. Indexer uses a thread-safe data store to store objects and their keys. There is a default function named *MetaNamespaceKeyFunc* defined in [type Store inside package cache](https://github.com/kubernetes/client-go/blob/master/tools/cache/store.go) that generates an object’s key as `<namespace>/<name>` combination for that object.


- Indexer: indexes objects by key so that later lookups are fast. Objects consumed from the DeltaFIFO are stored into the Indexer (and the hooks registered on the controller are invoked at the same time). The Indexer stays consistent with the data in the etcd cluster, which lets client-go read locally and reduces the load on the Kubernetes API and the etcd cluster (a small usage sketch follows this list).
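
A minimal sketch of using an Indexer directly, with the default MetaNamespaceKeyFunc keys and an extra namespace index; the pod objects here are hand-built purely for illustration.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Keys are "<namespace>/<name>" (MetaNamespaceKeyFunc); one extra index groups objects by namespace.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
	})

	indexer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "coredns", Namespace: "kube-system"}})
	indexer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}})

	// Point lookup by key.
	obj, exists, _ := indexer.GetByKey("kube-system/coredns")
	fmt.Println(exists, obj.(*corev1.Pod).Name)

	// Index lookup: all objects whose "namespace" index value is "kube-system".
	byNs, _ := indexer.ByIndex(cache.NamespaceIndex, "kube-system")
	fmt.Println("objects in kube-system:", len(byNs))
}
```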



### SharedInformerFactory


SharedInformerFactory defines informer accessors for all of the built-in (core) Kubernetes object types:

```go
// staging/src/k8s.io/client-go/informers/factory.go
// The factory pattern is used here.
// Accessors are defined for every built-in Kubernetes object; the implementations
// live in the informers sub-packages. For example, the path for Core.V1.Pod:
// - staging/src/k8s.io/client-go/informers/core/interface.go
// - staging/src/k8s.io/client-go/informers/core/v1/interface.go
// - staging/src/k8s.io/client-go/informers/core/v1/pod.go

// How sharedInformerFactory implements Core().V1().Pods():
//   core.New(f, f.namespace, f.tweakListOptions)
//   return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
//   return v1.New(g.factory, g.namespace, g.tweakListOptions)
//   return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
//   return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
// The podInformer provides the list/watch functions built from the schema definition.

type SharedInformerFactory interface {
	internalinterfaces.SharedInformerFactory
	ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

	Admissionregistration() admissionregistration.Interface
	Apps() apps.Interface
	Auditregistration() auditregistration.Interface
	Autoscaling() autoscaling.Interface
	Batch() batch.Interface
	Certificates() certificates.Interface
	Coordination() coordination.Interface
	Core() core.Interface
	Discovery() discovery.Interface
	Events() events.Interface
	Extensions() extensions.Interface
	Flowcontrol() flowcontrol.Interface
	Networking() networking.Interface
	Node() node.Interface
	Policy() policy.Interface
	Rbac() rbac.Interface
	Scheduling() scheduling.Interface
	Settings() settings.Interface
	Storage() storage.Interface
}
```


The Informer() accessor of every object type (pod, deployment, volumeattachment, and so on) calls NewSharedIndexInformer to initialize an informer, and sharedInformerFactory records each informer in a map, so only one informer instance is ever created per object type. NewSharedIndexInformer itself also acts like a factory: it fixes the interfaces and then works with whatever list/watch functions each object type passes in.
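
A small sketch that demonstrates this sharing; the fake clientset is used only so the example runs without a cluster. Requesting the pod informer twice yields the same instance.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 10*time.Minute)

	// Both accessors go through the factory's internal map keyed by object type,
	// so they return the very same SharedIndexInformer instance.
	a := factory.Core().V1().Pods().Informer()
	b := factory.Core().V1().Pods().Informer()
	fmt.Println("same instance:", a == b) // true
}
```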



### SharedIndexInformer


In the previous section we looked at the informer factory's interfaces; now let's examine how the machinery inside the factory actually works.


sharedIndexInformer uses the functions each object type passes in to provide three main pieces of functionality:


1. indexer: the local in-memory cache.


2. controller: pulls the watched objects' state through the listerWatcher and puts those state events into the DeltaFIFO. The controller also runs another routine that keeps consuming the DeltaFIFO; for every event it first updates the local cache (i.e. the indexer) and then hands the event over a channel to the sharedProcessor for further processing.


3. sharedProcessor: notifies the registered clients (i.e. the controllers) about object state changes.

```go
// `*sharedIndexInformer` implements SharedIndexInformer and has three
// main components. One is an indexed local cache, `indexer Indexer`.
// The second main component is a Controller that pulls
// objects/notifications using the ListerWatcher and pushes them into
// a DeltaFIFO --- whose knownObjects is the informer's local cache
// --- while concurrently Popping Deltas values from that fifo and
// processing them with `sharedIndexInformer::HandleDeltas`. Each
// invocation of HandleDeltas, which is done with the fifo's lock
// held, processes each Delta in turn. For each Delta this both
// updates the local cache and stuffs the relevant notification into
// the sharedProcessor. The third main component is that
// sharedProcessor, which is responsible for relaying those
// notifications to each of the informer's clients.
type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector MutationDetector

	listerWatcher ListerWatcher

	// objectType is an example object of the type this informer is
	// expected to handle. Only the type needs to be right, except
	// that when that is `unstructured.Unstructured` the object's
	// `"apiVersion"` and `"kind"` must also be right.
	objectType runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
}
```

```go
// staging/src/k8s.io/client-go/tools/cache/controller.go

// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector( // the controller's config is set when the sharedIndexInformer is initialized
		c.config.ListerWatcher,    // the ListerWatcher implemented by the object type itself
		c.config.ObjectType,       // the object type
		c.config.Queue,            // in practice this is the DeltaFIFO
		c.config.FullResyncPeriod, // the resync period
	)
	r.ShouldResync = c.config.ShouldResync // the implementation walks all listeners; a resync happens as soon as any one of them is due
	r.clock = c.clock

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	defer wg.Wait()

	wg.StartWithChannel(stopCh, r.Run) // start the reflector

	wait.Until(c.processLoop, time.Second, stopCh) // processLoop consumes the DeltaFIFO
}

// Reflector entry point.
// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.backoffManager, true, stopCh) // if the watch fails, back off and list-watch again. Main logic: 1. list (after the list finishes, syncWith pushes every item into the DeltaFIFO as a Replace event) 2. start the resync routine 3. start the watch routine
	klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

// DeltaFIFO consumer side.
// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times
// concurrently.
//
// TODO: Plumb through the stopCh here (and down to the queue) so that this can
// actually exit when the controller is stopped. Or just give up on this stuff
// ever being stoppable. Converting this whole package to use Context would
// also be helpful.
func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) // c.config.Process is also set when the sharedIndexInformer is initialized; it is sharedIndexInformer.HandleDeltas
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj) // obj is the list of all events for one object, so the whole batch of events must be treated as one atomic unit
			}
		}
	}
}
```


```go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object) // by default the mutation detector is a dummy that does nothing; defaultCacheMutationDetector just DeepCopies the object, keeps the original and the copy, periodically runs reflect.DeepEqual on the pair, and logs an error if they diverge
			// Check whether the object already exists in the indexer: update it if so, add it otherwise.
			// In both cases the local cache is updated first, and then the clients are notified.
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync: // resyncs use the Sync type
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced: // the Replaced type is also used during initialization
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) // notify the processor's listeners
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}
```



Let's pause the code here. In `distribute` you can see that, depending on the sync flag, messages are dispatched either to `p.syncingListeners` or to `p.listeners`. Reading on, it turns out that the two slices currently hold exactly the same listeners, but separating them is a reasonable design: if different kinds of notifications ever need to go to different listeners, only the sharedProcessor layer would have to change.

```go
func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

// As it turns out, the two slices hold the very same listener instance.
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}
```


So how do listeners get added? Clients add them by calling `func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler)`.

```go
// ResourceEventHandler can handle notifications for events that happen to a
// resource. The events are informational only, so you can't return an
// error.
//  * OnAdd is called when an object is added.
//  * OnUpdate is called when an object is modified. Note that oldObj is the
//      last known state of the object-- it is possible that several changes
//      were combined together, so you can't use this to see every single
//      change. OnUpdate is also called when a re-list happens, and it will
//      get called even if nothing changed. This is useful for periodically
//      evaluating or syncing something.
//  * OnDelete will get the final state of the item if it is known, otherwise
//      it will get an object of type DeletedFinalStateUnknown. This can
//      happen if the watch is closed and misses the delete event and we don't
//      notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	// All handlers of one informer share a single resyncCheckPeriod; the smaller value wins.
	if resyncPeriod > 0 {
		if resyncPeriod < minimumResyncPeriod {
			klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	// Create the new listener.
	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	// If the informer has not started yet, just add the listener.
	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// If the informer is already running, consumption of the DeltaFIFO is paused to keep ordering consistent:
	// every item in the local cache is first pushed to the new listener as an Add, then DeltaFIFO consumption resumes.
	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}
```


```go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run) // start a routine that consumes the listener's nextCh
			p.wg.Start(listener.pop) // start a routine that consumes the listener's addCh (the informer produces into addCh via processor.distribute, which calls each listener.add; pop buffers messages in pendingNotifications, a ring buffer of initialBufferSize (1024), and feeds them into nextCh)
		}
		p.listenersStarted = true // listeners added later use this flag to decide whether they must start the two routines (run, pop) themselves
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}
```




A brief note on how the controller-manager is initialized:

```go
// cmd/kube-controller-manager/app/controllermanager.go

func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
	......
	informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)()) // entry point where the controller-manager creates the shared informer factory
	......
}

// NewControllerInitializers initializes all controllers; at this point the Informer() accessors are called and
// the informers are actually instantiated. (Tip: even if you only use a lister rather than a full controller,
// the lister's indexer also comes from the informer, so the informer gets instantiated as well.)
StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux)

// The start entry point: the Run method of every registered informer is called here. If no informer has been
// registered (requested) yet, nothing happens; an informer is only truly initialized once some client calls
// something like sharedInformer.Core().V1().Pods().Informer(). (Tip: after adding a new informer to a factory,
// you must call the factory's Start method again to start it.)
controllerContext.InformerFactory.Start(controllerContext.Stop)
```
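
A minimal sketch of that Start/WaitForCacheSync pattern, including the tip that a factory must be started again after a new informer is registered. The fake clientset is used only so the example runs without a cluster.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 10*time.Minute)

	stopCh := make(chan struct{})
	defer close(stopCh)

	// Requesting the informer registers it with the factory; Start then runs it.
	factory.Core().V1().Pods().Informer()
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)

	// An informer requested *after* Start is only registered, not running;
	// calling Start again is required (already-started informers are skipped).
	factory.Apps().V1().Deployments().Informer()
	factory.Start(stopCh)
	synced := factory.WaitForCacheSync(stopCh)
	fmt.Println("caches synced:", synced)
}
```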






```go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	// initialize the sharedIndexInformer's three components (indexer, controller, processor) and the DeltaFIFO
	// start the reflector, the processor, and so on
	......
}
```




That more or less completes the walkthrough of the informer's caching flow. Next, let's look at its two key internal data structures: the indexer and the DeltaFIFO.

```go
// staging/src/k8s.io/client-go/tools/cache/index.go

// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String // index value -> matching object keys, e.g. "kube-system" -> set of keys of objects in kube-system

// Indexers maps a name to a IndexFunc
type Indexers map[string]IndexFunc // index name -> function that computes the index values of an object, e.g. "namespace" -> idxVal = getNamespace(obj), e.g. idxVal = "kube-system"

// Indices maps a name to an Index
type Indices map[string]Index // index name -> the Index for that name, e.g. "namespace" -> the Index maintained by the "namespace" indexer

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{} // hash map keyed by object key

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}
```
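
To make the three map types concrete, here is a hand-built, illustrative snapshot of what they could hold for two pods when a single "namespace" index is registered; the real threadSafeMap fills these structures in through Add/Update/Delete.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// items: object key -> stored object (plain strings here for brevity)
	items := map[string]interface{}{
		"kube-system/coredns": "pod coredns",
		"default/nginx":       "pod nginx",
	}

	// indices: index name -> (index value -> set of object keys)
	indices := cache.Indices{
		"namespace": cache.Index{
			"kube-system": sets.NewString("kube-system/coredns"),
			"default":     sets.NewString("default/nginx"),
		},
	}

	// ByIndex("namespace", "kube-system") boils down to these two lookups:
	for key := range indices["namespace"]["kube-system"] {
		fmt.Println(items[key]) // pod coredns
	}
}
```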




```go
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go

// DeltaFIFO is like FIFO, but differs in two ways. One is that the
// accumulator associated with a given object's key is not that object
// but rather a Deltas, which is a slice of Delta values for that
// object. Applying an object to a Deltas means to append a Delta
// except when the potentially appended Delta is a Deleted and the
// Deltas already ends with a Deleted. In that case the Deltas does
// not grow, although the terminal Deleted will be replaced by the new
// Deleted if the older Deleted's object is a
// DeletedFinalStateUnknown.
//
// The other difference is that DeltaFIFO has an additional way that
// an object can be applied to an accumulator, called Sync.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//  * You want to process every object change (delta) at most once.
//  * When you process an object, you want to see everything
//    that's happened to it since you last processed it.
//  * You want to process the deletion of some of the objects.
//  * You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key. The objects in
// question are called "known objects" and this set of objects
// modifies the behavior of the Delete, Replace, and Resync methods
// (each in a different way).
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond // used to signal that there is something to Pop

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas // per key (i.e. per object; for a pod, namespace/podname), all of its events accumulated as a Deltas slice
	queue []string          // keys in first-in-first-out order

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known" --- affecting Delete(),
	// Replace(), and Resync()
	knownObjects KeyListerGetter // in practice this points to the sharedIndexInformer's indexer

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock sync.Mutex

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool // the sharedIndexInformer currently initializes this to true
}
```
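
Finally, a small sketch of the DeltaFIFO's producer/consumer contract, written against the (older) client-go API quoted in this article; the plain Store standing in for knownObjects and the hand-built pod are illustrative only.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// knownObjects is normally the informer's indexer; here a plain store stands in.
	knownObjects := cache.NewStore(cache.MetaNamespaceKeyFunc)

	fifo := cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
		KeyFunction:           cache.MetaNamespaceKeyFunc,
		KnownObjects:          knownObjects,
		EmitDeltaTypeReplaced: true, // same setting the shared index informer uses
	})

	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "nginx", Namespace: "default"}}

	// Producer side (the Reflector's role): two events on the same key
	// accumulate into one Deltas slice under "default/nginx".
	fifo.Add(pod)
	fifo.Update(pod)

	// Consumer side (processLoop's role): Pop hands the whole Deltas slice to the
	// process function, which plays the part of HandleDeltas here.
	fifo.Pop(func(obj interface{}) error {
		for _, d := range obj.(cache.Deltas) {
			fmt.Println(d.Type, d.Object.(*corev1.Pod).Name) // Added nginx, Updated nginx
		}
		return nil
	})
}
```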




