场景1:同步时连不上kube-apiserver
WaitForCacheSync会一直阻塞,直到缓存同步成功,或者传入的stop通道被关闭(即控制器停止)。
W0115 13:01:55.329881 18339 reflector.go:535] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229: failed to list *v1.Node: Get "https://192.168.0.105:6443/api/v1/nodes?limit=500&resourceVersion=0": dial tcp 192.168.0.105:6443: connect: connection refused
E0115 13:01:55.329948 18339 reflector.go:147] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229: Failed to watch *v1.Node: failed to list *v1.Node: Get "https://192.168.0.105:6443/api/v1/nodes?limit=500&resourceVersion=0": dial tcp 192.168.0.105:6443: connect: connection refused
^CF0115 13:02:05.291665 18339 multi_config_multi_clientset.go:48] failed to wait for caches to sync
设置5s同步缓存的超时时间
// Bound the cache sync with a 5-second timeout so an unreachable
// kube-apiserver cannot block here forever.
timeout := 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// ctx.Done() already has type <-chan struct{}, so it can be handed to
// WaitForCacheSync directly — no bridge goroutine or extra channel is needed.
klog.Infof("begin to wait cache sync")
if ok := cache.WaitForCacheSync(ctx.Done(), hasSynced); !ok {
	klog.Fatalf("failed to wait for caches to sync")
}
I0116 20:32:19.375547 19877 multi_config_multi_clientset.go:60] begin to wait cache sync
W0116 20:32:19.376501 19877 reflector.go:535] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229: failed to list *v1.Node: Get "https://192.168.0.105:6443/api/v1/nodes?limit=500&resourceVersion=0": dial tcp 192.168.0.105:6443: connect: connection refused
E0116 20:32:19.377906 19877 reflector.go:147] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229: Failed to watch *v1.Node: failed to list *v1.Node: Get "https://192.168.0.105:6443/api/v1/nodes?limit=500&resourceVersion=0": dial tcp 192.168.0.105:6443: connect: connection refused
W0116 20:32:20.664333 19877 reflector.go:535] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229: failed to list *v1.Node: Get "https://192.168.0.105:6443/api/v1/nodes?limit=500&resourceVersion=0": dial tcp 192.168.0.105:6443: connect: connection refused
E0116 20:32:20.664422 19877 reflector.go:147] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229: Failed to watch *v1.Node: failed to list *v1.Node: Get "https://192.168.0.105:6443/api/v1/nodes?limit=500&resourceVersion=0": dial tcp 192.168.0.105:6443: connect: connection refused
W0116 20:32:23.771688 19877 reflector.go:535] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229: failed to list *v1.Node: Get "https://192.168.0.105:6443/api/v1/nodes?limit=500&resourceVersion=0": dial tcp 192.168.0.105:6443: connect: connection refused
E0116 20:32:23.771743 19877 reflector.go:147] pkg/mod/k8s.io/[email protected]/tools/cache/reflector.go:229: Failed to watch *v1.Node: failed to list *v1.Node: Get "https://192.168.0.105:6443/api/v1/nodes?limit=500&resourceVersion=0": dial tcp 192.168.0.105:6443: connect: connection refused
F0116 20:32:24.375933 19877 multi_config_multi_clientset.go:62] failed to wait for caches to sync
场景2:shutdown后发送请求
在factory shutdown之后,用同一个clientset继续发送创建请求仍然可以成功;测试发现底层TCP连接大约在90s之后才会断开。
即使超过90s断连之后,clientset继续发送请求时,依然可以建立连接,请求处理后再过90s断连。
// addListWatchCfgAndClient demonstrates that a clientset keeps working after
// its shared informer factory has been shut down: the Create call at the end
// still succeeds even though the informers were stopped first.
func addListWatchCfgAndClient(stopCh chan struct{}) {
	restCfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	client, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	factory := informers.NewSharedInformerFactory(client, 0)
	nodeSynced := factory.Core().V1().Nodes().Informer().HasSynced
	factory.Core().V1().Nodes().Lister() // registers the Node informer before Start
	factory.Start(stopCh)
	// defer factory.Shutdown()

	klog.Infof("begin to wait cache sync")
	if !cache.WaitForCacheSync(stopCh, nodeSynced) {
		klog.Fatalf("failed to wait for caches to sync")
	}

	// Stop the informers, then prove the clientset still works afterwards.
	close(stopCh)
	factory.Shutdown()

	ns := &core_v1.Namespace{
		ObjectMeta: metav1.ObjectMeta{Name: "test"},
	}
	_, err = client.CoreV1().Namespaces().Create(context.Background(), ns, metav1.CreateOptions{})
	if err != nil {
		klog.Infof("create ns test failed, err is %v", err)
	} else {
		klog.Infof("create ns test success")
	}
}
I0116 13:16:22.873697 29268 multi_config_multi_clientset.go:49] begin to wait cache sync
I0116 13:16:22.978257 29268 multi_config_multi_clientset.go:65] create ns test success
// addListWatchCfgAndClient shows that even after the factory is shut down and
// the process sleeps past the observed ~90s idle-connection teardown, the
// clientset can still issue requests (re-establishing a connection as needed).
func addListWatchCfgAndClient() {
	restCfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	client, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	factory := informers.NewSharedInformerFactory(client, 0)
	nodeSynced := factory.Core().V1().Nodes().Informer().HasSynced
	factory.Core().V1().Nodes().Lister() // registers the Node informer before Start

	stop := make(chan struct{})
	factory.Start(stop)

	klog.Infof("begin to wait cache sync")
	if !cache.WaitForCacheSync(stop, nodeSynced) {
		klog.Fatalf("failed to wait for caches to sync")
	}

	close(stop)
	factory.Shutdown()
	klog.Infof("shut down completed")

	// Sleep past the point where the idle TCP connection is torn down,
	// then try a request with the same clientset.
	time.Sleep(2 * time.Minute)
	klog.Infof("after 2 minutes")

	ns := &core_v1.Namespace{
		ObjectMeta: metav1.ObjectMeta{Name: "test"},
	}
	_, err = client.CoreV1().Namespaces().Create(context.Background(), ns, metav1.CreateOptions{})
	if err != nil {
		klog.Infof("create ns test failed, err is %v", err)
	} else {
		klog.Infof("create ns test success")
	}
}
I0116 21:44:24.638815 66878 multi_config_multi_clientset.go:50] begin to wait cache sync
I0116 21:44:24.740917 66878 multi_config_multi_clientset.go:58] shut down completed
I0116 21:46:24.743508 66878 multi_config_multi_clientset.go:61] after 2 minutes
I0116 21:46:24.753175 66878 multi_config_multi_clientset.go:69] create ns test failed, err is namespaces "test" already exists
client-go和kube-apiserver建立连接后,每隔30s向kube-apiserver发送报文。
建立https连接90s后,client-go向kube-apiserver发送了keep-alive探测报文,之后通过RST报文来关闭tcp连接。
RST用于关闭异常连接,接收端收到RST包后,不需要发送ACK来确认。
例如,向一个没有进程监听指定端口的地址发送请求时,对端会回复RST,发送方随即报错connection refused。