首页 > 其他分享 >milvus操作

milvus操作

时间:2024-09-18 16:13:38浏览次数:9  
标签:index code name collection item print 操作 milvus

import json
import sys
import time
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility, Index


# 连接到 Milvus
def connect_milvus(host='xxxxxx', port='31800'):
print("Connecting to Milvus...")
connections.connect(host=host, port=port)


# 创建或获取集合
def get_or_create_collection(collection_name, dim=256):
if utility.has_collection(collection_name):
print(f"Collection '{collection_name}' already exists.")
return Collection(name=collection_name)
else:
print(f"Creating collection '{collection_name}'.")
fields = [
FieldSchema(name="item_code", dtype=DataType.VARCHAR, is_primary=True, auto_id=False, max_length=100),
FieldSchema(name="blip_embedding", dtype=DataType.FLOAT_VECTOR, dim=dim)
]
schema = CollectionSchema(fields, "item_blip向量")
return Collection(name=collection_name, schema=schema)


# 创建索引
def create_index_if_not_exists(collection, field_name="blip_embedding"):
# 检查是否已经存在索引
if not collection.has_index():
index_params = {
"index_type": "IVF_FLAT", # 选择合适的索引类型
"metric_type": "IP", # 选择合适的距离度量方式
"params": {"nlist": 1414} # nlist 是一个影响索引性能的参数,需根据数据量调整
}
print(f"Creating index on '{field_name}'...")
Index(collection, field_name, index_params)
print("Index has created.")
else:
print("Index already exists. No need to create a new one.")


# 重建索引
def recreate_index(collection, field_name="blip_embedding"):
# 尝试释放集合,如果集合未加载则会捕获异常
try:
print("Releasing the collection before dropping the index...")
collection.release()
except Exception as e:
print("Collection is not loaded, proceeding to drop index.")

# 删除现有索引
if collection.has_index():
print("Dropping existing index...")
collection.drop_index()
print("Index dropped.")

# 创建新的索引
index_params = {
"index_type": "IVF_FLAT", # 选择合适的索引类型
"metric_type": "IP", # 选择合适的距离度量方式
"params": {"nlist": 1414} # nlist 是一个影响索引性能的参数,需根据数据量调整
}
print(f"Creating new index on '{field_name}'...")
Index(collection, field_name, index_params)
print("New index created.")


# 检查索引
def check_index_status(collection):
index_info = collection.index()
print("Index info:", index_info)


# 批量插入数据
def batch_insert(collection, item_codes, embeddings):
entities = [item_codes, embeddings]
try:
insert_result = collection.insert(entities)
print('Insert result: ', insert_result)
except Exception as e:
print('Error during insert:', e)


# 检查商品是否存在
def item_code_exists(collection, item_code):
expr = f'item_code == "{item_code}"'
try:
results = collection.query(expr=expr, output_fields=["item_code"])
return len(results) > 0
except Exception as e:
print(f"Error checking item code existence: {e}")
return False


# 删除商品
def delete_item(collection, item_code):
expr = f'item_code == "{item_code}"'
try:
collection.delete(expr)
print(f"Deleted item code: {item_code}")
except Exception as e:
print(f"Error deleting item code: {e}")


# 主函数示例
def write2milvus(collection_blip):
# 从文件中读取数据并批量插入
with open('youshi_ic_embedding_all.txt', 'r', encoding='utf-8') as r_file:
item_codes = []
embeddings = []
batch_size = 1024
for index, line in enumerate(r_file):
if index % 1000 == 0:
print('商品写入的个数:', index+1)
item_code, emb = line.strip().split('\t')
emb = json.loads('[' + emb + ']')
item_codes.append(item_code)
embeddings.append(emb)
if len(embeddings) == batch_size:
batch_insert(collection_blip, item_codes, embeddings)
item_codes = []
embeddings = []
if item_codes:
batch_insert(collection_blip, item_codes, embeddings)


# 删除集合
def drop_collection(collection_name):
try:
collection = Collection(name=collection_name)
collection.drop()
print(f"Collection '{collection_name}' has been dropped.")
except Exception as e:
print(f"Error dropping collection '{collection_name}': {e}")


# 进行相似性搜索
def search_similar(collection, item_vec, limit=50):
search_params = {
"metric_type": "IP",
"params": {"nprobe": 128},
}
collection.load()
try:
result = collection.search([item_vec], "blip_embedding", search_params, limit=limit, output_fields=["item_code"])
item_codes = [hit.entity.get('item_code') for hits in result for hit in hits]
return item_codes
except Exception as e:
print(f"Error during search: {e}")
return []


def get_search_similar_all(collection, fin):
with open(fin, 'r', encoding='utf-8') as r_file, open('youshi_ic_similar_rs.txt', 'w', encoding='utf-8') as out:
for index, line in enumerate(r_file):
if index % 1000 == 0:
print('完成商品相似品计算的个数:', index+1)
item_code, emb = line.strip().split('\t')
emb = json.loads('[' + emb + ']')
result = search_similar(collection, emb, limit=10)
if result:
out.write("{}\t{}\n".format(item_code, '#'.join(result)))


if __name__ == "__main__":
# 连接到 Milvus
connect_milvus()
# 创建或获取集合
collection_name = 'youshi_item_blip_vec'
collection = get_or_create_collection(collection_name)
# write2milvus(collection)
recreate_index(collection)
create_index_if_not_exists(collection)
fin = './test.txt'
get_search_similar_all(collection, fin)

标签:index,code,name,collection,item,print,操作,milvus
From: https://www.cnblogs.com/qiaoqifa/p/18418761

相关文章

  • Windows/Linux操作用户权限常用命令
    环境:centos7.5(主要),win7Linux/Centos(权限篇)一、概述Linux操作系统,设计用于支持多用户和处理多任务的服务器环境,实施了一套严密的权限控制系统。这一系统主要通过两个核心要素——用户身份和文件权限——来管理和限制对资源的访问。在Linux中,资源的访问权限是基于用户身份来控制......
  • Redis 字典的哈希函数和 rehash 操作详解
    Redis字典的哈希函数和rehash操作详解在Redis中,字典(HashTable)是一种重要的数据结构,用于存储键值对。下面解释Redis字典的哈希函数和rehash操作。一、哈希函数作用Redis的字典使用哈希函数将键转换为一个整数索引,这个索引用于确定键值对在哈希表中的存储位......
  • 有哪些方法可以减少脏页标记技术中的磁盘 I/O 操作?
    减少脏页标记技术中磁盘I/O操作的方法一、引言在数据库系统中,脏页标记技术用于跟踪被修改但尚未写入磁盘的数据页。然而,频繁的磁盘I/O操作会严重影响数据库的性能。因此,寻找有效的方法来减少脏页标记技术中的磁盘I/O操作至关重要。二、优化脏页标记策略(一)延迟标记......
  • linux 操作系统下的 depmod 命令介绍和使用案例
    linux操作系统下的depmod命令介绍和使用案例depmod命令在Linux操作系统中用于生成内核模块的依赖关系和相关的映射文件。它分析内核模块并创建一个依赖列表,以确保在使用modprobe加载和卸载模块时,所有必需的模块都能得到正确处理。depmod命令介绍功能生成依赖列表:depmod......
  • http到https需要那几部操作?
    将HTTP网站升级到HTTPS需获取SSL证书并完成相关配置。获取SSL证书选择证书类型:根据需求选择单域名、多域名或通配符证书。购买证书:从受信任的证书服务商处购买合适的SSL证书。永久免费SSL证书_永久免费https证书_永久免费ssl证书申请-JoySSL真正完全且永久免费!不用您花一......
  • 仿冒国家单位口吻发短信,飞猪处理客诉的操作惊到我了……
    离谱的事这两年可以说,让大家见识了不少。但小柴突然发现,为了赚钱,离谱这件事是没有上限的,他总会在你意想不到的地方敲打一下你的认知。比如昨天,小柴在群里刷到这么一个讨论,说某公司被用户投诉了,然后这家公司是怎么处理投诉的呢?它竟然模仿市监局的口吻,给用户发了条短信,称该投诉依法终......
  • Python高手之路:揭秘列表的高级操作技巧
    引言列表的高级操作不仅能够提升代码的可读性和执行效率,还能让我们的程序更加灵活多变。无论是在日常开发还是数据分析任务中,掌握这些技巧都将使你如虎添翼。接下来,让我们从最基础的概念出发,一步步深入了解列表的高级操作吧!基础语法介绍首先,我们需要明确几个核心概念:列表推导......
  • C#如何使用SQLSugar进行数据库操作
    在现代应用程序中,数据库操作是不可或缺的组成部分。SQLSugar是一个轻量级的ORM(对象关系映射)框架,能够帮助开发者以简单的方式进行数据库交互。本文将介绍如何在C#中使用SQLSugar进行数据库操作。一、什么是SQLSugar?SQLSugar是一个高性能、易于使用的ORM框架,支持多种数据库,包括......
  • 操作系统:进程间通信方式详解(下:消息队列、信号量、共享内存、套接字)
    每日一问:操作系统:进程间通信方式详解(下:消息队列、信号量、共享内存、套接字)进程间通信(Inter-ProcessCommunication,IPC)是操作系统中实现不同进程之间数据交换和协作的关键机制。本文详细介绍了几种常用的IPC方式,包括消息队列、信号量、共享内存和套接字。每种通信方式都......
  • 【Java】若依(ruoyi)——7.代码生成(二)细节操作
    之前我们已经学习了代码生成的基础使用:https://www.cnblogs.com/luyj00436/p/18398248。即创建数据库并根据三种数据结构生成代码。1.基本信息和生成信息 前缀可以在配置表设置默认配置。单应用在resources目录下的application.yml,多模块ruoyi-generator中的resources目录下......