
How kubernetes implements list-watch under the hood


As we all know, controller-manager, scheduler, and kubelet watch the objects they are interested in via the apiserver; when the content or state of a watched object changes, the corresponding event is pushed to the watcher immediately. It is this event notification mechanism that keeps kubernetes running smoothly. So how is this mechanism implemented and driven?

1. etcd

In k8s, the apiserver is the sole gateway to the data in etcd: every other component accesses that data through the interfaces the apiserver exposes. etcd provides a watch api, and when a watched key changes, etcd notifies the client immediately.

Open a terminal and watch the key stock1:

etcdctl watch stock1

Open another terminal and set a value:

etcdctl put stock1 1000

 

Open a terminal and watch all keys with the prefix stock:

etcdctl watch stock --prefix

Open another terminal and set values:

etcdctl put stock1 10
etcdctl put stock2 20

 

These two examples give a rough idea of how etcd watch is used, and we can guess that the apiserver likewise uses the watch api to watch the pods, deployments, and other data stored in etcd: etcd pushes changed data to the apiserver, and the apiserver distributes it to the other system components.
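To make that concrete, here is a minimal Go sketch of the same prefix watch using the official etcd client. The endpoint 127.0.0.1:2379 is an assumption of this sketch, not something the apiserver itself hardcodes:

package main

import (
  "context"
  "fmt"
  "time"

  clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
  // Connect to a local etcd; the endpoint is assumed for this sketch.
  cli, err := clientv3.New(clientv3.Config{
    Endpoints:   []string{"127.0.0.1:2379"},
    DialTimeout: 5 * time.Second,
  })
  if err != nil {
    panic(err)
  }
  defer cli.Close()

  // Watch every key with the prefix "stock", like `etcdctl watch stock --prefix`.
  wch := cli.Watch(context.Background(), "stock", clientv3.WithPrefix())
  for wresp := range wch {
    for _, ev := range wresp.Events {
      fmt.Printf("%s %q = %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
    }
  }
}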

2. apiserver and etcd

In a sense, the apiserver is to etcd what mybatis is to mysql. This post uses a test case to describe how the apiserver watches data.

// staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go:134
func TestDeleteTriggerWatch(t *testing.T) {
  // start etcd
  ctx, store, _ := testSetup(t)
  // write a pod object
  key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
  // start watching
  w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
  if err != nil {
    t.Fatalf("Watch failed: %v", err)
  }
  // delete the pod
  if err := store.Delete(ctx, key, &example.Pod{}, nil, storage.ValidateAllObjectFunc, nil); err != nil {
    t.Fatalf("Delete failed: %v", err)
  }
  // verify that a Deleted event is received
  testCheckEventType(t, watch.Deleted, w)
}

The flow of the test case is as follows:

[figure: test case flow]

The data structures involved:

[figure: data structures]

The code execution flow:

[figure: code execution flow]

In the figure above, the event interaction starts at watch and run. Let's analyze the source in detail:
 1 //  startWatching does:
 2 // - get current objects if initialRev=0; set initialRev to current rev
 3 // - watch on given key and send events to process.
 4 func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
 5   if wc.initialRev == 0 {
 6     if err := wc.sync(); err != nil {
 7       klog.Errorf("failed to sync with latest state: %v", err)
 8       wc.sendError(err)
 9       return
10     }
11   }
12   opts := []clientv3.OpOption{clientv3.WithRev(wc.initialRev + 1), clientv3.WithPrevKV()}
13   if wc.recursive {
14     opts = append(opts, clientv3.WithPrefix())
15   }
16   if wc.progressNotify {
17     opts = append(opts, clientv3.WithProgressNotify())
18   }
19   wch := wc.watcher.client.Watch(wc.ctx, wc.key, opts...)
20   for wres := range wch {
21     if wres.Err() != nil {
22       err := wres.Err()
23       // If there is an error on server (e.g. compaction), the channel will return it before closed.
24       logWatchChannelErr(err)
25       wc.sendError(err)
26       return
27     }
28     if wres.IsProgressNotify() {
29       wc.sendEvent(progressNotifyEvent(wres.Header.GetRevision()))
30       metrics.RecordEtcdBookmark(wc.watcher.objectType)
31       continue
32     }
33 
34     for _, e := range wres.Events {
35       parsedEvent, err := parseEvent(e)
36       if err != nil {
37         logWatchChannelErr(err)
38         wc.sendError(err)
39         return
40       }
41       wc.sendEvent(parsedEvent)
42     }
43   }
44   // When we come to this point, it's only possible that client side ends the watch.
45   // e.g. cancel the context, close the client.
46   // If this watch chan is broken and context isn't cancelled, other goroutines will still hang.
47   // We should notify the main thread that this goroutine has exited.
48   close(watchClosedCh)
49 }

Line 19 starts a watch on the given key; the for loop then ranges over the read-only channel and parses the events etcd sends back.
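Lines 12-19 are also what make the watch resumable: clientv3.WithRev(wc.initialRev + 1) replays every change after the revision of the initial list, so nothing is lost between list and watch, and WithPrevKV makes delete events carry the previous key/value. A minimal sketch of the same idea, assuming a *clientv3.Client like the one in the earlier sketch ("/registry/pods/" is the prefix kubernetes stores pods under; the function name watchFrom is mine):

// watchFrom resumes a watch just after rev, mirroring the opts and the
// client.Watch call in startWatching above (a sketch, not apiserver code).
func watchFrom(ctx context.Context, cli *clientv3.Client, rev int64) {
  opts := []clientv3.OpOption{
    clientv3.WithRev(rev + 1), // replay every change after rev
    clientv3.WithPrevKV(),     // delete events carry the previous key/value
    clientv3.WithPrefix(),
  }
  for wresp := range cli.Watch(ctx, "/registry/pods/", opts...) {
    if err := wresp.Err(); err != nil {
      // e.g. the requested revision was already compacted away;
      // the apiserver forwards such errors and watchers must relist.
      fmt.Println("watch error:", err)
      return
    }
    for _, ev := range wresp.Events {
      fmt.Printf("%s %q\n", ev.Type, ev.Kv.Key)
    }
  }
}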

func (wc *watchChan) sendEvent(e *event) {
  if len(wc.incomingEventChan) == incomingBufSize {
    klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow decoding, user not receiving fast, or other processing logic", "incomingEvents", incomingBufSize, "objectType", wc.watcher.objectType)
  }
  select {
  case wc.incomingEventChan <- e:
  case <-wc.ctx.Done():
  }
}

// processEvent processes events from etcd watcher and sends results to resultChan.
func (wc *watchChan) processEvent(wg *sync.WaitGroup) {
  defer wg.Done()

  for {
    select {
    case e := <-wc.incomingEventChan:
      res := wc.transform(e)
      if res == nil {
        continue
      }
      if len(wc.resultChan) == outgoingBufSize {
        klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", wc.watcher.objectType)
      }
      // If user couldn't receive results fast enough, we also block incoming events from watcher.
      // Because storing events in local will cause more memory usage.
      // The worst case would be closing the fast watcher.
      select {
      case wc.resultChan <- *res:
      case <-wc.ctx.Done():
        return
      }
    case <-wc.ctx.Done():
      return
    }
  }
}

Events flow through incomingEventChan and finally land in resultChan. As you would expect, anyone interested in the events simply consumes the resultChan channel.

// watchChan implements watch.Interface.
type watchChan struct {
  // other fields omitted
  watcher           *watcher
  incomingEventChan chan *event
  resultChan        chan watch.Event
  errChan           chan error
}

func (wc *watchChan) ResultChan() <-chan watch.Event {
  return wc.resultChan
}

The ResultChan() method returns resultChan, so a breakpoint in this method reveals who is interested in these events, which brings us to the cacher data structure.
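For intuition, the same watch.Interface contract is what a client eventually consumes. Here is a hedged sketch with client-go, watching namespaces from inside a cluster; this shows the client-side view of the apiserver, not the etcd3 watchChan itself, and the in-cluster config is an assumption of the sketch:

package main

import (
  "context"
  "fmt"

  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/rest"
)

func main() {
  // In-cluster config is assumed; outside a cluster use clientcmd instead.
  cfg, err := rest.InClusterConfig()
  if err != nil {
    panic(err)
  }
  clientset := kubernetes.NewForConfigOrDie(cfg)

  // Watch returns a watch.Interface; its ResultChan has the same shape as
  // watchChan.ResultChan above, just several layers closer to the user.
  w, err := clientset.CoreV1().Namespaces().Watch(context.Background(), metav1.ListOptions{})
  if err != nil {
    panic(err)
  }
  defer w.Stop()

  for ev := range w.ResultChan() {
    fmt.Printf("%s %T\n", ev.Type, ev.Object)
  }
}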

3. cacher

At last, the cache. The apiserver caches objects in memory to avoid excessive reads against etcd. Taking "kubectl get namespaces" as an example, a breakpoint where the apiserver reads the namespace cache produces the following call stack:

[figure: call stack of the cached namespace read]

From the comments on GetList we can see that when a resource version is specified, the request is served from the cache, with the guarantee that the returned data is at least as new as the requested version; otherwise the read still goes to etcd. Following the cache's data structure through the call stack below, we can clearly see that the so-called cache is, in the end, just a map.

[figure: call stack into the cache's underlying map]
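This behavior is visible from the client side too. As a hedged illustration (a sketch reusing a *kubernetes.Clientset like the one in the section 2 example; listFromCache is my name): leaving ResourceVersion unset forces a quorum read from etcd, while "0" tells the apiserver any cached data is acceptable.

// listFromCache demonstrates resource-version semantics on a list call.
func listFromCache(ctx context.Context, clientset *kubernetes.Clientset) {
  // ResourceVersion on a list call (client-side view):
  //   ""  - quorum read straight from etcd (most recent data)
  //   "0" - any version is acceptable; typically served from the apiserver cache
  //   "N" - data at least as new as revision N, usually from the cache
  nsList, err := clientset.CoreV1().Namespaces().List(ctx,
    metav1.ListOptions{ResourceVersion: "0"})
  if err != nil {
    panic(err)
  }
  fmt.Println("namespaces from a cache-acceptable read:", len(nsList.Items))
}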

OK, that is as far as we will walk the cache's runtime code. Let's turn to how the cache is created; after some digging, the creation point is here:

// vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go:37
var ret RESTOptions
ret.Decorator = genericregistry.StorageWithCacher()

4. watch

Open a terminal and run "kubectl get namespaces -w": this performs a list and a watch on namespaces, and when namespaces change, the changes are pushed to the terminal. How is that implemented?

The runtime event propagation is quite involved, one link chained to the next. In short: the cacher consumes the watchChan.resultChan from section 3 to receive events from etcd, propagates each event to the interested cacheWatcher instances, and finally an http handler converts the events and sends them to the http client as chunked responses.

Open one terminal to watch namespaces and another to create a new namespace, set a conditional breakpoint (event.Type == "ADDED") at the key code, and a read through the source confirms that the cacher consumes watchChan.resultChan. The complete event flow path is:

watchChan.resultChan -> Cacher.incoming 
  -> cacheWatcher.input -> cacheWatcher.result
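Stripped of the details, this pipeline is a classic single-producer fan-out. Below is a simplified, self-contained model of it (emphatically not the real kubernetes code), with one incoming channel dispatched to per-watcher buffered channels via a non-blocking send, mirroring the nonblockingAdd shown further down:

package main

import "fmt"

type event struct{ key, typ string }

// watcher mirrors the role of cacheWatcher: a buffered input channel
// that the dispatcher must never block on.
type watcher struct{ input chan event }

func (w *watcher) nonblockingAdd(e event) bool {
  select {
  case w.input <- e:
    return true
  default:
    return false // watcher too slow; the real cacher may terminate it
  }
}

func main() {
  incoming := make(chan event, 8) // plays the role of Cacher.incoming
  watchers := []*watcher{
    {input: make(chan event, 2)},
    {input: make(chan event, 2)},
  }

  // Dispatcher: plays the role of Cacher.dispatchEvents.
  go func() {
    for e := range incoming {
      for _, w := range watchers {
        w.nonblockingAdd(e)
      }
    }
    for _, w := range watchers {
      close(w.input)
    }
  }()

  incoming <- event{"ns/foo", "ADDED"}
  close(incoming)

  for _, w := range watchers {
    for e := range w.input {
      fmt.Println(e.typ, e.key)
    }
  }
}

The non-blocking send is the important design choice here: a slow watcher must not stall the dispatcher, so its events are dropped (and, in the real cacher, such a watcher may be terminated and forced to relist).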

The call stack of the method that reads the watchChan.resultChan channel:

[figure: call stack reading watchChan.resultChan]

The events then move on as follows:

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:749
func (c *Cacher) processEvent(event *watchCacheEvent) {
  if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
    // Monitor if this gets backed up, and how much.
    klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
  }
  c.incoming <- *event
}

// in between, events pass through dispatchEvents
// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:757

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:1204
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
  select {
  case c.input <- event:
    return true
  default:
    return false
  }
}

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:1415
func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) {
  // At this point we already start processing incoming watch events.
  // However, the init event can still be processed because their serialization
  // and sending to the client happens asynchronously.
  // TODO: As described in the KEP, we would like to estimate that by delaying
  //   the initialization signal proportionally to the number of events to
  //   process, but we're leaving this to the tuning phase.
  utilflowcontrol.WatchInitialized(ctx)

  for {
    select {
    case event, ok := <-c.input:
      if !ok {
        return
      }
      // only send events newer than resourceVersion
      if event.ResourceVersion > resourceVersion {
        c.sendWatchCacheEvent(event)
      }
    case <-ctx.Done():
      return
    }
  }
}

// vendor/k8s.io/apiserver/pkg/storage/cacher/cacher.go:1319
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
  watchEvent := c.convertToWatchEvent(event)
  if watchEvent == nil {
    // Watcher is not interested in that object.
    return
  }

  // We need to ensure that if we put event X to the c.result, all
  // previous events were already put into it before, no matter whether
  // c.done is close or not.
  // Thus we cannot simply select from c.done and c.result and this
  // would give us non-determinism.
  // At the same time, we don't want to block infinitely on putting
  // to c.result, when c.done is already closed.
  //
  // This ensures that with c.done already close, we at most once go
  // into the next select after this. With that, no matter which
  // statement we choose there, we will deliver only consecutive
  // events.
  select {
  case <-c.done:
    return
  default:
  }

  select {
  case c.result <- *watchEvent:
  case <-c.done:
  }
}

// Implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
  return c.result
}

Once the code above has run, the event sits in cacheWatcher.result; the http handler consumes cacheWatcher.result and sends the change events on to the http client.

// vendor/k8s.io/apiserver/pkg/endpoints/handlers/watch.go:164
// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
// or over a websocket connection.
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
  kind := s.Scope.Kind
  metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()
  defer metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec()

  if wsstream.IsWebSocketRequest(req) {
    w.Header().Set("Content-Type", s.MediaType)
    websocket.Handler(s.HandleWS).ServeHTTP(w, req)
    return
  }

  flusher, ok := w.(http.Flusher)
  if !ok {
    err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
    utilruntime.HandleError(err)
    s.Scope.err(errors.NewInternalError(err), w, req)
    return
  }

  framer := s.Framer.NewFrameWriter(w)
  if framer == nil {
    // programmer error
    err := fmt.Errorf("no stream framing support is available for media type %q", s.MediaType)
    utilruntime.HandleError(err)
    s.Scope.err(errors.NewBadRequest(err.Error()), w, req)
    return
  }

  var e streaming.Encoder
  var memoryAllocator runtime.MemoryAllocator

  if encoder, supportsAllocator := s.Encoder.(runtime.EncoderWithAllocator); supportsAllocator {
    memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
    defer runtime.AllocatorPool.Put(memoryAllocator)
    e = streaming.NewEncoderWithAllocator(framer, encoder, memoryAllocator)
  } else {
    e = streaming.NewEncoder(framer, s.Encoder)
  }

  // ensure the connection times out
  timeoutCh, cleanup := s.TimeoutFactory.TimeoutCh()
  defer cleanup()

  // begin the stream
  w.Header().Set("Content-Type", s.MediaType)
  w.Header().Set("Transfer-Encoding", "chunked")
  w.WriteHeader(http.StatusOK)
  flusher.Flush()

  var unknown runtime.Unknown
  internalEvent := &metav1.InternalEvent{}
  outEvent := &metav1.WatchEvent{}
  buf := &bytes.Buffer{}
  ch := s.Watching.ResultChan()
  done := req.Context().Done()

  embeddedEncodeFn := s.EmbeddedEncoder.Encode
  if encoder, supportsAllocator := s.EmbeddedEncoder.(runtime.EncoderWithAllocator); supportsAllocator {
    if memoryAllocator == nil {
      // don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
      // instead, we allocate the buffer for the entire watch session and release it when we close the connection.
      memoryAllocator = runtime.AllocatorPool.Get().(*runtime.Allocator)
      defer runtime.AllocatorPool.Put(memoryAllocator)
    }
    embeddedEncodeFn = func(obj runtime.Object, w io.Writer) error {
      return encoder.EncodeWithAllocator(obj, w, memoryAllocator)
    }
  }

  for {
    select {
    case <-done:
      return
    case <-timeoutCh:
      return
    case event, ok := <-ch:
      if !ok {
        // End of results.
        return
      }
      metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc()

      obj := s.Fixup(event.Object)
      if err := embeddedEncodeFn(obj, buf); err != nil {
        // unexpected error
        utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v", obj, err))
        return
      }

      // ContentType is not required here because we are defaulting to the serializer
      // type
      unknown.Raw = buf.Bytes()
      event.Object = &unknown
      metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw)))

      *outEvent = metav1.WatchEvent{}

      // create the external type directly and encode it.  Clients will only recognize the serialization we provide.
      // The internal event is being reused, not reallocated so its just a few extra assignments to do it this way
      // and we get the benefit of using conversion functions which already have to stay in sync
      *internalEvent = metav1.InternalEvent(event)
      err := metav1.Convert_v1_InternalEvent_To_v1_WatchEvent(internalEvent, outEvent, nil)
      if err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to convert watch object: %v", err))
        // client disconnect.
        return
      }
      if err := e.Encode(outEvent); err != nil {
        utilruntime.HandleError(fmt.Errorf("unable to encode watch object %T: %v (%#v)", outEvent, err, e))
        // client disconnect.
        return
      }
      if len(ch) == 0 {
        flusher.Flush()
      }

      buf.Reset()
    }
  }
}

The corresponding call stack is shown in the figure below; it makes the key line easy to spot:

[figure: call stack of the watch http handler]

// s.Watching.ResultChan() is cacheWatcher.result
ch := s.Watching.ResultChan()

To sum up, the whole process can be pictured as follows:

[figure: end-to-end list-watch event flow]

 

The client's list-and-watch is essentially an http get request with the watch parameter set to true; the handling logic lives in:

// staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go:169
func ListResource(...)

The client and the apiserver use a long-lived http connection: the apiserver sets the Transfer-Encoding response header to chunked and keeps the connection open. It first returns the list data to the client, then pushes each subsequent change event as a chunk; in effect this is http streaming, a server-push variant of the long-polling approach.
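To see this chunked stream without any client library, one can talk to the endpoint directly. A sketch under the assumption that kubectl proxy is serving an unauthenticated proxy on 127.0.0.1:8001; each chunk the apiserver pushes is one JSON-encoded watch event:

package main

import (
  "encoding/json"
  "fmt"
  "net/http"
)

// watchEvent matches the wire shape of metav1.WatchEvent.
type watchEvent struct {
  Type   string          `json:"type"`
  Object json.RawMessage `json:"object"`
}

func main() {
  // kubectl proxy on 127.0.0.1:8001 is an assumption of this sketch.
  resp, err := http.Get("http://127.0.0.1:8001/api/v1/namespaces?watch=true")
  if err != nil {
    panic(err)
  }
  defer resp.Body.Close()

  // The body stays open; the apiserver pushes one JSON object per event.
  dec := json.NewDecoder(resp.Body)
  for {
    var ev watchEvent
    if err := dec.Decode(&ev); err != nil {
      fmt.Println("stream closed:", err)
      return
    }
    fmt.Println("event:", ev.Type)
  }
}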

 

I also publish these notes on my WeChat official account, though hardly anyone reads them: https://mp.weixin.qq.com/s?__biz=MzA3ODQwMDk5Mg==&mid=2247483730&idx=1&sn=02e7f5db7697b4a674371ad16272b67b&chksm=9f421ddda83594cb6aa89801dc62a2d22193e264ceefd0c039c301ca64a63265e8d36bf05620&token=1311797829&lang=zh_CN#rd

From: https://www.cnblogs.com/allenwas3/p/17551336.html
