We all know that the controller-manager, scheduler, and kubelet watch the objects they care about through the apiserver; when the content or status of a watched object changes, the corresponding event is pushed to the watcher immediately. It is this event-notification mechanism that keeps Kubernetes running smoothly. So how is this mechanism implemented and driven?
1. etcd
In Kubernetes, the apiserver is the gateway to the data in etcd: all other components access that data through the interfaces the apiserver exposes. etcd itself provides a watch API, and when a watched key changes, etcd notifies the client immediately.
Open a terminal and watch the key stock1:
etcdctl watch stock1
Open another terminal and set a value:
etcdctl put stock1 1000
Open a terminal and watch all keys with the prefix stock:
etcdctl watch stock --prefix
Open another terminal and set some values:
etcdctl put stock1 10
etcdctl put stock2 20
These two examples give a rough idea of how etcd watch works. It is reasonable to guess that the apiserver likewise uses the watch API to watch the pod, deployment, and other data stored in etcd: etcd pushes the changes to the apiserver, and the apiserver then distributes them to the other components.
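To make that guess more concrete, here is a minimal sketch of watching a key prefix with etcd's official Go client, clientv3 (the same client library the apiserver builds on). The endpoint address and the prefix are assumptions for illustration, and error handling is kept minimal:

package main

import (
    "context"
    "fmt"
    "time"

    clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
    // Assumption: a local etcd is listening on 127.0.0.1:2379.
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        panic(err)
    }
    defer cli.Close()

    // Equivalent of "etcdctl watch stock --prefix": every change under the
    // prefix is delivered on the returned channel.
    wch := cli.Watch(context.Background(), "stock", clientv3.WithPrefix())
    for wres := range wch {
        for _, ev := range wres.Events {
            fmt.Printf("%s %q -> %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}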
2. The apiserver and etcd
In a sense, the apiserver is to etcd what MyBatis is to MySQL. This article walks through a test case to show how the apiserver watches data.
// staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go:134
func TestDeleteTriggerWatch(t *testing.T) {
    // start etcd
    ctx, store, _ := testSetup(t)
    // write a pod
    key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
    // start watching
    w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
    if err != nil {
        t.Fatalf("Watch failed: %v", err)
    }
    // delete the pod
    if err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil); err != nil {
        t.Fatalf("Delete failed: %v", err)
    }
    // expect to receive a Deleted event
    testCheckEventType(t, watch.Deleted, w)
}
The test case proceeds as follows:
The data structures involved are shown in the figure below:
The code execution flow is shown in the figure below:
In the figure above, the event interaction starts at watch and run. Let's look at the source in detail:

 1  // startWatching does:
 2  // - get current objects if initialRev=0; set initialRev to current rev
 3  // - watch on given key and send events to process.
 4  func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
 5      if wc.initialRev == 0 {
 6          if err := wc.sync(); err != nil {
 7              klog.Errorf("failed to sync with latest state: %v", err)
 8              wc.sendError(err)
 9              return
10          }
11      }
12      opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
13      if wc.recursive {
14          opts = append(opts, clientv3.WithPrefix())
15      }
16      if wc.progressNotify {
17          opts = append(opts, clientv3.WithProgressNotify())
18      }
19      wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
20      for wres := range wch {
21          if wres.Err() != nil {
22              err := wres.Err()
23              // If there is an error on server (e.g. compaction), the channel will return it before closed.
24              logWatchChannelErr(err)
25              wc.sendError(err)
26              return
27          }
28          if wres.IsProgressNotify() {
29              wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
30              metrics.RecordEtcdBookmark(wc.watcher.objectType)
31              continue
32          }
33
34          for _, e := range wres.Events {
35              parsedEvent, err := parseEvent(e)
36              if err != nil {
37                  logWatchChannelErr(err)
38                  wc.sendError(err)
39                  return
40              }
41              wc.sendEvent(parsedEvent)
42          }
43      }
44      // When we come to this point, it's only possible that client side ends the watch.
45      // e.g. cancel the context, close the client.
46      // If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
47      // We should notify the main thread that this goroutine has exited.
48      close(watchClosedCh)
49  }
Line 19 starts a watch on the given key; the for loop then ranges over the read-only channel and parses each event returned by etcd.
func (wc *watchChan) sendEvent(e *event) {
    if len(wc.incomingEventChan) == incomingBufSize {
        klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType)
    }
    select {
    case wc.incomingEventChan <- e:
    case <-wc.ctx.Done():
    }
}

// processEvent processes events from etcd watcher and sends results to resultChan.
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
    defer wg.Done()

    for {
        select {
        case e := <-wc.incomingEventChan:
            res := wc.transform(e)
            if res == nil {
                continue
            }
            if len(wc.resultChan) == outgoingBufSize {
                klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType)
            }
            // If user couldn't receive results fast enough, we also block incoming events from watcher.
            // Because storing events in local will cause more memory usage.
            // The worst case would be closing the fast watcher.
            select {
            case wc.resultChan <- *res:
            case <-wc.ctx.Done():
                return
            }
        case <-wc.ctx.Done():
            return
        }
    }
}
Events flow through incomingEventChan and finally land in resultChan. As you might guess, whoever is interested in the events simply has to consume the resultChan channel.
// watchChan implements watch.Interface.
type watchChan struct {
    // other fields omitted
    watcher           *watcher
    incomingEventChan chan *event
    resultChan        chan watch.Event
    errChan           chan error
}

func (wc *watchChan) ResultChan() <-chan watch.Event {
    return wc.resultChan
}
The ResultChan() method returns resultChan, so setting a breakpoint inside it tells us exactly who is interested in these events, and that leads us to the cacher data structure.
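For intuition, any consumer of this interface only needs to drain the channel returned by ResultChan(). The snippet below is an illustrative sketch of such a consumer, not the actual cacher code; it works against any k8s.io/apimachinery watch.Interface, for example the one returned by store.Watch in the test above:

import (
    "fmt"

    "k8s.io/apimachinery/pkg/watch"
)

// drain is a hypothetical consumer: it ranges over ResultChan() until the
// watch is closed and reacts to each event type.
func drain(w watch.Interface) {
    defer w.Stop()
    for ev := range w.ResultChan() {
        switch ev.Type {
        case watch.Added, watch.Modified, watch.Deleted:
            fmt.Printf("%s: %v\n", ev.Type, ev.Object)
        case watch.Error:
            fmt.Printf("watch error: %v\n", ev.Object)
            return
        }
    }
}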
3. cacher
Now we come to the cache. The apiserver caches data in memory to avoid reading etcd excessively. Take "kubectl get namespaces" as an example: setting a breakpoint where the apiserver reads the namespace cache yields the following call stack:
As the comments on GetList point out, when a resource version is specified the data is served from the cache, with the guarantee that the returned version is greater than or equal to the requested one; otherwise the read still goes to etcd. Digging further into the cache's data structure, the call stack below shows clearly that the so-called cache is ultimately just a map.
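From a client's perspective the same distinction shows up in the list options. A hedged sketch with client-go (the *kubernetes.Clientset is assumed to be constructed elsewhere): ResourceVersion "0" allows the apiserver to answer from its watch cache, while an empty ResourceVersion asks for the freshest data and ends up reading etcd:

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// listNamespaces is an illustrative sketch, not apiserver code.
// With ResourceVersion "0" the apiserver may serve the list from its
// watch cache (possibly slightly stale); with it left empty, the list
// requires the most recent data, i.e. a read that goes to etcd.
func listNamespaces(cs *kubernetes.Clientset, fromCache bool) error {
    opts := metav1.ListOptions{}
    if fromCache {
        opts.ResourceVersion = "0"
    }
    _, err := cs.CoreV1().Namespaces().List(context.TODO(), opts)
    return err
}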
That wraps up the walkthrough of the cache's runtime code. Next, let's see how the cache is created. After some digging, the creation point turns out to be:
// vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go:37
var ret RESTOptions
ret.Decorator = genericregistry.StorageWithCacher()
4. watch
Open a terminal and run "kubectl get namespaces -w". This performs a list and a watch on namespaces, and whenever a namespace changes, the change is pushed to the terminal. How is that implemented?
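Before diving into the server side, note that this is roughly what kubectl does through client-go: list once, then watch from the list's resource version. The sketch below assumes a kubeconfig path is supplied; it is an illustration, not kubectl's actual code:

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

// watchNamespaces sketches what "kubectl get namespaces -w" boils down to:
// a list followed by a watch that keeps delivering change events.
func watchNamespaces(kubeconfig string) error {
    cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        return err
    }
    cs, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        return err
    }

    list, err := cs.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
    if err != nil {
        return err
    }
    // Start watching from the list's resource version so no event is missed.
    w, err := cs.CoreV1().Namespaces().Watch(context.TODO(),
        metav1.ListOptions{ResourceVersion: list.ResourceVersion})
    if err != nil {
        return err
    }
    defer w.Stop()
    for ev := range w.ResultChan() {
        fmt.Printf("%s %T\n", ev.Type, ev.Object)
    }
    return nil
}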
The runtime event propagation is fairly involved, one link chained to the next. In short: the cacher consumes the watchChan.resultChan introduced earlier to receive events from etcd, propagates each event to the cacheWatchers, and finally the http handler converts the event and sends it to the http client as an http chunked response.
Open one terminal to watch namespaces and another to create a new namespace, then set a conditional breakpoint (event.Type == "ADDED") at the key code paths. Walking through the source shows that the cacher consumes watchChan.resultChan, and the complete event path is:
watchChan.resultChan -> Cacher.incoming -> cacheWatcher.input -> cacheWatcher.result
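The step in the middle of this path, from Cacher.incoming to each cacheWatcher.input, is essentially a non-blocking fan-out. The sketch below uses hypothetical event/watcher/dispatcher types purely to illustrate the shape of that pattern; the real dispatchEvents and nonblockingAdd code in cacher.go adds buffering, timeouts, and handling of blocked watchers on top of this:

// event, watcher and dispatcher are hypothetical stand-ins for
// watchCacheEvent, cacheWatcher and Cacher; they only illustrate the fan-out.
type event struct{ resourceVersion uint64 }

type watcher struct{ input chan *event }

type dispatcher struct {
    incoming chan *event
    watchers []*watcher
}

// run fans every incoming event out to all registered watchers.
// The non-blocking send means one slow watcher cannot stall the others;
// the real cacher eventually buffers or terminates watchers that stay blocked.
func (d *dispatcher) run(stopCh <-chan struct{}) {
    for {
        select {
        case ev := <-d.incoming:
            for _, w := range d.watchers {
                select {
                case w.input <- ev:
                default:
                    // this watcher is falling behind; handled elsewhere
                }
            }
        case <-stopCh:
            return
        }
    }
}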
The call stack of the method that reads the watchChan.resultChan channel looks like this:
The events then flow onward as follows:
// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:749
func (c *Cacher) processEvent(event *watchCacheEvent) {
    if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
        // Monitor if this gets backed up, and how much.
        klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
    }
    c.incoming <- *event
}

// in between, the event passes through dispatchEvents
// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:757

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:1204
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
    select {
    case c.input <- event:
        return true
    default:
        return false
    }
}

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:1415
func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
    // At this point we already start processing incoming watch events.
    // However, the init event can still be processed because their serialization
    // and sending to the client happens asynchrnously.
    // TODO: As describe in the KEP, we would like to estimate that by delaying
    // the initialization signal proportionally to the number of events to
    // process, but we're leaving this to the tuning phase.
    utilflowcontrol.WatchInitialized(ctx)

    for {
        select {
        case event, ok := <-c.input:
            if !ok {
                return
            }
            // only send events newer than resourceVersion
            if event.ResourceVersion > resourceVersion {
                c.sendWatchCacheEvent(event)
            }
        case <-ctx.Done():
            return
        }
    }
}

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:1319
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
    watchEvent := c.convertToWatchEvent(event)
    if watchEvent == nil {
        // Watcher is not interested in that object.
        return
    }

    // We need to ensure that if we put event X to the c.result, all
    // previous events were already put into it before, no matter whether
    // c.done is close or not.
    // Thus we cannot simply select from c.done and c.result and this
    // would give us non-determinism.
    // At the same time, we don't want to block infinitely on putting
    // to c.result, when c.done is already closed.
    //
    // This ensures that with c.done already close, we at most once go
    // into the next select after this. With that, no matter which
    // statement we choose there, we will deliver only consecutive
    // events.
    select {
    case <-c.done:
        return
    default:
    }

    select {
    case c.result <- *watchEvent:
    case <-c.done:
    }
}

// Implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
    return c.result
}
After the code above runs, the event arrives at cacheWatcher.result. The http handler then consumes the events from cacheWatcher.result and sends the change events to the http client.
// vendor/k8s.io/apiserver/pkg/endpoints/handlers/watch.go:164
// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
// or over a websocket connection.
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    kind := s.Scope.Kind
    metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
    defer metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec()

    if wsstream.IsWebSocketRequest(req) {
        w.Header().Set("Content-Type", s.MediaType)
        websocket.Handler(s.HandleWS).ServeHTTP(w, req)
        return
    }

    flusher, ok := w.(http.Flusher)
    if !ok {
        err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
        utilruntime.HandleError(err)
        s.Scope.err(errors.NewInternalError(err), w, req)
        return
    }

    framer := s.Framer.NewFrameWriter(w)
    if framer == nil {
        // programmer error
        err := fmt.Errorf("no stream framing support is available for media type %q", s.MediaType)
        utilruntime.HandleError(err)
        s.Scope.err(errors.NewBadRequest(err.Error()), w, req)
        return
    }

    var e streaming.Encoder
    var memoryAllocator runtime.MemoryAllocator

    if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator {
        memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
        defer runtime.AllocatorPool.Put(memoryAllocator)
        e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
    } else {
        e = streaming.NewEncoder(framer, s.Encoder)
    }

    // ensure the connection times out
    timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
    defer cleanup()

    // begin the stream
    w.Header().Set("Content-Type", s.MediaType)
    w.Header().Set("Transfer-Encoding", "chunked")
    w.WriteHeader(http.StatusOK)
    flusher.Flush()

    var unknown runtime.Unknown
    internalEvent := &metav1.InternalEvent{}
    outEvent := &metav1.WatchEvent{}
    buf := &bytes.Buffer{}
    ch := s.Watching.ResultChan()
    done := req.Context().Done()

    embeddedEncodeFn := s.EmbeddedEncoder.Encode
    if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
        if memoryAllocator == nil {
            // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
            // instead, we allocate the buffer for the entire watch session and release it when we close the connection.
            memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
            defer runtime.AllocatorPool.Put(memoryAllocator)
        }
        embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error {
            return encoder.EncodeWithAllocator(obj, w, memoryAllocator)
        }
    }

    for {
        select {
        case <-done:
            return
        case <-timeoutCh:
            return
        case event, ok := <-ch:
            if !ok {
                // End of results.
                return
            }
            metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()

            obj := s.Fixup(event.Object)
            if err := embeddedEncodeFn(obj, buf); err != nil {
                // unexpected error
                utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
                return
            }

            // ContentType is not required here because we are defaulting to the serializer
            // type
            unknown.Raw = buf.Bytes()
            event.Object = &unknown
            metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))

            *outEvent = metav1.WatchEvent{}

            // create the external type directly and encode it. Clients will only recognize the serialization we provide.
            // The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
            // and we get the benefit of using conversion functions which already have to stay in sync
            *internalEvent = metav1.InternalEvent(event)
            err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
                // client disconnect.
                return
            }
            if err := e.Encode(outEvent); err != nil {
                utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e))
                // client disconnect.
                return
            }
            if len(ch) == 0 {
                flusher.Flush()
            }

            buf.Reset()
        }
    }
}
The corresponding call stack is shown below, where we can see clearly that:
// s.Watching.ResultChan() is the cacheWatcher.result channel
ch := s.Watching.ResultChan()
To sum up, the whole process can be pictured as follows:
The client initiates a list-and-watch, which at heart is an http GET request with the watch parameter set to true; the code lives in
// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go:169
func ListResource(...)
The client and the apiserver use a long-lived http connection: the apiserver sets the Transfer-Encoding response header to chunked and keeps the connection open. It first returns the list result to the client, then pushes each subsequent change event to the client as a chunk, which is essentially a long-polling style of solution.
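This is easy to observe from outside the cluster. The sketch below assumes "kubectl proxy" is running locally on 127.0.0.1:8001 (so no auth headers are needed); it issues the same GET with watch=true and prints each newline-delimited JSON event as the chunks arrive:

package main

import (
    "bufio"
    "fmt"
    "net/http"
)

func main() {
    // Assumption: "kubectl proxy" is serving the API on 127.0.0.1:8001.
    // watch=true turns the GET into a long-lived chunked response.
    resp, err := http.Get("http://127.0.0.1:8001/api/v1/namespaces?watch=true")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    // Each event arrives as one JSON object per line, e.g.
    // {"type":"ADDED","object":{...}}, flushed to us as a chunk.
    scanner := bufio.NewScanner(resp.Body)
    scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) // events can exceed the default 64 KB line limit
    for scanner.Scan() {
        fmt.Println(scanner.Text())
    }
    if err := scanner.Err(); err != nil {
        fmt.Println("stream ended:", err)
    }
}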
These notes are also published on my WeChat official account, where hardly anyone reads them; link: https://mp.weixin.qq.com/s?__biz=MzA3ODQwMDk5Mg==&mid=2247483730&idx=1&sn=02e7f5db7697b4a674371ad16272b67b&chksm=9f421ddda83594cb6aa89801dc62a2d22193e264ceefd0c039c301ca64a63265e8d36bf05620&token=1311797829&lang=zh_CN#rd