Author: Duan Meng, software development engineer at China Mobile Cloud Capability Center, focusing on the cloud native area.
01
The kube-apiserver list-watch mechanism against etcd
When discussing kube-apiserver's list-watch against etcd, one key struct cannot be left out: the cacher. To reduce the load on etcd, kube-apiserver implements its own list-watch layer on top of etcd, keeping the latest state of all objects and the most recent events in the cacher, and all resource access from external components goes through the cacher. Let's look at the cacher data structure (for brevity, only a few key fields are kept):
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
type Cacher struct {
// incoming is the event channel; events taken from it are dispatched to all watchers
incoming chan watchCacheEvent
// storage is the underlying storage implementation
storage storage.Interface
// objectType is the type of objects served by this cacher
objectType reflect.Type
// watchCache is a sliding-window cache holding all resources of the current kind plus an array of the most recent events
watchCache *watchCache
// reflector lists and watches etcd and stores the resources and events into watchCache
reflector *cache.Reflector
// watchersBuffer holds the cacheWatchers created for all client-go watch connections to the apiserver
watchersBuffer []*cacheWatcher
....
}
Now let's look at how the cacher is created.
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func NewCacherFromConfig(config Config) (*Cacher, error) {
...
cacher := &Cacher{
...
incoming: make(chan watchCacheEvent, 100),
...
}
...
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, objType)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
reflector.WatchListPageSize = storageWatchListPageSize
cacher.watchCache = watchCache
cacher.reflector = reflector
go cacher.dispatchEvents() // 1
cacher.stopWg.Add(1)
go func() {
defer cacher.stopWg.Done()
defer cacher.terminateAllWatchers()
wait.Until(
func() {
if !cacher.isStopped() {
cacher.startCaching(stopCh) // 2
}
}, time.Second, stopCh,
)
}()
return cacher, nil
}
As you can see, when the cacher is created, a watchCache (used to store events and all resources) and a reflector (which list-watches etcd and updates the watchCache) are created as well, and two goroutines are started. At comment 1, cacher.dispatchEvents() takes events from the cacher's incoming channel and puts them into each cacheWatcher's input channel.
The processing logic can be seen in the following two snippets.
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *Cacher) dispatchEvents() {
...
for {
select {
case event, ok := <-c.incoming:
if !ok {
return
}
if event.Type != watch.Bookmark {
// take events from the incoming channel and hand them to dispatchEvent
c.dispatchEvent(&event)
}
lastProcessedResourceVersion = event.ResourceVersion
metrics.EventsCounter.WithLabelValues(c.objectType.String()).Inc()
...
case <-c.stopCh:
return
}
}
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
c.startDispatching(event)
defer c.finishDispatching()
if event.Type == watch.Bookmark {
for _, watcher := range c.watchersBuffer {
watcher.nonblockingAdd(event)
}
} else {
wcEvent := *event
setCachingObjects(&wcEvent, c.versioner)
event = &wcEvent
c.blockedWatchers = c.blockedWatchers[:0]
// watchersBuffer is a slice holding the cacheWatchers created for all client-go watch connections to the apiserver.
for _, watcher := range c.watchersBuffer {
if !watcher.nonblockingAdd(event) {
c.blockedWatchers = append(c.blockedWatchers, watcher)
}
}
...
}
}
watchersBuffer is a slice that holds the cacheWatchers created for all client-go watch connections to the apiserver, so a cacheWatcher and the client-go client that issued the watch request are in a one-to-one relationship. When the apiserver receives an event from etcd, it sends the event to the input channel of every cacheWatcher.
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
select {
case c.input <- event:
return true
default:
return false
}
}
The cacheWatcher struct looks like this:
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
type cacheWatcher struct {
input chan *watchCacheEvent
result chan watch.Event
done chan struct{}
filter filterWithAttrsFunc
stopped bool
forget func()
versioner storage.Versioner
// The watcher will be closed by server after the deadline,
// save it here to send bookmark events before that.
deadline time.Time
allowWatchBookmarks bool
// Object type of the cache watcher interests
objectType reflect.Type
// human readable identifier that helps assigning cacheWatcher
// instance with request
identifier string
}
As you can see, the cacheWatcher is not used to store data; it only implements the watch interface and maintains two channels: the input channel receives the events coming from the cacher's incoming channel, and the result channel is used to interact with the client-go client. After the client's informer issues a watch request, it reads events from this channel for further processing.
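To make the role of these two channels concrete, here is a minimal, purely illustrative sketch (the event type, toyWatcher and its process loop are assumptions for the example, not the real cacheWatcher code) of how events flow from an input channel to a result channel, roughly what cacheWatcher.process does:
package main

import (
	"context"
	"fmt"
)

// event stands in for watchCacheEvent / watch.Event in this sketch.
type event struct {
	Type   string
	Object string
}

// toyWatcher models the two-channel layout of cacheWatcher: input is fed by the
// cacher's dispatch loop, result is read by the apiserver's watch HTTP handler.
type toyWatcher struct {
	input  chan *event
	result chan event
}

// process drains input, applies a filter and forwards matching events to result,
// roughly what cacheWatcher.process does once the initial events have been replayed.
func (w *toyWatcher) process(ctx context.Context, filter func(*event) bool) {
	defer close(w.result)
	for {
		select {
		case ev, ok := <-w.input:
			if !ok {
				return
			}
			if filter(ev) {
				w.result <- *ev
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	w := &toyWatcher{input: make(chan *event, 10), result: make(chan event, 10)}
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go w.process(ctx, func(*event) bool { return true })

	w.input <- &event{Type: "ADDED", Object: "deployment/nginx"}
	fmt.Println(<-w.result) // the client-facing side reads from result
}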
Comment 2 starts another goroutine, cacher.startCaching(stopCh), which effectively calls the ListAndWatch method of the cacher's reflector. This reflector is the same kind of reflector used by informers: its list step fetches all resources from etcd and does a full Replace of the reflector's store, and that store is the watchCache mentioned above (watchCache implements the Store interface); its watch step watches the resources in etcd, takes events from the watcher's resultChan and, depending on the event type, calls the watchCache's Add, Update or Delete method. startCaching performs the ListAndWatch against etcd:
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
...
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.objectType.String(), err)
}
}
The syncWith method called from the reflector's list step replaces the contents of the watchCache with the list result:
staging/src/k8s.io/client-go/tools/cache/reflector.go
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
The watchHandler function in the reflector is given the watcher obtained from watching etcd and the store (i.e. the watchCache), and, based on the type of each event received on the watcher's resultChan, it calls the corresponding watchCache method (Add, Delete, Update).
staging/src/k8s.io/client-go/tools/cache/reflector.go
func watchHandler(start time.Time,
w watch.Interface,
store Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
clock clock.Clock,
errc chan error,
stopCh <-chan struct{},
) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if expectedType != nil {
if e, a := expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))
continue
}
}
if expectedGVK != nil {
if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))
continue
}
resourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
...
setLastSyncResourceVersion(resourceVersion)
if rvu, ok := store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(resourceVersion)
}
eventCount++
}
}
}
As mentioned above, the reflector runs ListAndWatch to update the resource data kept in the watchCache. Let's now look at the watchCache's Replace and Add methods to see how the reflector manipulates the resources stored in the watchCache.
Replace calls the Replace method of the watchCache's store. The store is a threadSafeMap implementation, so this actually updates the underlying threadSafeMap holding all instances of the current resource.
staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
...
if err := w.store.Replace(toReplace, resourceVersion); err != nil {
return err
}
...
}
The Add method likewise updates the underlying threadSafeMap and additionally calls a processEvent method. As mentioned above, the watchCache maintains an event array []*watchCacheEvent organized as a sliding window with a default capacity of 100; processEvent keeps updating this array, with new events pushing out the oldest ones (see the sketch after the updateCache code below). The code is as follows:
staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
...
if err := func() error {
// TODO: We should consider moving this lock below after the watchCacheEvent
// is created. In such situation, the only problematic scenario is Replace(
// happening after getting object from store and before acquiring a lock.
// Maybe introduce another lock for this purpose.
w.Lock()
defer w.Unlock()
w.updateCache(wcEvent)
w.resourceVersion = resourceVersion
defer w.cond.Broadcast()
...
return updateFunc(elem)
}(); err != nil {
return err
}
if w.eventHandler != nil {
w.eventHandler(wcEvent)
}
return nil
}
staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
func (w *watchCache) updateCache(event *watchCacheEvent) {
w.resizeCacheLocked(event.RecordTime)
if w.isCacheFullLocked() {
// Cache is full - remove the oldest element.
w.startIndex++
}
w.cache[w.endIndex%w.capacity] = event
w.endIndex++
}
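The startIndex/endIndex bookkeeping above is a ring buffer: both indexes only grow, and the slice is indexed modulo its capacity. A minimal sketch of the same idea (the ringBuffer type and its string payload are illustrative, not the real watchCache):
package main

import "fmt"

// ringBuffer mimics watchCache's event window: a fixed-capacity slice with
// monotonically increasing startIndex/endIndex, indexed modulo the capacity.
type ringBuffer struct {
	buf        []string
	capacity   int
	startIndex int
	endIndex   int
}

func newRingBuffer(capacity int) *ringBuffer {
	return &ringBuffer{buf: make([]string, capacity), capacity: capacity}
}

func (r *ringBuffer) add(ev string) {
	if r.endIndex-r.startIndex == r.capacity {
		// Buffer is full: drop the oldest event, just like updateCache does.
		r.startIndex++
	}
	r.buf[r.endIndex%r.capacity] = ev
	r.endIndex++
}

func (r *ringBuffer) events() []string {
	out := make([]string, 0, r.endIndex-r.startIndex)
	for i := r.startIndex; i < r.endIndex; i++ {
		out = append(out, r.buf[i%r.capacity])
	}
	return out
}

func main() {
	r := newRingBuffer(3)
	for i := 1; i <= 5; i++ {
		r.add(fmt.Sprintf("event-%d", i))
	}
	fmt.Println(r.events()) // [event-3 event-4 event-5]: the two oldest events were evicted
}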
This completes the analysis of the two goroutines started when the cacher is created. To summarize briefly: the first goroutine takes events from the cacher's incoming channel and puts them into each cacheWatcher's input channel; a cacheWatcher is created for every watch request from a local client, which we will cover in the next chapter. The second goroutine runs the reflector's ListAndWatch and updates the watchCache inside the cacher, i.e. it updates both the sliding-window event array in the watchCache and the threadSafeMap that holds all instances of the current kind.
Two points are still unclear: 1. when and by whom is the cacher created; 2. where do the events in the cacher's incoming channel come from, and are they in sync with the events on the channel of the watcher created by the reflector's ListAndWatch against etcd? With these questions in mind, let's keep reading the code. For the first question, the apiserver creates the cacher when it creates the storage, which means the cacher for a resource is created when its GVK is registered with the apiserver; the call chain is too deep to paste here. For the second question, let's first see where the events in the incoming channel come from; note that they are handled by the cacher's processEvent method.
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *Cacher) processEvent(event *watchCacheEvent) {
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
}
c.incoming <- *event
}
Now let's see where this method is used. From the NewCacherFromConfig method above, it is passed in when the watchCache is created during cacher construction. The watchCache defines an eventHandler for handling events received by ListAndWatch, and as the watchCache's processEvent method above shows, after updating the watchCache it invokes the eventHandler if one is set, which is exactly the cacher's processEvent above. So the second question is also clear: the events in the cacher's incoming channel are the etcd watch events, handed over after they have been used to update the watchCache. This chapter covered the apiserver's list and watch mechanism against etcd: after the apiserver receives an event, it caches it itself and sends it to each cacheWatcher's input channel, and the cacheWatcher handles the connection with the client. In the next chapter we will look at how the watch mechanism between a local client and the apiserver is implemented.
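To recap the chapter in one place, here is a compressed, purely illustrative sketch (toyCacher and its string events are assumptions for the example, not the real types) of the path an event takes: the watchCache's eventHandler enqueues it into the incoming channel, and the dispatch loop fans it out to every watcher's input channel:
package main

import "fmt"

// A compressed model of the chapter-1 pipeline:
// watchCache.processEvent -> eventHandler -> cacher.incoming -> dispatch -> watcher inputs.
type toyCacher struct {
	incoming chan string
	watchers []chan string
}

// processEvent is the analogue of Cacher.processEvent: it only enqueues into incoming.
func (c *toyCacher) processEvent(ev string) { c.incoming <- ev }

// dispatchEvents is the analogue of Cacher.dispatchEvents: fan events out to every watcher input.
func (c *toyCacher) dispatchEvents() {
	for ev := range c.incoming {
		for _, w := range c.watchers {
			select {
			case w <- ev: // the nonblockingAdd path
			default: // watcher is too slow; the real cacher tracks it in blockedWatchers
			}
		}
	}
}

func main() {
	c := &toyCacher{incoming: make(chan string, 100), watchers: []chan string{make(chan string, 10)}}
	go c.dispatchEvents()

	// The watchCache would call this eventHandler after updating its store and event window.
	eventHandler := c.processEvent
	eventHandler("ADDED pod/nginx")

	fmt.Println(<-c.watchers[0])
}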
02
Implementation of the client watch mechanism against the apiserver
The apiserver adds a watch parameter to the list endpoints, so a client can initiate a watch request by appending watch=true, e.g. https://{host:6443}/apis/apps/v1/namespaces/default/deployments?watch=true
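A client rarely builds this URL by hand; with client-go the same request can be issued as in the minimal sketch below (a sketch only: the kubeconfig path and the default namespace are assumptions, and the Watch verb appends watch=true under the hood):
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumes a reachable cluster and a kubeconfig at the default path ($HOME/.kube/config).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Under the hood this issues GET /apis/apps/v1/namespaces/default/deployments?watch=true.
	watcher, err := clientset.AppsV1().Deployments("default").Watch(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	defer watcher.Stop()

	for event := range watcher.ResultChan() {
		fmt.Printf("%s: %v\n", event.Type, event.Object.GetObjectKind().GroupVersionKind())
	}
}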
When the apiserver's handler parses the watch parameter as true, it handles the request as a watch request:
staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go
func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
...
if opts.Watch || forceWatch {
if rw == nil {
...
return
}
...
watcher, err := rw.Watch(ctx, &opts)
...
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
})
return
}
...
}
}
As you can see, when the client issues a watch request, the watcher's Watch interface is actually called. The watcher here is an implementation of the watch interface; based on the URL path, the apiserver resolves different watch requests to different watcher implementations. Most built-in Kubernetes resources embed the REST struct, whose underlying storage is the cacher, so in practice this calls the cacher's Watch method. Let's also look at the serveWatch implementation:
staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go
func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions negotiation.MediaTypeOptions, req *http.Request, w http.ResponseWriter, timeout time.Duration) {
...
server := &WatchServer{
Watching: watcher,
Scope: scope,
UseTextFraming: useTextFraming,
MediaType: mediaType,
Framer: framer,
Encoder: encoder,
EmbeddedEncoder: embeddedEncoder,
Fixup: func(obj runtime.Object) runtime.Object {
result, err := transformObject(ctx, obj, options, mediaTypeOptions, scope, req)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to transform object %v: %v", reflect.TypeOf(obj), err))
return obj
}
// When we are transformed to a table, use the table options as the state for whether we
// should print headers - on watch, we only want to print table headers on the first object
// and omit them on subsequent events.
if tableOptions, ok := options.(*metav1.TableOptions); ok {
tableOptions.NoHeaders = true
}
return result
},
TimeoutFactory: &realTimeoutFactory{timeout},
}
server.ServeHTTP(w, req)
}
It creates a WatchServer and calls its ServeHTTP method. Let's look at the ServeHTTP implementation:
staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
kind := s.Scope.Kind
if wsstream.IsWebSocketRequest(req) {
w.Header().Set("Content-Type", s.MediaType)
websocket.Handler(s.HandleWS).ServeHTTP(w, req)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
utilruntime.HandleError(err)
s.Scope.err(errors.NewInternalError(err), w, req)
return
}
framer := s.Framer.NewFrameWriter(w)
if framer == nil {
// programmer error
err := fmt.Errorf("no stream framing support is available for media type %q", s.MediaType)
utilruntime.HandleError(err)
s.Scope.err(errors.NewBadRequest(err.Error()), w, req)
return
}
var e streaming.Encoder
var memoryAllocator runtime.MemoryAllocator
if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator {
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
} else {
e = streaming.NewEncoder(framer, s.Encoder)
}
// ensure the connection times out
timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
defer cleanup()
// begin the stream
w.Header().Set("Content-Type", s.MediaType)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()
var unknown runtime.Unknown
internalEvent := &metav1.InternalEvent{}
outEvent := &metav1.WatchEvent{}
buf := &bytes.Buffer{}
ch := s.Watching.ResultChan()
done := req.Context().Done()
embeddedEncodeFn := s.EmbeddedEncoder.Encode
if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
if memoryAllocator == nil {
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
defer runtime.AllocatorPool.Put(memoryAllocator)
}
embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error {
return encoder.EncodeWithAllocator(obj, w, memoryAllocator)
}
}
for {
select {
case <-done:
return
case <-timeoutCh:
return
case event, ok := <-ch:
if !ok {
// End of results.
return
}
metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
obj := s.Fixup(event.Object)
if err := embeddedEncodeFn(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
return
}
// ContentType is not required here because we are defaulting to the serializer
// type
unknown.Raw = buf.Bytes()
event.Object = &unknown
metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))
*outEvent = metav1.WatchEvent{}
// create the external type directly and encode it. Clients will only recognize the serialization we provide.
// The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
// and we get the benefit of using conversion functions which already have to stay in sync
*internalEvent = metav1.InternalEvent(event)
err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
// client disconnect.
return
}
if err := e.Encode(outEvent); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e))
// client disconnect.
return
}
if len(ch) == 0 {
flusher.Flush()
}
buf.Reset()
}
}
}
As you can see, this mainly handles the events sent to the client over the long-lived connection: it reads events from the watcher's resultChan and keeps writing them into the HTTP response stream. If the client issued a websocket request, the events from the watcher's resultChan are handled directly over the websocket; for a normal HTTP request, the response headers are set so that an HTTP/1.1 chunked long-lived connection is established.
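On the wire, each event in that chunked stream is one JSON-encoded metav1.WatchEvent. A minimal sketch of consuming the stream directly (a sketch only: the apiserver address and bearer token are assumptions, and TLS verification is disabled purely for the demo):
package main

import (
	"crypto/tls"
	"encoding/json"
	"fmt"
	"net/http"
)

// watchEvent mirrors the wire shape of metav1.WatchEvent.
type watchEvent struct {
	Type   string          `json:"type"`
	Object json.RawMessage `json:"object"`
}

func main() {
	client := &http.Client{Transport: &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // demo only
	}}

	req, err := http.NewRequest("GET",
		"https://127.0.0.1:6443/apis/apps/v1/namespaces/default/deployments?watch=true", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Bearer <token>") // assumption: a valid bearer token

	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The body stays open; the apiserver flushes one JSON WatchEvent at a time as events occur.
	dec := json.NewDecoder(resp.Body)
	for {
		var ev watchEvent
		if err := dec.Decode(&ev); err != nil {
			return // stream closed (timeout, client disconnect, ...)
		}
		fmt.Println("received event:", ev.Type)
	}
}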
As mentioned above, when the client issues a watch request the apiserver actually calls the cacher's Watch method. Let's look at that method.
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
...
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType, identifier)
...
cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
// rather than a directly returned error.
return newErrWatcher(err), nil
}
func() {
c.Lock()
defer c.Unlock()
// Update watcher.forget function once we can compute it.
watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
// Add it to the queue only when the client support watch bookmarks.
if watcher.allowWatchBookmarks {
c.bookmarkWatchers.addWatcher(watcher)
}
c.watcherIdx++
}()
go watcher.processInterval(ctx, cacheInterval, watchRV)
return watcher, nil
}
As you can see, when a client issues a watch request, the cacher's Watch method creates a cacheWatcher, so client watch requests and cacheWatchers are in one-to-one correspondence. The call cacheInterval, err := c.watchCache.getAllEventsSinceLocked(watchRV) fetches, from the watchCache's sliding window, the events newer than the resourceVersion sent by the client and hands them to the subsequent goroutine go watcher.processInterval(ctx, cacheInterval, watchRV), which guards against events being lost when the client's watch connection is briefly interrupted. In that goroutine, the events from the sliding window served at the start of the watch, together with the events later received on the input channel, are put into the cacheWatcher's resultChan (a client-side sketch of resuming from a resourceVersion follows the code below). The code is as follows:
staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, resourceVersion uint64) {
...
initEventCount := 0
/* initial watch: fetch the events from cacheInterval and send them to resultChan */
for {
event, err := cacheInterval.Next()
if err != nil {
klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
return
}
if event == nil {
break
}
c.sendWatchCacheEvent(event)
resourceVersion = event.ResourceVersion
initEventCount++
}
/* after the watch connection is established, forward events from the input channel to resultChan */
c.process(ctx, resourceVersion)
}
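From the client side, the effect of replaying the window is that a watch (re)started with a resourceVersion first receives the buffered events newer than that version before live events. A minimal client-go sketch of this list-then-watch pattern, which is what informers do internally (the kubeconfig path and namespace are assumptions):
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// List first to obtain a resourceVersion, as informers do.
	list, err := clientset.AppsV1().Deployments("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	// Watch from that resourceVersion: the cacher replays the events in its window
	// that are newer than this version before streaming live events.
	watcher, err := clientset.AppsV1().Deployments("default").Watch(context.TODO(), metav1.ListOptions{
		ResourceVersion: list.ResourceVersion,
	})
	if err != nil {
		panic(err)
	}
	defer watcher.Stop()

	for event := range watcher.ResultChan() {
		fmt.Println(event.Type)
	}
}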
At this point, the apiserver's list and watch against etcd and its handling of the client's list-watch requests form a closed loop.
References:
https://github.com/kubernetes/kubernetes