首页 > 编程语言 >CreateCollection API执行流程_milvus源码解析

CreateCollection API执行流程_milvus源码解析

时间:2023-11-15 17:32:49浏览次数:43  
标签:err partition ts collection field coll API CreateCollection 源码

CreateCollection API执行流程源码解析

milvus版本:v2.3.2

CreateCollection这个API流程较长,也是milvus的核心API之一,涉及的内容比较复杂。这里只介绍和元数据相关的流程。

整体架构:

architecture.png

CreateCollection 的数据流向:

create_collection数据流向.jpg

1.客户端sdk发出CreateCollection API请求。

from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 3000, 1024

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong",shards_num=2)

客户端SDK向proxy发送一个CreateCollection API请求,创建一个名为hello_milvus的collection。

hello_milvus.jpg

2.客户端接受API请求,将request封装为createCollectionTask,并压入ddQueue队列。

代码路径:internal\proxy\impl.go

func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
	......
    // request封装为task
	cct := &createCollectionTask{
		ctx:                     ctx,
		Condition:               NewTaskCondition(ctx),
		CreateCollectionRequest: request,
		rootCoord:               node.rootCoord,
	}

	......
    // 将task压入ddQueue队列
	if err := node.sched.ddQueue.Enqueue(cct); err != nil {
		......
	}

	......
    // 等待cct执行完
	if err := cct.WaitToFinish(); err != nil {
		......
	}

	......
}

3.执行createCollectionTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task.go

func (t *createCollectionTask) Execute(ctx context.Context) error {
	var err error
	t.result, err = t.rootCoord.CreateCollection(ctx, t.CreateCollectionRequest)
	return err
}

从代码可以看出调用了rootCoord的CreateCollection接口。

4.进入rootCoord的CreateCollection接口。

代码路径:internal\rootcoord\root_coord.go

继续将请求封装为rootcoord里的createCollectionTask

func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
	......
    // 封装为createCollectionTask
	t := &createCollectionTask{
		baseTask: newBaseTask(ctx, c),
		Req:      in,
	}
    // 加入调度
	if err := c.scheduler.AddTask(t); err != nil {
		......
	}
    // 等待task完成
	if err := t.WaitToFinish(); err != nil {
		......
	}

	......
}

5.执行createCollectionTask的Prepare、Execute、NotifyDone方法。

Execute()为核心方法。

代码路径:internal\rootcoord\create_collection_task.go

func (t *createCollectionTask) Execute(ctx context.Context) error {
	// collID为collectionID,在Prepare()里分配
	// partIDs为partitionID,在Prepare()里分配
	collID := t.collID
	partIDs := t.partIDs
	// 产生时间戳
	ts, err := t.getCreateTs()
	if err != nil {
		return err
	}
	// vchanNames为虚拟channel,在Prepare()里分配
	// chanNames为物理channel,在Prepare()里分配
	vchanNames := t.channels.virtualChannels
	chanNames := t.channels.physicalChannels

	startPositions, err := t.addChannelsAndGetStartPositions(ctx, ts)
	if err != nil {
		t.core.chanTimeTick.removeDmlChannels(t.channels.physicalChannels...)
		return err
	}
	// 填充partition,创建collection的时候,默认只有一个名为"Default partition"的partition。
	partitions := make([]*model.Partition, len(partIDs))
	for i, partID := range partIDs {
		partitions[i] = &model.Partition{
			PartitionID:               partID,
			PartitionName:             t.partitionNames[i],
			PartitionCreatedTimestamp: ts,
			CollectionID:              collID,
			State:                     pb.PartitionState_PartitionCreated,
		}
	}
	// 填充collection
	// 可以看出collection由collID、dbid、schemaName、fields、vchanName、chanName、partition、shardNum等组成
	collInfo := model.Collection{
		CollectionID:         collID,
		DBID:                 t.dbID,
		Name:                 t.schema.Name,
		Description:          t.schema.Description,
		AutoID:               t.schema.AutoID,
		Fields:               model.UnmarshalFieldModels(t.schema.Fields),
		VirtualChannelNames:  vchanNames,
		PhysicalChannelNames: chanNames,
		ShardsNum:            t.Req.ShardsNum,
		ConsistencyLevel:     t.Req.ConsistencyLevel,
		StartPositions:       toKeyDataPairs(startPositions),
		CreateTime:           ts,
		State:                pb.CollectionState_CollectionCreating,
		Partitions:           partitions,
		Properties:           t.Req.Properties,
		EnableDynamicField:   t.schema.EnableDynamicField,
	}

	clone := collInfo.Clone()

	existedCollInfo, err := t.core.meta.GetCollectionByName(ctx, t.Req.GetDbName(), t.Req.GetCollectionName(), typeutil.MaxTimestamp)
	if err == nil {
		equal := existedCollInfo.Equal(*clone)
		if !equal {
			return fmt.Errorf("create duplicate collection with different parameters, collection: %s", t.Req.GetCollectionName())
		}

		log.Warn("add duplicate collection", zap.String("collection", t.Req.GetCollectionName()), zap.Uint64("ts", ts))
		return nil
	}
	// 分为多个step执行,每一个undoTask由todoStep和undoStep构成
	// 执行todoStep,报错则执行undoStep
	undoTask := newBaseUndoTask(t.core.stepExecutor)
	undoTask.AddStep(&expireCacheStep{
		baseStep:        baseStep{core: t.core},
		dbName:          t.Req.GetDbName(),
		collectionNames: []string{t.Req.GetCollectionName()},
		collectionID:    InvalidCollectionID,
		ts:              ts,
	}, &nullStep{})
	undoTask.AddStep(&nullStep{}, &removeDmlChannelsStep{
		baseStep:  baseStep{core: t.core},
		pChannels: chanNames,
	}) 
	undoTask.AddStep(&addCollectionMetaStep{
		baseStep: baseStep{core: t.core},
		coll:     &collInfo,
	}, &deleteCollectionMetaStep{
		baseStep:     baseStep{core: t.core},
		collectionID: collID,
		ts: ts,
	})

	undoTask.AddStep(&nullStep{}, &unwatchChannelsStep{
		baseStep:     baseStep{core: t.core},
		collectionID: collID,
		channels:     t.channels,
		isSkip:       !Params.CommonCfg.TTMsgEnabled.GetAsBool(),
	})
	undoTask.AddStep(&watchChannelsStep{
		baseStep: baseStep{core: t.core},
		info: &watchInfo{
			ts:             ts,
			collectionID:   collID,
			vChannels:      t.channels.virtualChannels,
			startPositions: toKeyDataPairs(startPositions),
			schema: &schemapb.CollectionSchema{
				Name:        collInfo.Name,
				Description: collInfo.Description,
				AutoID:      collInfo.AutoID,
				Fields:      model.MarshalFieldModels(collInfo.Fields),
			},
		},
	}, &nullStep{})
	undoTask.AddStep(&changeCollectionStateStep{
		baseStep:     baseStep{core: t.core},
		collectionID: collID,
		state:        pb.CollectionState_CollectionCreated,
		ts:           ts,
	}, &nullStep{})

	return undoTask.Execute(ctx)
}

创建collection涉及多个步骤,可以看出这里依次分为expireCacheStep、addCollectionMetaStep、watchChannelsStep、changeCollectionStateStep这几个步骤,关于etcd元数据的操作,这里重点关注addCollectionMetaStep。其余step另用篇幅进行讲解。

6.进入addCollectionMetaStep,执行其Execute()方法。

代码路径:internal\rootcoord\step.go

func (s *addCollectionMetaStep) Execute(ctx context.Context) ([]nestedStep, error) {
	err := s.core.meta.AddCollection(ctx, s.coll)
	return nil, err
}

在这里重点研究s.core.meta.AddCollection()这个方法做了什么事情。

调用栈如下:

CreateCollection()(internal\proxy\impl.go)
 |--Execute()(internal\proxy\task.go)
   |--t.rootCoord.CreateCollection()(同上)
     |--CreateCollection()(rpc调用,internal\rootcoord\root_coord.go)
       |--Execute()(internal\rootcoord\create_collection_task.go)
         |--Execute()(internal\rootcoord\step.go)
           |--s.core.meta.AddCollection()
             |--AddCollection()(internal\rootcoord\meta_table.go)
               |--mt.catalog.CreateCollection()
                 |--CreateCollection()(internal\metastore\kv\rootcoord\kv_catalog.go)
                   |--kc.Snapshot.Save()
                   |--etcd.SaveByBatchWithLimit()

create_collection堆栈.jpg

在etcd产生collection相关的key:

==root-coord/database/collection-info/1/445652621026918798==

value的值的结构为etcdpb.CollectionInfo,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

collSchema := &schemapb.CollectionSchema{
	Name:               coll.Name,
	Description:        coll.Description,
	AutoID:             coll.AutoID,
	EnableDynamicField: coll.EnableDynamicField,
}

collectionPb := &pb.CollectionInfo{
	ID:                   coll.CollectionID,
	DbId:                 coll.DBID,
	Schema:               collSchema,
	CreateTime:           coll.CreateTime,
	VirtualChannelNames:  coll.VirtualChannelNames,
	PhysicalChannelNames: coll.PhysicalChannelNames,
	ShardsNum:            coll.ShardsNum,
	ConsistencyLevel:     coll.ConsistencyLevel,
	StartPositions:       coll.StartPositions,
	State:                coll.State,
	Properties:           coll.Properties,
}

collectionInfo.jpg

可以看出collection由ID、DbId、schema等组成,其中schema不记录Fields,也不记录partitionID、partitionName、FieldIndex。其它信息由另外的key-value记录。

func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
	if coll.State != pb.CollectionState_CollectionCreating {
		return fmt.Errorf("cannot create collection with state: %s, collection: %s", coll.State.String(), coll.Name)
	}
	// 构建key的规则
	k1 := BuildCollectionKey(coll.DBID, coll.CollectionID)
	collInfo := model.MarshalCollectionModel(coll)
    // 序列化
	v1, err := proto.Marshal(collInfo)
	if err != nil {
		return fmt.Errorf("failed to marshal collection info: %s", err.Error())
	}

	// 写入etcd
	if err := kc.Snapshot.Save(k1, string(v1), ts); err != nil {
		return err
	}

	......
}

跟踪BuildCollectionKey()函数,不难得出key的规则。整理如下:

key规则:

  • 前缀/root-coord/database/collection-info/{dbID}/{collectionID}
  • 前缀/snapshots/root-coord/database/collection-info/{dbID}/{collectionID}_ts{时间戳}

根据路径能够反映出collection属于哪个DB。默认数据库名为default,dbID为1。

在etcd还会产生partition相关的key:

==root-coord/partitions/445653146967736660/445653146967736661==

value的值的结构为etcdpb.PartitionInfo,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

&pb.PartitionInfo{
	PartitionID:               partition.PartitionID,
	PartitionName:             partition.PartitionName,
	PartitionCreatedTimestamp: partition.PartitionCreatedTimestamp,
	CollectionId:              partition.CollectionID,
	State:                     partition.State,
}

partitionInfo.jpg

可以看出来partition包括partitionID、partitionName、collectionId等。

for _, partition := range coll.Partitions {
	k := BuildPartitionKey(coll.CollectionID, partition.PartitionID)
	partitionInfo := model.MarshalPartitionModel(partition)
	v, err := proto.Marshal(partitionInfo)
	if err != nil {
		return err
	}
	kvs[k] = string(v)
}

跟踪BuildPartitionKey()函数,不难得出key的规则。整理如下:

key规则:

  • 前缀/root-coord/partitions/{collectionID}/{partitionID}

  • 前缀/snapshots/root-coord/partitions/{collectionID}/{partitionID}_ts{时间戳}

由路径可以反映出partition属于哪个collection。

一个collection可以包含多个partition。默认partition名为:_default。

分区名称可配置(milvus.yml):

common.defaultPartitionName

在etcd还会产生field相关的key:

==root-coord/fields/445653146967736660/100==

value的值的结构为schemapb.FieldSchema ,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

&schemapb.FieldSchema{
	FieldID:        field.FieldID,
	Name:           field.Name,
	IsPrimaryKey:   field.IsPrimaryKey,
	Description:    field.Description,
	DataType:       field.DataType,
	TypeParams:     field.TypeParams,
	IndexParams:    field.IndexParams,
	AutoID:         field.AutoID,
	IsDynamic:      field.IsDynamic,
	IsPartitionKey: field.IsPartitionKey,
	DefaultValue:   field.DefaultValue,
	ElementType:    field.ElementType,
}

fieldInfo.jpg

fieldInfo记录了字段的filedID、name、description、datatype等信息。

for _, field := range coll.Fields {
	k := BuildFieldKey(coll.CollectionID, field.FieldID)
	fieldInfo := model.MarshalFieldModel(field)
	v, err := proto.Marshal(fieldInfo)
	if err != nil {
		return err
	}
	kvs[k] = string(v)
}

跟踪BuildFieldKey()函数,不难得出key的规则。整理如下:

key规则:

  • 前缀/root-coord/fields/{collectionID}/{fieldID}
  • 前缀/snapshots/root-coord/fields/{collectionID}/{fieldID}_ts{时间戳}

从路径可以反映field属于哪个collection。一个field就是一个字段。

kvs.jpg

将kvs批量写入etcd。kvs既有partition,又有field。

完整代码:

func (kc *Catalog) CreateCollection(ctx context.Context, coll *model.Collection, ts typeutil.Timestamp) error {
	if coll.State != pb.CollectionState_CollectionCreating {
		return fmt.Errorf("cannot create collection with state: %s, collection: %s", coll.State.String(), coll.Name)
	}
    // 构建collection的key规则
	k1 := BuildCollectionKey(coll.DBID, coll.CollectionID)
    // 填充collection
	collInfo := model.MarshalCollectionModel(coll)
    // 序列化
	v1, err := proto.Marshal(collInfo)
	if err != nil {
		return fmt.Errorf("failed to marshal collection info: %s", err.Error())
	}

	// 写入etcd,最终会写入2个key,一个原始的,一个加snapshots
	if err := kc.Snapshot.Save(k1, string(v1), ts); err != nil {
		return err
	}

	kvs := map[string]string{}

	// 构建partition
	for _, partition := range coll.Partitions {
        // 构建partition的key规则
		k := BuildPartitionKey(coll.CollectionID, partition.PartitionID)
        // 填充partition
		partitionInfo := model.MarshalPartitionModel(partition)
        // 序列化
		v, err := proto.Marshal(partitionInfo)
		if err != nil {
			return err
		}
		kvs[k] = string(v)
	}

	// 构建field
	for _, field := range coll.Fields {
        // 构建field的key规则
		k := BuildFieldKey(coll.CollectionID, field.FieldID)
        // 填充field
		fieldInfo := model.MarshalFieldModel(field)
        // 序列化
		v, err := proto.Marshal(fieldInfo)
		if err != nil {
			return err
		}
		kvs[k] = string(v)
	}

	// 批量写入etcd,传入一个key,最终会写入2个key,一个原始的,一个加snapshots
	return etcd.SaveByBatchWithLimit(kvs, maxTxnNum/2, func(partialKvs map[string]string) error {
		return kc.Snapshot.MultiSave(partialKvs, ts)
	})
}

使用etcd-manager查看etcd。

partition-key.jpg

field-key.jpg

客户端SDK使用了3个field,分别是pk、random、embeddings。

fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

每一个field都分配有一个fieldID,例如本例中pk分配100、random分配101、embedding分配102。

但是注意还会产生2个fieldID,一个为0、一个为1。

总结:

1.CreateCollection由proxy传递给协调器rootCoord操作etcd。

2.CreateCollection最终会在etcd上写入3种类型的key

  • collection

    前缀/root-coord/database/collection-info/{dbID}/{collectionID}

  • partition

    前缀/root-coord/partitions/{collectionID}/{partitionID}

  • field

    前缀/root-coord/fields/{collectionID}/{fieldID}

标签:err,partition,ts,collection,field,coll,API,CreateCollection,源码
From: https://blog.51cto.com/huangzhimao/8399491

相关文章

  • WhatsApp个人号、Business号、API号到底有什么区别
    WhatsApp作为全球苹果应用商店中用户下载量最多的社交网络应用,在全球有25亿的用户,每日活跃人数超过5亿人。很多人都以为WhatsApp就是一个软件,但是其实它是个家族,里面共有三个成员,分别是WhatsApp Messenger,WhatsApp Business和WhatsApp Business API。按照它们推出市场的时间来......
  • Flask简介、Flask创建和运行、fastapi、显示用户小案例
    Flask简介#python主流的web框架-Django-fastapi:异步-flask#flask是个微型的web框架,不像djagno那么庞大,django有很多内置app,缓存,信号,消息,权限,admin#flask随着项目越来越大,使用第三方插件,越来越像django模版渲染:jinja2web服务器:WerkzeugWS......
  • warp-transducer源码安装,warprnnt_pytorch生成
    warp-transducer是可以在CPU和GPU上实现并行RNN-transducer的开源库,现在支持pytroch和tensorflow的调用,主要用来计算RNNTLoss,本文是在ubuntu18.04的系统上进行的。warp-transducer下载地址源码编译warp-transducer首先获取warp-transducer的代码:gitclonehttps://github.com/......
  • 成品直播源码,JAVA获取图片的宽、高和大小
    成品直播源码,JAVA获取图片的宽、高和大小如果是本地磁盘文件     Filefile=newFile("C:\\Users\\root\\Desktop\\test.jpg");    BufferedImagebufferedImage=ImageIO.read(newFileInputStream(file));    intheight=bufferedImage.getHei......
  • 短视频app源码,自定义快速滚动条FastScrollBar
    短视频app源码,自定义快速滚动条FastScrollBarAndroidMainfest.xml中  <activityandroid:name=".MainActivity"      android:theme="@style/FastScrollTheme">      <intent-filter>        <actionandroid:name="androi......
  • 贷款额度生成器,支持信用卡APP版,虚拟截图E4a开发,仅提供源码
    闲着无聊用易安卓开发了一个额度生成器,原理非常简单,就网上找了一个模版,加了几个编辑框,用PS消除了原有的信息内容,在上面加了一些标签,实现用户输入确定按钮后改变图片上的标签内容,下面是框架图:框架界面图:  生成框架图:  代码:【e4a】:==================================......
  • 源码级JVS低代码功能新增:动态配置、逻辑多级循环嵌套等等
    低代码更新功能新增:1.下拉组件选项新增动态配置;选项的内容可以根据特定的条件或数据源进行动态变化的功能,通过动态配置,用户可以灵活地设置下拉组件的选项内容,例如从数据库或其他数据源中获取选项数据,或者根据用户的操作动态改变选项。2.新增应用操作日志详情;操作日志是用来记录轻......
  • 率先支持Kuasar!iSulad Sandbox API 简化调用链,沙箱管理能力增强
    本文分享自华为云社区《率先支持Kuasar!iSuladSandboxAPI简化调用链,可靠性倍增》,作者:云容器大未来。沙箱隔离技术是一种将进程有效隔离到独立环境中运行的技术。随着容器技术的兴起,沙箱隔离技术也在云原生领域中得到了广泛的应用。iSulad率先通过SandboxAPI支持Kuasar,提供......
  • zookeeper之curator API
    参考:https://www.jianshu.com/p/075f3262938c概述ApacheCurator是一个比较完善的,由Netflix公司开源的一套ZooKeeper的JAVA客户端框架组件。解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。PatrixckHunt(Zookeep......
  • Linux socket API
    socket是进程通信机制的一种,与PIPE、FIFO不同的是,socket即可以在同一台主机通信(unixdomain),也可以通过网络在不同主机上的进程间通信(如:ipv4、ipv6),例如因特网,应用层通过调用socketAPI来与内核TCP/IP协议栈的通信,通过网络字节实现不用主机之间的数据传输。前置条件字节序对于多......