不是很能看懂,但是又觉得很有用,不定什么时候能用到,先记录到这里吧
operation.go
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)
// Operation represents an ongoing action which the server is performing.
type Operation struct {
ID string
result interface{}
awaiting <-chan interface{}
finished *time.Time
lock sync.Mutex
notify chan bool
}
// Operations tracks all the ongoing operations.
type Operations struct {
// Access only using functions from atomic.
lastID int64
// 'lock' guards the ops map.
lock sync.Mutex
ops map[string]*Operation
}
// Returns a new Operations repository.
func NewOperations() *Operations {
ops := &Operations{
ops: map[string]*Operation{},
}
go util.Forever(func() { ops.expire(10 * time.Minute) }, 5*time.Minute)
return ops
}
// Add a new operation. Lock-free.
func (ops *Operations) NewOperation(from <-chan interface{}) *Operation {
id := atomic.AddInt64(&ops.lastID, 1)
op := &Operation{
ID: strconv.FormatInt(id, 10),
awaiting: from,
notify: make(chan bool, 1),
}
go op.wait()
go ops.insert(op)
return op
}
// Inserts op into the ops map.
func (ops *Operations) insert(op *Operation) {
ops.lock.Lock()
defer ops.lock.Unlock()
ops.ops[op.ID] = op
}
// List operations for an API client.
func (ops *Operations) List() api.ServerOpList {
ops.lock.Lock()
defer ops.lock.Unlock()
ids := []string{}
for id := range ops.ops {
ids = append(ids, id)
}
sort.StringSlice(ids).Sort()
ol := api.ServerOpList{}
for _, id := range ids {
ol.Items = append(ol.Items, api.ServerOp{JSONBase: api.JSONBase{ID: id}})
}
return ol
}
// Returns the operation with the given ID, or nil
func (ops *Operations) Get(id string) *Operation {
ops.lock.Lock()
defer ops.lock.Unlock()
return ops.ops[id]
}
// Garbage collect operations that have finished longer than maxAge ago.
func (ops *Operations) expire(maxAge time.Duration) {
ops.lock.Lock()
defer ops.lock.Unlock()
keep := map[string]*Operation{}
limitTime := time.Now().Add(-maxAge)
for id, op := range ops.ops {
if !op.expired(limitTime) {
keep[id] = op
}
}
ops.ops = keep
}
// Waits forever for the operation to complete; call via go when
// the operation is created. Sets op.finished when the operation
// does complete, and sends on the notify channel, in case there
// are any WaitFor() calls in progress.
// Does not keep op locked while waiting.
func (op *Operation) wait() {
defer util.HandleCrash()
result := <-op.awaiting
op.lock.Lock()
defer op.lock.Unlock()
op.result = result
finished := time.Now()
op.finished = &finished
op.notify <- true
}
// Wait for the specified duration, or until the operation finishes,
// whichever happens first.
func (op *Operation) WaitFor(timeout time.Duration) {
select {
case <-time.After(timeout):
case <-op.notify:
// Re-send on this channel in case there are others
// waiting for notification.
op.notify <- true
}
}
// Returns true if this operation finished before limitTime.
func (op *Operation) expired(limitTime time.Time) bool {
op.lock.Lock()
defer op.lock.Unlock()
if op.finished == nil {
return false
}
return op.finished.Before(limitTime)
}
// Return status information or the result of the operation if it is complete,
// with a bool indicating true in the latter case.
func (op *Operation) StatusOrResult() (description interface{}, finished bool) {
op.lock.Lock()
defer op.lock.Unlock()
if op.finished == nil {
return api.Status{
Status: api.StatusWorking,
Details: op.ID,
}, false
}
return op.result, true
}
https://cit965.com/docs/k8s-course-code-start/glog
我们使用 Operations 来记录系统中目前的请求操作状态,操作是否执行完毕,是否超时,是否正在执行中。同时,我们正式让异步操作生效,将耗时操作放到MakeAsync中
对于 operation.go 的详细代码解释
用于在Kubernetes API服务器中管理和追踪正在进行的操作。以下是这段代码的详细解释:
Operation 结构代表服务器正在执行的一个操作。它包含了操作的ID、结果、等待通道、完成时间、同步锁和通知通道等成员。
Operations 结构用于跟踪所有正在进行的操作。它包含了最后一个操作的ID、同步锁和一个由操作ID到操作实例的映射。
NewOperations 函数用于创建一个新的操作跟踪器。这个函数创建一个新的操作跟踪器实例,并启动一个后台协程,用于每隔一段时间(例如5分钟)清理过期的操作。
NewOperation 方法用于添加一个新的操作。它使用原子操作增加最后一个操作的ID,创建一个新的操作实例,并启动两个后台协程,分别用于等待操作完成和将新操作插入到操作映射中。
insert 方法用于将一个新的操作插入到操作映射中。这个方法会在插入操作前后加锁,以确保并发安全。
List 方法用于列出所有操作的ID。这个方法也会在操作前后加锁,以确保并发安全。
Get 方法用于获取指定ID的操作。这个方法也会在操作前后加锁,以确保并发安全。
expire 方法用于清理过期的操作。这个方法会遍历所有操作,检查它们是否已经完成并且完成时间已经超过了最大年龄,如果是的话,就将它们从操作映射中移除。
wait 方法用于等待一个操作完成。这个方法会在一个操作完成后设置其完成时间,并发送一个通知信号。
WaitFor 方法用于等待一个操作完成或者超时。这个方法会等待一个通知信号或者超时信号,以确定一个操作是否已经完成。
expired 方法用于检查一个操作是否已经完成并且完成时间早于指定时间。
StatusOrResult 方法用于获取一个操作的状态信息或者结果。如果一个操作已经完成,它会返回操作的结果和一个表示已经完成的布尔值;否则,它会返回操作的状态信息和一个表示尚未完成的布尔值
标签:License,源码,用于,完成,操作,k8s,方法,ID,追踪 From: https://blog.51cto.com/landandan/6377734