首页 > 其他分享 >Kubernetes API Codec 解析

Kubernetes API Codec 解析

时间:2023-11-12 23:13:44浏览次数:40  
标签:runtime Kubernetes Codec encoder API scheme Serializer

概述

Kubernetes API 多版本和序列化 这篇文章中,介绍了API多版本的功能和实现原理,其中Codec就是用来做序列化工作的,它主要用在两个地方:一个是通过HTTP协议跟客户端进行交互时,会对传输的数据进行序列化和反序列化,将字节流类型的数据转换成对应的API对象,或者是将API对象转换成对应格式的数据返回给客户端;一个是用在存储层的,即API对象存储到数据库时,也需要经过编码的,即经过序列化,默认是存储成 protobuf格式的数据,然后从数据库读出来数据时,又会反序列化为对应的API对象,下面我们来分析下Codec的实现机制。

Serializer

Serializer即是将API对象以某种数据格式进行序列化和反序列化,目前支持的数据格式有三种:json, yaml, protobuf,我们先来看看相关的接口定义:

// k8s.io/apimachinery/pkg/runtime/interfaces.go

// Encoder writes objects to a serialized form
type Encoder interface {
  Encode(obj Object, w io.Writer) error
  Identifier() Identifier
}

// Decoder attempts to load an object from data.
type Decoder interface {
  Decode(data []byte, defaults *schema.GroupVersionKind, into Object) (Object, *schema.GroupVersionKind, error)
}

// Serializer is the core interface for transforming objects into a serialized format and back.
type Serializer interface {
  Encoder
  Decoder
}

// Codec is a Serializer that deals with the details of versioning objects. It offers the same
// interface as Serializer, so this is a marker to consumers that care about the version of the objects
// they receive.
type Codec Serializer

Encoder接口中定义的Encode()方法是要将一个API对象以某种格式编码到输出中,而Decoder接口中定义的Decode()方法则是将字节类型的数据解码成某个版本的API对象,这两个编码解码的接口组合起来形成一个新的接口,叫Serializer,同时也叫 Codec

目前 Kubernetes 中有三种数据格式的Serializer,均实现了上面的接口,分别为json, yaml和protobuf,来看下他们的类图:

这几个Serializer定义在 k8s.io/apimachinery/pkg/runtime/serializer/ 目录下,分别实现了Json, Yaml和Protobuf数据格式的编码和解码操作,需要注意的是没有专门的Yaml Serializer的实现,因为Json跟Yaml的转换很容易,所以直接使用Json Serializer去实现了Yaml Serializer,具体json和protobuf是如何进行Encode和Decode的,这里我们不展开,这里只需要知道这几个Serializer的作用,做了什么事情即可。

CodecFactory

上面的接口中,为什么要再定义一个跟Serializer同名的接口Codec呢?Codec的注释有这么一句话,说明了它的作用:

Codec is a Serializer that deals with the details of versioning objects.

即Codec是专门用来处理多版本的API对象的序列化的,它除了需要做API对象的序列化操作之外,还需要做版本转换的操作,而上面介绍到的json/yaml/protobuf Serializer相对偏底层,只是做某个版本对象的序列化操作,Codec会引用Serializer做具体的序列化,然后再做版本转换。Codec既然跟对象版本有关,那肯定不同版本的API资源就要有不同的Codec了,所以我们就需要有个生产Codec的工厂类,即CodecFactory:

可以看到它实现了两个接口,NegotiatedSerializerStorageSerializer,正好对应了本小节开头提到的Codec的两个作用:一个作用于HTTP,用来跟客户端交互,一个作用于存储,用来跟数据库交互,其中的 EncoderForVersion()DecoderToVersion() 就是用来生产Codec的方法,来看看相关代码:

// k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go

func NewCodecFactory(scheme *runtime.Scheme, mutators ...CodecFactoryOptionsMutator) CodecFactory {
  options := CodecFactoryOptions{Pretty: true}
  for _, fn := range mutators {
    fn(&options)
  }

  // 创建了 json/yaml/protobuf 三种serializer
  serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory, options)
  return newCodecFactory(scheme, serializers)
}

func newCodecFactory(scheme *runtime.Scheme, serializers []serializerType) CodecFactory {
  decoders := make([]runtime.Decoder, 0, len(serializers))
  var accepts []runtime.SerializerInfo

  var legacySerializer runtime.Serializer
  for _, d := range serializers {
    decoders = append(decoders, d.Serializer)
    for _, mediaType := range d.AcceptContentTypes {
      ......
      info := runtime.SerializerInfo{
        MediaType:        d.ContentType,
        EncodesAsText:    d.EncodesAsText,
        Serializer:       d.Serializer,
        PrettySerializer: d.PrettySerializer,
        StrictSerializer: d.StrictSerializer,
      }
      ......
      accepts = append(accepts, info)
      if mediaType == runtime.ContentTypeJSON {
        legacySerializer = d.Serializer
      }
    }
  }
  ......
  return CodecFactory{
    scheme:    scheme,
    universal: recognizer.NewDecoder(decoders...),

    accepts: accepts,

    legacySerializer: legacySerializer,
  }
}

上面的代码显示了CodecFactory是如何创建出来的,比较重要的是 serializers := newSerializersForScheme(scheme, json.DefaultMetaFactory, options) 这行代码,这是去创建 json/yaml/protobuf 三种Serializer,然后在 newCodecFactory() 方法中将他们转成了 SerializerInfo 对象,最终将他们放到了CodecFactory的 accepts 属性中。还有个universal 属性,是把三种Serializer放到了一个列表里,然后组成了一个统一的decoder,即它可以解析三种格式的数据。

再来看看CodecFactory生产Codec的方法:

// k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go

func (f CodecFactory) CodecForVersions(encoder runtime.Encoder, decoder runtime.Decoder, encode runtime.GroupVersioner, decode runtime.GroupVersioner) runtime.Codec {
  // TODO: these are for backcompat, remove them in the future
  if encode == nil {
    encode = runtime.DisabledGroupVersioner
  }
  if decode == nil {
    decode = runtime.InternalGroupVersioner
  }
  return versioning.NewDefaultingCodecForScheme(f.scheme, encoder, decoder, encode, decode)
}

// DecoderToVersion returns a decoder that targets the provided group version.
func (f CodecFactory) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
  return f.CodecForVersions(nil, decoder, nil, gv)
}

// EncoderForVersion returns an encoder that targets the provided group version.
func (f CodecFactory) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
  return f.CodecForVersions(encoder, nil, gv, nil)
}

这几个函数接收的参数,都是接口类型的,Encoder和Decoder上面我们介绍过了,是用来做具体的数据格式序列化的,还有个GroupVersioner来看下:

// k8s.io/apimachinery/pkg/runtime/interfaces.go

type GroupVersioner interface {
  KindForGroupVersionKinds(kinds []schema.GroupVersionKind) (target schema.GroupVersionKind, ok bool)
  Identifier() string
}

而实现了该接口的是一个叫 multiGroupVersioner 的结构体,位于 k8s.io/apimachinery/pkg/runtime/codec.go

这个multiGroupVersioner的作用是什么呢?可以看看它里面的属性,有一个GroupVersion类型的target,然后有一个[]GroupKind类型的accetedGroupKinds,然后 KindForGroupVersionKinds() 方法的作用就是,当接收一个GVK列表时,看它们的GroupKind哪一个在acceptGroupKinds里,然后就会把它的Kind取出来,跟target组成一个新的GVK返回,即期望输出的Group和Version是固定的,就是target所指定的,只需要找到匹配的Kind即可,比如:

target=mygroup/__internal, acceptedGroupKinds=mygroup/Foo, anothergroup/Bar
KindForGroupVersionKinds(yetanother/v1/Baz, anothergroup/v1/Bar) -> mygroup/__internal/Bar (matched preferred group/kind)

那它到底有什么用呢?要知道我们前面讲类型注册时,注册进scheme的类型,可能会对应多个GVK,即typeToGVK,multiGroupVersioner的作用就是在这,在进行版本转换时,已知一个类型,找到多个GVK时,能够唯一的确定一个GVK:

// k8s.io/apimachinery/pkg/runtime/scheme.go
func (s *Scheme) convertToVersion(copy bool, in Object, target GroupVersioner) (Object, error) {
  var t reflect.Type
  t = reflect.TypeOf(in)
  t = t.Elem()
  kinds, ok := s.typeToGVK[t]
  gvk, ok := target.KindForGroupVersionKinds(kinds)
  ......
}

理解了GroupVersioner的作用,再来看看 CodecForVersions() 这几个工厂方法,就是指定了做序列化的encoder或者decoder,以及目标版本,然后构造了一个能够处理版本转换的 versioning.codec,它就是这个工厂方法生产出来的Codec,再来具体看看它:

// k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go

func NewDefaultingCodecForScheme(
  scheme *runtime.Scheme,
  encoder runtime.Encoder,
  decoder runtime.Decoder,
  encodeVersion runtime.GroupVersioner,
  decodeVersion runtime.GroupVersioner,
) runtime.Codec {
  return NewCodec(encoder, decoder, runtime.UnsafeObjectConvertor(scheme), scheme, scheme, scheme, encodeVersion, decodeVersion, scheme.Name())
}

func NewCodec(
  encoder runtime.Encoder,
  decoder runtime.Decoder,
  convertor runtime.ObjectConvertor,
  creater runtime.ObjectCreater,
  typer runtime.ObjectTyper,
  defaulter runtime.ObjectDefaulter,
  encodeVersion runtime.GroupVersioner,
  decodeVersion runtime.GroupVersioner,
  originalSchemeName string,
) runtime.Codec {
  internal := &codec{
    encoder:   encoder,
    decoder:   decoder,
    convertor: convertor,
    creater:   creater,
    typer:     typer,
    defaulter: defaulter,

    encodeVersion: encodeVersion,
    decodeVersion: decodeVersion,

    identifier: identifier(encodeVersion, encoder),

    originalSchemeName: originalSchemeName,
  }
  return internal
}

type codec struct {
  encoder   runtime.Encoder
  decoder   runtime.Decoder
  convertor runtime.ObjectConvertor
  creater   runtime.ObjectCreater
  typer     runtime.ObjectTyper
  defaulter runtime.ObjectDefaulter

  encodeVersion runtime.GroupVersioner
  decodeVersion runtime.GroupVersioner

  identifier runtime.Identifier

  originalSchemeName string
}

codec实例中的encoder, decoder就是用来做具体序列化工作的json/yaml/protobuf Serializer,而creater, typer, defaulter均是scheme,encodeVersion, decodeVersion则是目标版本,还有convertor本质上也是scheme,只是在外面又包了一层,最终进行版本转换,调用的是scheme的UnsafeConvertToVersion()方法:

// k8s.io/apimachinery/pkg/runtime/helper.go

type unsafeObjectConvertor struct {
  *Scheme
}

func (c unsafeObjectConvertor) ConvertToVersion(in Object, outVersion GroupVersioner) (Object, error) {
  return c.Scheme.UnsafeConvertToVersion(in, outVersion)
}

func UnsafeObjectConvertor(scheme *Scheme) ObjectConvertor {
  return unsafeObjectConvertor{scheme}
}

来大致看看这个codec提供的Encode和Decode方法的逻辑:

// k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go

func (c *codec) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
  ......
  encodeFn := c.encoder.Encode
  ......
  out, err := c.convertor.ConvertToVersion(obj, c.encodeVersion)
  ......
  return encodeFn(out, w)
}

func (c *codec) Decode(data []byte, defaultGVK *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
  decodeInto := into
  obj, gvk, err := c.decoder.Decode(data, defaultGVK, decodeInto)
  if into != nil {
    // perform defaulting if requested
    if c.defaulter != nil {
      c.defaulter.Default(obj)
    }

    // Short-circuit conversion if the into object is same object
    if into == obj {
      return into, gvk, strictDecodingErr
    }

    if err := c.convertor.Convert(obj, into, c.decodeVersion); err != nil {
      return nil, gvk, err
    }

    return into, gvk, strictDecodingErr
  }  
  ......
}

可以看到Encode时,是先进行版本转换,然后再用encoder进行序列化,而Decode时,先用decoder进行反序列化,将字节类型的数据Decode到某一个版本的API对象中,然后再对其进行赋默认值操作,还有进行版本转换,转换到目标版本,版本转换就是调用到上面提到的 unsafeObjectConvertor,它又调用scheme中注册的各种版本转换方法进行转换了。

这个Codec虽然逻辑有点绕,但是总结来说,它做的工作就是利用Serializer + Scheme,来做序列化和版本转换的工作。

Codec使用场景

我们再来看看Codec是怎么使用的,前面提到过,Codec在两个地方被用到:一个是客户端通过HTTP协议跟APIServer交互时,需要进行Codec,一个是将API对象存储到数据库时,需要进行Codec,我们来分别看下这两个场景是怎么用Codec的,简单走下代码的流程即可(Code Walk-through)。

跟数据库进行交互

Kubernetes APIServer Storage 框架解析 这篇文章中,就介绍过API对象是怎么存储到数据库中的,其中,在为每个API资源构建etcd store时,会通过 DefaultStorageFactory 来为其构建存储配置,而Codec相关的逻辑就在这:

// k8s.io/apiserver/pkg/server/storage/storage_factory.go

func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
    chosenStorageResource := s.getStorageGroupResource(groupResource)

    // operate on copy
    storageConfig := s.StorageConfig
    codecConfig := StorageCodecConfig{
        StorageMediaType:  s.DefaultMediaType,
        StorageSerializer: s.DefaultSerializer,
    }

    if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
        override.Apply(&storageConfig, &codecConfig)
    }
    if override, ok := s.Overrides[chosenStorageResource]; ok {
        override.Apply(&storageConfig, &codecConfig)
    }

    codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
    codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
    codecConfig.Config = storageConfig

    storageConfig.Codec, storageConfig.EncodeVersioner, err = s.newStorageCodecFn(codecConfig)

    return &storageConfig, nil
}

codecConfig中会保存存储序列化相关的一些配置,StorageMediaType默认为 application/vnd.kubernetes.protobuf,而StorageSerializer则为 legacyscheme.Codecs,还有StorageVersionMemoryVersion,分别表示该资源存储到数据库时使用的版本,以及加载到内存中使用的版本,我们来看看这两个方法:

// k8s.io/apiserver/pkg/server/storage/resource_encoding_config.go

func (o *DefaultResourceEncodingConfig) StorageEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {
  if !o.scheme.IsGroupRegistered(resource.Group) {
    return schema.GroupVersion{}, fmt.Errorf("group %q is not registered in scheme", resource.Group)
  }

  resourceOverride, resourceExists := o.resources[resource]
  if resourceExists {
    return resourceOverride.ExternalResourceEncoding, nil
  }

  // return the most preferred external version for the group
  return o.scheme.PrioritizedVersionsForGroup(resource.Group)[0], nil
}

func (o *DefaultResourceEncodingConfig) InMemoryEncodingFor(resource schema.GroupResource) (schema.GroupVersion, error) {
  if !o.scheme.IsGroupRegistered(resource.Group) {
    return schema.GroupVersion{}, fmt.Errorf("group %q is not registered in scheme", resource.Group)
  }

  resourceOverride, resourceExists := o.resources[resource]
  if resourceExists {
    return resourceOverride.InternalResourceEncoding, nil
  }
  return schema.GroupVersion{Group: resource.Group, Version: runtime.APIVersionInternal}, nil
}

可以看到存储使用的版本,是通过scheme的PrioritizedVersionsForGroup()方法获得的,这个方法我们在scheme中介绍过,是获取该组中所有的版本,他们会按照优先级排序,排在第一位的是优先级最高的,一般是稳定版本是优先级最高的,这里取第0个值,即取的是版本优先级最高的版本,而内存版本则是内部版本,version为 __internal,所谓内存版本是指从数据库读出来原始的数据之后,要转换成的版本。

NewConfig()最终调用 newStorageCodecFn(codecConfig) 创建了Codec,我们来看看该方法:

// k8s.io/apiserver/pkg/server/storage/storage_codec.go

func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, runtime.GroupVersioner, error) {
  mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType)
  if err != nil {
    return nil, nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType)
  }

  supportedMediaTypes := opts.StorageSerializer.SupportedMediaTypes()
  serializer, ok := runtime.SerializerInfoForMediaType(supportedMediaTypes, mediaType)
  if !ok {
    supportedMediaTypeList := make([]string, len(supportedMediaTypes))
    for i, mediaType := range supportedMediaTypes {
      supportedMediaTypeList[i] = mediaType.MediaType
    }
    return nil, nil, fmt.Errorf("unable to find serializer for %q, supported media types: %v", mediaType, supportedMediaTypeList)
  }

  s := serializer.Serializer

  // Give callers the opportunity to wrap encoders and decoders.  For decoders, each returned decoder will
  // be passed to the recognizer so that multiple decoders are available.
  var encoder runtime.Encoder = s
  if opts.EncoderDecoratorFn != nil {
    encoder = opts.EncoderDecoratorFn(encoder)
  }
  decoders := []runtime.Decoder{
    // selected decoder as the primary
    s,
    // universal deserializer as a fallback
    opts.StorageSerializer.UniversalDeserializer(),
    // base64-wrapped universal deserializer as a last resort.
    // this allows reading base64-encoded protobuf, which should only exist if etcd2+protobuf was used at some point.
    // data written that way could exist in etcd2, or could have been migrated to etcd3.
    // TODO: flag this type of data if we encounter it, require migration (read to decode, write to persist using a supported encoder), and remove in 1.8
    runtime.NewBase64Serializer(nil, opts.StorageSerializer.UniversalDeserializer()),
  }
  if opts.DecoderDecoratorFn != nil {
    decoders = opts.DecoderDecoratorFn(decoders)
  }

  encodeVersioner := runtime.NewMultiGroupVersioner(
    opts.StorageVersion,
    schema.GroupKind{Group: opts.StorageVersion.Group},
    schema.GroupKind{Group: opts.MemoryVersion.Group},
  )

  // Ensure the storage receives the correct version.
  encoder = opts.StorageSerializer.EncoderForVersion(
    encoder,
    encodeVersioner,
  )
  decoder := opts.StorageSerializer.DecoderToVersion(
    recognizer.NewDecoder(decoders...),
    runtime.NewCoercingMultiGroupVersioner(
      opts.MemoryVersion,
      schema.GroupKind{Group: opts.MemoryVersion.Group},
      schema.GroupKind{Group: opts.StorageVersion.Group},
    ),
  )

  return runtime.NewCodec(encoder, decoder), encodeVersioner, nil
}

这里用到的方法基本上就都是我们前面介绍过的了,首先根据MediaType拿到对应的Serializer,然后创建了GroupVersioner目标版本,目标版本分别是codecConfig中的数据库存储版本和内存版本,然后通过StorageSerializer,即Codecs,即CodecFactory,使用EncoderForVersion(), DecoderToVersion()工厂方法,构建出带版本转换的encoder和decoder,最后这两者再组装成一个新的Codec返回:

// k8s.io/apimachinery/pkg/runtime/codec.go

type codec struct {
  Encoder
  Decoder
}

// NewCodec creates a Codec from an Encoder and Decoder.
func NewCodec(e Encoder, d Decoder) Codec {
  return codec{e, d}
}

这个codec就是简单的封装,只是为了对外提供统一的接口而已,它就是最终etcd store对API对象进行数据库存储和读取时,使用到的Codec了,例如下例中的codec便是这里NewCodec()创建出来的:

// k8s.io/apiserver/pkg/storage/etcd3/store.go

func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
  if _, err := conversion.EnforcePtr(objPtr); err != nil {
    return fmt.Errorf("unable to convert output object to pointer: %v", err)
  }
  _, _, err := codec.Decode(value, nil, objPtr)
  ......
}

跟客户端进行交互

以GET某个API对象为例,在install GET请求的Handler时,构建了一个reqScope,它里面包含了跟序列化相关的变量:

// k8s.io/apiserver/pkg/endpoints/installer.go

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
  ......
  fqKindToRegister, err := GetResourceKind(a.group.GroupVersion, storage, a.group.Typer)
  ......
  reqScope := handlers.RequestScope{
    Serializer:      a.group.Serializer, // localscheme.Codecs
    ParameterCodec:  a.group.ParameterCodec,
    Creater:         a.group.Creater, // scheme
    Convertor:       a.group.Convertor, // scheme
    Defaulter:       a.group.Defaulter, // scheme
    Typer:           a.group.Typer, // scheme
    UnsafeConvertor: a.group.UnsafeConvertor, // wrapper of scheme
    Authorizer:      a.group.Authorizer,
    ......
    Kind:        fqKindToRegister,
    ......
  }
  ......
  switch action.Verb {
    case "GET": 
      var handler restful.RouteFunction
      handler = restfulGetResource(getter, reqScope)
      ......
  }
  ......
}

fqKindToRegister, Kind为该API资源所对应的GVK,是scheme根据REST storage从注册的类型中识别出来的,reqScope中的 Serializer 实际上是 localscheme.CodecsCreater, Convertor, Defaulter, Typeer, UnsafeConvertor实际上都指向的是全局的scheme,最终,构造了GET Handler的入口函数:

// k8s.io/apiserver/pkg/endpoints/handlers/get.go

func getResourceHandler(scope *RequestScope, getter getterFunc) http.HandlerFunc {
  return func(w http.ResponseWriter, req *http.Request) {
    ......
    namespace, name, err := scope.Namer.Name(req)
    ......
    outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
    ......
    result, err := getter(ctx, name, req)
    ......
    transformResponseObject(ctx, scope, req, w, http.StatusOK, outputMediaType, result)
  }
}

getter()是从数据库中获取到对应name的API对象,并且进行了版本转换,转换成了内部版本,然后在 transformResponseObject() 方法中,又会根据outPutMediaType,以及scope中的Kind, Serializer等,将其转换为目标版本:

// vendor/k8s.io/apiserver/pkg/endpoints/handlers/response.go

func transformResponseObject(ctx context.Context, scope *RequestScope, req *http.Request, w http.ResponseWriter, statusCode int, mediaType negotiation.MediaTypeOptions, result runtime.Object) {
  ......
  kind, serializer, _ := targetEncodingForTransform(scope, mediaType, req)
  responsewriters.WriteObjectNegotiated(serializer, scope, kind.GroupVersion(), w, req, statusCode, obj, false)
}

kind即为目标GVK,serializer即为codecs,实际来自scope中的Serializer。

// k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go

func WriteObjectNegotiated(s runtime.NegotiatedSerializer, restrictions negotiation.EndpointRestrictions, gv schema.GroupVersion, w http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object, listGVKInContentType bool) {
  ......
  mediaType, serializer, err := negotiation.NegotiateOutputMediaType(req, s, restrictions)

  encoder := s.EncoderForVersion(serializer.Serializer, gv)

  request.TrackSerializeResponseObjectLatency(req.Context(), func() {
    if listGVKInContentType {
      SerializeObject(generateMediaTypeWithGVK(serializer.MediaType, mediaType.Convert), encoder, w, req, statusCode, object)
    } else {
      SerializeObject(serializer.MediaType, encoder, w, req, statusCode, object)
    }
  })
}

func SerializeObject(mediaType string, encoder runtime.Encoder, hw http.ResponseWriter, req *http.Request, statusCode int, object runtime.Object) {
  ......
  err := encoder.Encode(object, w)
  ......
}

encoder := s.EncoderForVersion() 则是调用了CodecFactory的工厂方法,创建了一个versioning.codec,然后调用 encoder.Encode() 进行序列化以及版本转换,然后将结果输出到HTTP ResponseWriter中,返回给客户端。

总结

Codec承担着序列化的工作,除了做序列化,还承担着调用scheme的逻辑进行版本转换的工作,所以Codec其实也是实现API多版本的重要机制,跟Scheme可以说是相辅相成。本篇文章分析了Codec的实现原理,本质上Codec是一个工厂方法类,它会为各个API资源进行版本转换和序列化创建一个实例,然后会用在两个场景上,一个是通过HTTP协议跟客户端进行交互,一个是跟数据库进行交互,同时本篇文章也对Codec在这两个场景的相关代码进行了简单的梳理。

如果想更好的理解Codec在这两个场景的作用,就需要了解APIServer的框架以及存储框架,推荐阅读下列文章:

  • Kubernetes APIServer 机制概述中我们介绍到了APIServer的本质其实是一个实现了RESTful API的WebServer,它使用golang的net/http的Server构建,并且Handler是其中非常重要的概念,此外,又简单介绍了APIServer的扩展机制,即Aggregator, APIExtensions以及KubeAPIServer这三者之间通过Delegation的方式实现了扩展。
  • Kubernetes APIServer Storage 框架解析中,我们介绍了APIServer相关的存储框架,每个API对象,都有对应的REST store以及etcd store,它们是如何存储进数据库的。
  • Kubernetes APIServer GenericAPIServer中介绍了GenericAPIServer的作用,以及它的Handler是如何构建,API对象是如何以APIGroupInfo的形式注册进Handler中的,以及PostStartHook的机制。
  • Kubernetes APIServer API Resource Installation中,介绍了KubeAPIServer, Aggregator, APIExtensions中的API对象资源是如何构建成REST Store,并且组织成APIGroupInfo,然后注册进GenericAPIServer中的,然后又盘点了下当前版本的Kubernetes中都有哪些API对象资源。

标签:runtime,Kubernetes,Codec,encoder,API,scheme,Serializer
From: https://www.cnblogs.com/hackerain/p/17828122.html

相关文章

  • Kubernetes API Scheme 解析
    概述在KubernetesAPI多版本和序列化这篇文章中,介绍了API多版本的功能和实现原理,其中Scheme就是其实现原理的一项重要机制,在平时的开发中也经常会遇到,本篇文章就对其进行下分析。Scheme起到了一个类型(Type)注册中心的作用,在APIServer内部,全局只有一个Scheme实例,各个版本的API......
  • Kubernetes API 多版本和序列化
    前言三年前在分析KuberneteAPIServer时,就经常遇到两个东西,一个是Scheme,一个是Codec,当时对它们并不是很理解,也没有去细究,但是后来越来越多的能够遇见它们,尤其是在做KubernetesAPI相关的开发时,Scheme的出镜率很高,于是查了下资料才知道,原来他们跟Kubernetes的API多版本和序列化有......
  • kube-apiserver源码阅读
    kubernetes代码版本:v1.20.2个人认为kube-apiserver是k8s中最核心的组件,承上启下,无论是k8s其他组件还是是外部客户端都需要跟kube-apiserver组件进行交互,kube-apiserver负责接受请求并将数据持久化到后端存储(一般来说就是etcd.)。下面是个人关于kube-apiserver代码阅读的一些记......
  • 使用 AJAX、PHP 和服务器发送事件从 OpenAI 的 API 流式传输数据
    如何使用服务器发送事件(SSE)将数据从上述API流式传输到使用JavaScript和PHP的浏览器客户端?我已经研究这个问题好几个小时了,但我似乎无法弄清楚出了什么问题。作为参考,我尝试在这里调整解决方案:StreamDATAFromopenaiGPT-3APIusingPHP我的代码的其余部分或多或少与上......
  • 接口开放太麻烦?试试阿里云API网关吧
    前言我在多方合作时,系统间的交互是怎么做的?这篇文章中写过一些多方合作时接口的调用规则和例子,然而,接口开放所涉及的安全、权限、监控、流量控制等问题,可不是简简单单就可以解决的,这一般需要专业的开放平台来支撑。但为了开放几个接口就要做一个开放平台,实在是不合算。为此阿里云为......
  • API
    常用APIAPI:应用程序编程接口就是JAVA帮我们已经写好了一些程序,如:类,方法,等,Object类Object是所有类的祖宗toString()//返回对象的字符串表现形式equals(Objecto)//判断两个对象是否相等代码实例publicstaticvoidmain(String[]args){Students=newStu......
  • chatgpt的api联网报错问题解决:openai公司的api联网报错解决
    chatgpt是啥,这里不讲,openai是啥这里也不讲。要知道我们不论是通过网页web使用chatgpt还是使用api方式通过客户端使用chatgpt都是需要使用外国IP的,     ......
  • 二进制安装Kubernetes(k8s)v1.28.3
    二进制安装Kubernetes(k8s)v1.28.3https://github.com/cby-chen/Kubernetes开源不易,帮忙点个star,谢谢了介绍kubernetes(k8s)二进制高可用安装部署,支持IPv4+IPv6双栈。我使用IPV6的目的是在公网进行访问,所以我配置了IPV6静态地址。若您没有IPV6环境,或者不想使用IPv6,不对主机进行......
  • 云原生架构实战07 Kubernetes的核心实战 下
    7、存储抽象pod如果挂掉,在其他的机器启动新pod,原来pod的数据是无法迁移到新机器的;所以使用单独的存储层来解决。将节点上的文件或目录挂载到pod上,此时该目录会变成持久化存储目录,即使Pod被删除后重启,也可以重新加载到该目录,该目录下的文件不会丢失。nfs卷能将NFS(网络文件系统)挂载......
  • 【Windows】WinForms程序调用WinRT的API清空剪切板
    首先这是.NETFramework4.6项目才用的方法,高版本直接安装Microsoft.Windows.SDK.Contracts包来使用就行了。此代码实现了手动调用API获取WinRT的剪切板对象,并通过虚函数指针调用ClearHistory方法清空剪切板历史记录(Win+V)。1usingSystem;2usingSystem.Runtime.Compile......