首页 > 编程语言 >k8s源码中给操作添加追踪功能

k8s源码中给操作添加追踪功能

时间:2023-05-30 12:04:53浏览次数:57  
标签:License 源码 用于 完成 操作 k8s 方法 ID 追踪

不是很能看懂,但是又觉得很有用,不定什么时候能用到,先记录到这里吧

k8s源码中给操作添加追踪功能_操作追踪

operation.go

/*
Copyright 2014 Google Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package apiserver

import (
	"sort"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

// Operation represents an ongoing action which the server is performing.
type Operation struct {
	ID       string
	result   interface{}
	awaiting <-chan interface{}
	finished *time.Time
	lock     sync.Mutex
	notify   chan bool
}

// Operations tracks all the ongoing operations.
type Operations struct {
	// Access only using functions from atomic.
	lastID int64

	// 'lock' guards the ops map.
	lock sync.Mutex
	ops  map[string]*Operation
}

// Returns a new Operations repository.
func NewOperations() *Operations {
	ops := &Operations{
		ops: map[string]*Operation{},
	}
	go util.Forever(func() { ops.expire(10 * time.Minute) }, 5*time.Minute)
	return ops
}

// Add a new operation. Lock-free.
func (ops *Operations) NewOperation(from <-chan interface{}) *Operation {
	id := atomic.AddInt64(&ops.lastID, 1)
	op := &Operation{
		ID:       strconv.FormatInt(id, 10),
		awaiting: from,
		notify:   make(chan bool, 1),
	}
	go op.wait()
	go ops.insert(op)
	return op
}

// Inserts op into the ops map.
func (ops *Operations) insert(op *Operation) {
	ops.lock.Lock()
	defer ops.lock.Unlock()
	ops.ops[op.ID] = op
}

// List operations for an API client.
func (ops *Operations) List() api.ServerOpList {
	ops.lock.Lock()
	defer ops.lock.Unlock()

	ids := []string{}
	for id := range ops.ops {
		ids = append(ids, id)
	}
	sort.StringSlice(ids).Sort()
	ol := api.ServerOpList{}
	for _, id := range ids {
		ol.Items = append(ol.Items, api.ServerOp{JSONBase: api.JSONBase{ID: id}})
	}
	return ol
}

// Returns the operation with the given ID, or nil
func (ops *Operations) Get(id string) *Operation {
	ops.lock.Lock()
	defer ops.lock.Unlock()
	return ops.ops[id]
}

// Garbage collect operations that have finished longer than maxAge ago.
func (ops *Operations) expire(maxAge time.Duration) {
	ops.lock.Lock()
	defer ops.lock.Unlock()
	keep := map[string]*Operation{}
	limitTime := time.Now().Add(-maxAge)
	for id, op := range ops.ops {
		if !op.expired(limitTime) {
			keep[id] = op
		}
	}
	ops.ops = keep
}

// Waits forever for the operation to complete; call via go when
// the operation is created. Sets op.finished when the operation
// does complete, and sends on the notify channel, in case there
// are any WaitFor() calls in progress.
// Does not keep op locked while waiting.
func (op *Operation) wait() {
	defer util.HandleCrash()
	result := <-op.awaiting

	op.lock.Lock()
	defer op.lock.Unlock()
	op.result = result
	finished := time.Now()
	op.finished = &finished
	op.notify <- true
}

// Wait for the specified duration, or until the operation finishes,
// whichever happens first.
func (op *Operation) WaitFor(timeout time.Duration) {
	select {
	case <-time.After(timeout):
	case <-op.notify:
		// Re-send on this channel in case there are others
		// waiting for notification.
		op.notify <- true
	}
}

// Returns true if this operation finished before limitTime.
func (op *Operation) expired(limitTime time.Time) bool {
	op.lock.Lock()
	defer op.lock.Unlock()
	if op.finished == nil {
		return false
	}
	return op.finished.Before(limitTime)
}

// Return status information or the result of the operation if it is complete,
// with a bool indicating true in the latter case.
func (op *Operation) StatusOrResult() (description interface{}, finished bool) {
	op.lock.Lock()
	defer op.lock.Unlock()

	if op.finished == nil {
		return api.Status{
			Status:  api.StatusWorking,
			Details: op.ID,
		}, false
	}
	return op.result, true
}

https://cit965.com/docs/k8s-course-code-start/glog

我们使用 Operations 来记录系统中目前的请求操作状态,操作是否执行完毕,是否超时,是否正在执行中。同时,我们正式让异步操作生效,将耗时操作放到MakeAsync中

对于 operation.go 的详细代码解释

用于在Kubernetes API服务器中管理和追踪正在进行的操作。以下是这段代码的详细解释:

Operation 结构代表服务器正在执行的一个操作。它包含了操作的ID、结果、等待通道、完成时间、同步锁和通知通道等成员。

Operations 结构用于跟踪所有正在进行的操作。它包含了最后一个操作的ID、同步锁和一个由操作ID到操作实例的映射。

NewOperations 函数用于创建一个新的操作跟踪器。这个函数创建一个新的操作跟踪器实例,并启动一个后台协程,用于每隔一段时间(例如5分钟)清理过期的操作。

NewOperation 方法用于添加一个新的操作。它使用原子操作增加最后一个操作的ID,创建一个新的操作实例,并启动两个后台协程,分别用于等待操作完成和将新操作插入到操作映射中。

insert 方法用于将一个新的操作插入到操作映射中。这个方法会在插入操作前后加锁,以确保并发安全。

List 方法用于列出所有操作的ID。这个方法也会在操作前后加锁,以确保并发安全。

Get 方法用于获取指定ID的操作。这个方法也会在操作前后加锁,以确保并发安全。

expire 方法用于清理过期的操作。这个方法会遍历所有操作,检查它们是否已经完成并且完成时间已经超过了最大年龄,如果是的话,就将它们从操作映射中移除。

wait 方法用于等待一个操作完成。这个方法会在一个操作完成后设置其完成时间,并发送一个通知信号。

WaitFor 方法用于等待一个操作完成或者超时。这个方法会等待一个通知信号或者超时信号,以确定一个操作是否已经完成。

expired 方法用于检查一个操作是否已经完成并且完成时间早于指定时间。

StatusOrResult 方法用于获取一个操作的状态信息或者结果。如果一个操作已经完成,它会返回操作的结果和一个表示已经完成的布尔值;否则,它会返回操作的状态信息和一个表示尚未完成的布尔值

标签:License,源码,用于,完成,操作,k8s,方法,ID,追踪
From: https://blog.51cto.com/landandan/6377734

相关文章

  • 【QCustomPlot】性能提升之修改源码(版本 V2.x.x)
    说明使用QCustomPlot绘图库的过程中,有时候觉得原生的功能不太够用,比如它没有曲线平滑功能;有时候又觉得更新绘图数据时逐个赋值效率太低,如果能直接操作内存就好了;还有时候希望减轻CPU压力,启用GPU加速。好在QCustomPlot是开源项目,源码编写十分规范,想要理解它的可视化思路不......
  • 基于JAVA的springboot+vue医院信息管理系统、医院挂号管理系统,附源码+数据库+论文+PPT
    1、项目介绍任何系统都要遵循系统设计的基本流程,本系统也不例外,同样需要经过市场调研,需求分析,概要设计,详细设计,编码,测试这些步骤,基于java语言设计并实现了医院信管系统。该系统基于B/S即所谓浏览器/服务器模式,应用java技术,选择MySQL作为后台数据库。系统主要包括首页,个人中心,用户......
  • jwt原理,jwt开发流程,drf-jwt快速使用,drf-jwt定制返回格式,drf-jwt自定义用户表签发,drf-j
    jwt原理:  JWT就是一段字符串,由三段信息构成的,将这三段信息文本用.链接一起就构成了Jwt字符串1headerjwt的头部承载两部分信息:声明类型,这里是jwt声明加密的算法通常直接使用HMACSHA256公司信息......
  • jwt原理开发,drf-jwt快速使用和自定义使用,jwt签发认证源码分析
    一眼弄懂cookieSeesiontoken区别彻底弄懂cookie,session和token区别1jwt原理1.1使用jwt认证和使用session认证的区别1.2三段式eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9.TJVA95OrM7E2cBab30RMHrHDcEf......
  • k8s探针详解
    一、探针类型介绍:(1)、K8s中存在三种类型的探针:livenessprobe、readinessprobe和startup探针。每类探针都支持三种探测方法liveness探针:影响的是单个容器,如果检查失败,将杀死容器,根据pod的restartPolicy来操作。readiness探针:影响的是整个pod,即如果pod中有多个容器,只要有一......
  • 3D眼动追踪软件行业市场调研分析报告2023
    2023-2029全球3D眼动追踪软件行业调研及趋势分析报告据调研机构恒州诚思(YH)研究统计,2022年全球3D眼动追踪软件市场规模约亿元,2018-2022年年复合增长率CAGR约为%,预计未来将持续保持平稳增长的态势,到2029年市场规模将接近亿元,未来六年CAGR为%。从核心市场看,中国3D眼动追踪软件市......
  • K8s Etcd 性能慢,调整这个参数快多了!
    本文最终的解决方式很简单,就是将现有卷升级为支持更高IOPS的卷,但解决问题的过程值得推荐。我们的团队看管着大约30套自建的Kubernetes集群,最近需要针对etcd集群进行性能分析。每个etcd集群有5个成员,实例型号为 m6i.xlarge,最大支持6000IOPS。每个成员有3个卷:root......
  • K8S学习笔记
    K8S官网文档基本概念节点|Kubernetes(p2hp.com)使用kubectl来查看节点状态和其他细节信息:kubectldescribenode<节点名称>容器状态要检查Pod中容器的状态,你可以使用kubectldescribepod<pod名称>其输出中包含Pod中每个容器的状态。pod配置文件详解创建......
  • 视频直播app源码,Android顶部导航栏制作
    视频直播app源码,Android顶部导航栏制作图片准备与样式变更因为目前版本UI默认以紫色色调为主,而在此我们希望使用更为和谐的淡蓝色调,此时需要对themes.xml文件执行修改; 首先我们要设置颜色,打开文件colors.xml文件中默认存在一些默认色彩,我们按照格式新增两个主色调blue_prima......
  • K8s Etcd 性能慢,调整这个参数快多了!
    本文最终的解决方式很简单,就是将现有卷升级为支持更高IOPS的卷,但解决问题的过程值得推荐。我们的团队看管着大约30套自建的Kubernetes集群,最近需要针对etcd集群进行性能分析。每个etcd集群有5个成员,实例型号为 m6i.xlarge,最大支持6000IOPS。每个成员有3个卷:r......