首页 > 编程语言 >CreateIndex API执行流程_milvus源码解析

CreateIndex API执行流程_milvus源码解析

时间:2023-11-23 16:32:13浏览次数:41  
标签:API index err ...... req 源码 cit CreateIndex

CreateIndex API执行流程源码解析

milvus版本:v2.3.2

整体架构:

architecture.png

CreateIndex 的数据流向:

create_index数据流向.jpg

1.客户端sdk发出CreateIndex API请求。

import numpy as np
from pymilvus import (
    connections,
    FieldSchema, CollectionSchema, DataType,
    Collection,
)

num_entities, dim = 2000, 8


print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")


fields = [
    FieldSchema(name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
    FieldSchema(name="random", dtype=DataType.DOUBLE),
    FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, dim=dim)
]

schema = CollectionSchema(fields, "hello_milvus is the simplest demo to introduce the APIs")

print("Create collection `hello_milvus`")
hello_milvus = Collection("hello_milvus", schema, consistency_level="Strong")


print("Start inserting entities")
rng = np.random.default_rng(seed=19530)
entities = [
    # provide the pk field because `auto_id` is set to False
    [str(i) for i in range(num_entities)],
    rng.random(num_entities).tolist(),  # field random, only supports list
    rng.random((num_entities, dim)),    # field embeddings, supports numpy.ndarray and list
]

insert_result = hello_milvus.insert(entities)

hello_milvus.flush()

print("Start Creating index IVF_FLAT")
index = {
    "index_type": "IVF_FLAT",
    "metric_type": "L2",
    "params": {"nlist": 8},
}

hello_milvus.create_index("embeddings", index,index_name="idx_embeddings")

客户端SDK向proxy发送一个CreateIndex API请求,在embeddings列上创建一个名为idx_embeddings的索引。

2.客户端接受API请求,将request封装为createIndexTask,并压入ddQueue队列。

代码路径:internal\proxy\impl.go

// CreateIndex create index for collection.
func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
	......
    
	// request封装为task
	cit := &createIndexTask{
		ctx:                ctx,
		Condition:          NewTaskCondition(ctx),
		req:                request,
		rootCoord:          node.rootCoord,
		datacoord:          node.dataCoord,
		replicateMsgStream: node.replicateMsgStream,
	}

	......
    // 将task压入ddQueue队列

	if err := node.sched.ddQueue.Enqueue(cit); err != nil {
		......
	}

	......
    // 等待cct执行完
	if err := cit.WaitToFinish(); err != nil {
		......
	}

	......
}

3.执行createIndexTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()一般为真正执行逻辑。

代码路径:internal\proxy\task_index.go

func (cit *createIndexTask) Execute(ctx context.Context) error {
	......
	req := &indexpb.CreateIndexRequest{
		CollectionID:    cit.collectionID,
		FieldID:         cit.fieldSchema.GetFieldID(),
		IndexName:       cit.req.GetIndexName(),
		TypeParams:      cit.newTypeParams,
		IndexParams:     cit.newIndexParams,
		IsAutoIndex:     cit.isAutoIndex,
		UserIndexParams: cit.newExtraParams,
		Timestamp:       cit.BeginTs(),
	}
	cit.result, err = cit.datacoord.CreateIndex(ctx, req)
	......
	SendReplicateMessagePack(ctx, cit.replicateMsgStream, cit.req)
	return nil
}

从代码可以看出调用了datacoord的CreateIndex接口。

4.进入datacoord的CreateIndex接口。

代码路径:internal\datacoord\index_service.go

// CreateIndex create an index on collection.
// Index building is asynchronous, so when an index building request comes, an IndexID is assigned to the task and
// will get all flushed segments from DataCoord and record tasks with these segments. The background process
// indexBuilder will find this task and assign it to IndexNode for execution.
func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
	......
	// 分配indexID,indexID=0
	indexID, err := s.meta.CanCreateIndex(req)
	......

	if indexID == 0 {
		// 分配indexID
		indexID, err = s.allocator.allocID(ctx)
		......
	}

	index := &model.Index{
		CollectionID:    req.GetCollectionID(),
		FieldID:         req.GetFieldID(),
		IndexID:         indexID,
		IndexName:       req.GetIndexName(),
		TypeParams:      req.GetTypeParams(),
		IndexParams:     req.GetIndexParams(),
		CreateTime:      req.GetTimestamp(),
		IsAutoIndex:     req.GetIsAutoIndex(),
		UserIndexParams: req.GetUserIndexParams(),
	}

	// Get flushed segments and create index

	err = s.meta.CreateIndex(index)
	......

	// 将collectionID发送到channel,其它的goroutine进行消费。
	select {
	case s.notifyIndexChan <- req.GetCollectionID():
	default:
	}

	......
}

变量index:

index_model.jpg

5.进入s.meta.CreateIndex()

代码路径:internal\datacoord\index_meta.go

func (m *meta) CreateIndex(index *model.Index) error {
	......
	// 写入etcd元数据
	if err := m.catalog.CreateIndex(m.ctx, index); err != nil {
		......
	}

	m.updateCollectionIndex(index)
	......
}

在这里重点研究m.catalog.CreateIndex()这个方法做了什么事情。

func (kc *Catalog) CreateIndex(ctx context.Context, index *model.Index) error {
	key := BuildIndexKey(index.CollectionID, index.IndexID)

	value, err := proto.Marshal(model.MarshalIndexModel(index))
	if err != nil {
		return err
	}

	err = kc.MetaKv.Save(key, string(value))
	if err != nil {
		return err
	}
	return nil
}

在etcd会产生1个key。

==field-index/445834678636119060/445834678636519085==

value的值的结构为indexpb.FieldIndex,然后进行protobuf序列化后存入etcd。

因此etcd存储的是二进制数据。

&indexpb.FieldIndex{
	IndexInfo: &indexpb.IndexInfo{
		CollectionID:    index.CollectionID,
		FieldID:         index.FieldID,
		IndexName:       index.IndexName,
		IndexID:         index.IndexID,
		TypeParams:      index.TypeParams,
		IndexParams:     index.IndexParams,
		IsAutoIndex:     index.IsAutoIndex,
		UserIndexParams: index.UserIndexParams,
	},
	Deleted:    index.IsDeleted,
	CreateTime: index.CreateTime,
}

fieldindex.jpg

跟踪BuildIndexKey()函数,即可以得到key的规则。整理如下:

key规则:

  • 前缀/field-index/{collectionID}/{IndexID}

可以反映index属于哪个collection。Index的value可以反映属于哪个field。

不能反映属于哪个partition、哪个segment。

总结:

  • CreateIndex由proxy传递给协调器dataCoord操作etcd。
  • CreateIndex最终会在etcd上写入1种类型的key(其实还有一种,在另一篇中进行介绍)。

标签:API,index,err,......,req,源码,cit,CreateIndex
From: https://blog.51cto.com/huangzhimao/8530904

相关文章

  • ES插入报错 索引只读:blocked by: [FORBIDDEN/12/index read-only / allow delete (ap
     ES插入报错:reason:ElasticsearchException[Elasticsearchexception[type=cluster_block_exception,reason=blockedby:[FORBIDDEN/12/indexread-only/allowdelete(api)];]]   这种大多都是因为磁盘空间不足了超过设置的阈值,一般80%所以索引变成只读了,要把空......
  • OpenHarmony之NAPI框架介绍
     张志成诚迈科技高级技术专家 NAPI是什么NAPI的概念源自Nodejs,为了实现javascript脚本与C++库之间的相互调用,Nodejs对V8引擎的api做了一层封装,称为NAPI。可以在Nodejs官网(https://nodejs.org/dist/latest-v20.x/docs/api/n-api.html)上查看各种NAPI接口定义说明。可以......
  • 收银系统源码-线上商城篇,打通线下收银台,ERP进销存
    智慧新零售系统是一套线下线上打通一体化的saas收银系统。主要为零售行业提供多样化的线下收银、双端线上商城、erp进销存、精细化会员管理、50+营销插件、企微私域scrm等行业解决方案,帮助商户做数字化升级!今天我们重点聊一下线上商城功能。智慧新零售线上商城主要分为微信和支付宝......
  • 直播平台源码,vue图片中划框截取部分图片
    直播平台源码,vue图片中划框截取部分图片 <template> <div>  <el-dialogtitle="请在图片上画出需要新增故障的位置":visible.sync="dialogVisible"width="1270px":before-close="handleClose":close-on-click-modal="false":close......
  • 使用Python调用API接口获取小红书笔记详情数据
    本文将详细介绍如何使用Python编程语言调用小红书API接口,以获取小红书笔记的详情数据。我们将从以下几个方面展开讨论:1)API接口简介;2)Python环境准备;3)API密钥获取;4)使用Requests库发送API请求;5)解析响应数据;6)异常处理与错误排查。一、API接口简介API(应用程序编程接口)是一种......
  • 微信小程序商店导航网站系统模板源码
    微信小程序商店导航网站系统源码是基于帝国cms开源程序开发的一款小程序二维码收录大全的导航系统。模板源码无任务限制,可以随便修改成喜欢的页面,有能力者可以二次开发。本小程序导航网站源码前端自适应手机WAP页面。适用范围:微信小程序导航网站、微信公众号导航网站、微......
  • obproxy 源码编译以及一些问题整理-暂未编译成功
    尝试自己编译下oceanbase的obproxy并记录下一些问题,目前是暂未编译成功,因为是openssl版本包的问题环境说明基于了RockyLinuxrelease8.8,同时obproxy使用了4.2.1版本的构建参考命令这个官方已经提供了,主要就是initdebug,makeshbuild.shinitshbuild.sh......
  • Java商城网站系统设计与实现(带源码)
    基于Java的商城网站系统设计与实现功能介绍平台采用B/S结构,后端采用主流的Springboot框架进行开发,前端采用主流的Vue.js进行开发。整个平台包括前台和后台两个部分。前台功能包括:首页、商品详情页、订单、用户中心模块。后台功能包括:总览、订单管理、商品管理、分类管理、标签管理......
  • MFC-ODBC API动态连接配置数据库
    一、ODBC管理器介绍在Window中,ODBC数据远管理器有6个标签:用户DSN、系统DSN、文件DSN、驱动程序、跟踪、连接池,通常情况下,使用用户DSN或者系统DSN,这里主要了解用户DSN和系统DSN即可。用户DSN:ODBC用户数据源存储了如何与指定的数据库提供者连接的信息,只有当前用户可见。系统DSN:O......
  • 一个基于php+mysql的外卖订餐网站(带源码)
    订饭组一个基于php+mysql的外卖订餐网站,包括前端和后台。测试账号:admin密码:admin运行环境:Apache+PHP+Mysql代码说明htaccessRewrite配置文件,需要放入到项目根目录configs.php需要配置数据库连接信息(主机、用户名、密码),系统常量等data.sql位于data目录中,是数据库备份文件,需要提......