首页 > 其他分享 >go pipeline

go pipeline

时间:2023-02-15 16:22:04浏览次数:55  
标签:pipeline return err nil CreateProcessor cluster v2 go

// Copyright © 2021 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package processor

import (
"context"
"errors"
"fmt"

"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/labring/sealos/pkg/bootstrap"
"github.com/labring/sealos/pkg/buildah"
"github.com/labring/sealos/pkg/checker"
"github.com/labring/sealos/pkg/clusterfile"
"github.com/labring/sealos/pkg/config"
"github.com/labring/sealos/pkg/constants"
"github.com/labring/sealos/pkg/filesystem"
"github.com/labring/sealos/pkg/guest"
"github.com/labring/sealos/pkg/runtime"
v2 "github.com/labring/sealos/pkg/types/v1beta1"
"github.com/labring/sealos/pkg/utils/logger"
"github.com/labring/sealos/pkg/utils/rand"
"github.com/labring/sealos/pkg/utils/yaml"
)

type CreateProcessor struct {
ClusterFile clusterfile.Interface
Buildah buildah.Interface
Runtime runtime.Interface
Guest guest.Interface
}

func (c *CreateProcessor) Execute(cluster *v2.Cluster) error {
pipeLine, err := c.GetPipeLine()
if err != nil {
return err
}
for _, f := range pipeLine {
if err = f(cluster); err != nil {
return err
}
}

return nil
}

func (c *CreateProcessor) GetPipeLine() ([]func(cluster *v2.Cluster) error, error) {
var todoList []func(cluster *v2.Cluster) error
todoList = append(todoList,
// c.GetPhasePluginFunc(plugin.PhaseOriginally),
c.Check,
c.PreProcess,
c.RunConfig,
c.MountRootfs,
c.MirrorRegistry,
c.Bootstrap,
// c.GetPhasePluginFunc(plugin.PhasePreInit),
c.Init,
c.Join,
// c.GetPhasePluginFunc(plugin.PhasePreGuest),
c.RunGuest,
// c.GetPhasePluginFunc(plugin.PhasePostInstall),
)
return todoList, nil
}

func (c *CreateProcessor) Check(cluster *v2.Cluster) error {
logger.Info("Executing pipeline Check in CreateProcessor.")
err := checker.RunCheckList([]checker.Interface{checker.NewHostChecker()}, cluster, checker.PhasePre)
if err != nil {
return err
}
return nil
}

func (c *CreateProcessor) CheckImageType(cluster *v2.Cluster) error {
imageTypes := sets.NewString()
for _, image := range cluster.Spec.Image {
oci, err := c.Buildah.InspectImage(image)
if err != nil {
return err
}
if oci.Config.Labels != nil {
imageTypes.Insert(oci.Config.Labels[v2.ImageTypeKey])
} else {
imageTypes.Insert(string(v2.AppImage))
}
}
if !imageTypes.Has(string(v2.RootfsImage)) {
return errors.New("can't apply ApplicationImage, kubernetes cluster not found, need to run a BaseImage")
}
return nil
}

func (c *CreateProcessor) PreProcess(cluster *v2.Cluster) error {
logger.Info("Executing pipeline PreProcess in CreateProcessor.")
err := c.Buildah.Pull(cluster.Spec.Image, buildah.WithPlatformOption(buildah.DefaultPlatform()),
buildah.WithPullPolicyOption(buildah.PullIfMissing.String()))
if err != nil {
return err
}
if err = c.CheckImageType(cluster); err != nil {
return err
}
for _, img := range cluster.Spec.Image {
bderInfo, err := c.Buildah.Create(rand.Generator(8), img)
if err != nil {
return err
}
mount := &v2.MountImage{
Name: bderInfo.Container,
ImageName: img,
MountPoint: bderInfo.MountPoint,
}
if err = OCIToImageMount(mount, c.Buildah); err != nil {
return err
}
cluster.Status.Mounts = append(cluster.Status.Mounts, *mount)
}
if err = SyncClusterStatus(cluster, c.Buildah, false); err != nil {
return err
}
runTime, err := runtime.NewDefaultRuntime(cluster, c.ClusterFile.GetKubeadmConfig())
if err != nil {
return fmt.Errorf("failed to init runtime, %v", err)
}
c.Runtime = runTime
return nil
}

func (c *CreateProcessor) RunConfig(cluster *v2.Cluster) error {
logger.Info("Executing pipeline RunConfig in CreateProcessor.")
eg, _ := errgroup.WithContext(context.Background())
for _, cManifest := range cluster.Status.Mounts {
manifest := cManifest
eg.Go(func() error {
cfg := config.NewConfiguration(manifest.ImageName, manifest.MountPoint, c.ClusterFile.GetConfigs())
return cfg.Dump()
})
}
return eg.Wait()
}

func (c *CreateProcessor) MountRootfs(cluster *v2.Cluster) error {
logger.Info("Executing pipeline MountRootfs in CreateProcessor.")
hosts := append(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList()...)
fs, err := filesystem.NewRootfsMounter(cluster.Status.Mounts)
if err != nil {
return err
}
return fs.MountRootfs(cluster, hosts)
}

func (c *CreateProcessor) MirrorRegistry(cluster *v2.Cluster) error {
logger.Info("Executing pipeline MirrorRegistry in CreateProcessor.")
return MirrorRegistry(cluster, cluster.Status.Mounts)
}

func (c *CreateProcessor) Bootstrap(cluster *v2.Cluster) error {
logger.Info("Executing pipeline Bootstrap in CreateProcessor")
hosts := append(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList()...)
bs := bootstrap.New(cluster)
if err := bs.Preflight(hosts...); err != nil {
return err
}
if err := bs.Init(hosts...); err != nil {
return err
}
return bs.ApplyAddons(hosts...)
}

func (c *CreateProcessor) Init(cluster *v2.Cluster) error {
logger.Info("Executing pipeline Init in CreateProcessor.")
return c.Runtime.Init()
}

func (c *CreateProcessor) Join(cluster *v2.Cluster) error {
logger.Info("Executing pipeline Join in CreateProcessor.")
err := c.Runtime.JoinMasters(cluster.GetMasterIPAndPortList()[1:])
if err != nil {
return err
}
err = c.Runtime.JoinNodes(cluster.GetNodeIPAndPortList())
if err != nil {
return err
}
err = c.Runtime.SyncNodeIPVS(cluster.GetMasterIPAndPortList(), cluster.GetNodeIPAndPortList())
if err != nil {
return err
}
return yaml.MarshalYamlToFile(constants.Clusterfile(cluster.Name), cluster)
}

func (c *CreateProcessor) RunGuest(cluster *v2.Cluster) error {
logger.Info("Executing pipeline RunGuest in CreateProcessor.")
return c.Guest.Apply(cluster, cluster.Status.Mounts)
}

func NewCreateProcessor(name string, clusterFile clusterfile.Interface) (Interface, error) {
bder, err := buildah.New(name)
if err != nil {
return nil, err
}
gs, err := guest.NewGuestManager()
if err != nil {
return nil, err
}

return &CreateProcessor{
ClusterFile: clusterFile,
Buildah: bder,
Guest: gs,
}, nil
}

标签:pipeline,return,err,nil,CreateProcessor,cluster,v2,go
From: https://www.cnblogs.com/cheyunhua/p/17123484.html

相关文章

  • go 二维数组
    1.定义方式有两种1)先声明/定义,再赋值var数组名[大小][大小]类型funcmain(){//二维数组示例演示/*00000000100002......
  • django解决跨域请求
    安装django-cors-headerspython-mpipinstalldjango-cors-headers在#setting.py下添加如下代码INSTALLED_APPS=[...,"corsheaders",...,]MIDDL......
  • Go实现Web 应用程序
    Go内置有net/http包使用这个http包可以实现Web服务。 通过下面简单的两个函数,就可以搭建一个Web服务,同时该Web服务具有高并发的特性。http.HandleFunc("/favicon......
  • Python+Django(3):创建主页
    打开项目主文件夹learning_log中的文件urls.py:fromdjango.contribimportadminfromdjango.urlsimportpath,re_pathasurl,includeurlpatterns=[pat......
  • go追加文件
    RunE:func(cmd*cobra.Command,args[]string)error{objects,err:=apply.NewClusterFromArgs(args,genArgs)iferr!=nil{returnerr}data,e......
  • MongoDB连接字符串的URI格式
    两种的连接字符串格式1.标准的连接格式mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]](1)单机连接格式mongodb://user......
  • Python+Django(2):创建应用程序
    新打开一个终端窗口,切换到manage.py所在的目录激活虚拟环境:ll_env\Scripts\activate命令startappappname让Django建立创建应用程序所需的基础设施:pythonmanage.pyst......
  • Go 工程化 - JSON 使用技巧
    概述在 Go快速入门指南-JSON 讲解了 JSON 的常用方法,但是除此之外,JSON 还有一些鲜为人知的使用技巧,可以简洁地组合和忽略结构体字段,避免了重新定义结构体和内嵌......
  • Codeforces Round #852 (Div. 2) D - Moscow Gorillas
    https://codeforces.com/contest/1793/problem/D不妨枚举MEX(...)的值x。此时对于序列[l,r],需要满足:两个序列的1到x-1都在这个区间内,并且x都不在这个区间内......
  • Python+Django(1):建立项目
    为项目新建一个目录,将其命名为learning_log,再在终端中切换到这个目录(Python3):运行模块venv来创建一个名为ll_env的虚拟环境:python-mvenvll_env激活虚拟环境:ll_env\S......