首页 > 其他分享 >第11篇:Milvus高效数据导入与预处理:从理论到实践

第11篇:Milvus高效数据导入与预处理:从理论到实践

时间:2024-06-12 23:57:42浏览次数:27  
标签:11 java 示例 导入 new import 数据 预处理 Milvus

欢迎来到Milvus高效数据导入与预处理的世界!在本文,我将带你深入了解Milvus的批量数据导入方式和数据预处理技巧。通过这篇博客,你将学会如何高效地将大规模数据导入Milvus,并对数据进行预处理,以确保高效的检索和分析。准备好了吗?让我们开始这段知识之旅吧!

文章目录

Milvus的批量数据导入

批量数据导入的方式

Milvus支持多种批量数据导入方式,以适应不同的应用场景和需求。主要方式包括:

  1. 直接插入:通过API接口一次性插入大批量数据。
  2. 文件导入:将数据保存到文件中,然后批量导入Milvus。
  3. 分布式导入:利用分布式系统并行导入大规模数据。
直接插入

直接插入方式适用于数据量较小或中等的数据集,数据可以通过API一次性传输到Milvus。

文件导入

文件导入方式适用于数据量较大、需要批量处理的数据集。数据可以先保存到文件(如CSV、JSON)中,然后通过脚本或工具批量导入Milvus。

分布式导入

分布式导入方式适用于超大规模数据集,通过分布式系统并行处理和导入数据,提高导入效率。

批量数据导入方式 直接插入 文件导入 分布式导入 小规模数据集 大规模数据集 超大规模数据集

数据预处理技巧

数据预处理是数据导入过程中的重要环节,通过合理的数据预处理,可以显著提高数据导入和检索的效率。主要预处理技巧包括:

  1. 数据清洗:去除无效数据、修复缺失值等。
  2. 数据标准化:将数据转换到统一的尺度或分布。
  3. 数据降维:使用PCA、t-SNE等方法降低数据维度。
  4. 特征提取:从原始数据中提取重要特征。
数据预处理技巧 数据清洗 数据标准化 数据降维 特征提取

数据清洗

数据清洗适用于数据质量较差、存在大量缺失值或噪声的数据集。

数据标准化

数据标准化适用于不同尺度或分布的数据集,通过标准化处理可以提高算法的性能和稳定性。

数据降维

数据降维适用于高维数据集,通过降维可以减少数据的冗余,提高计算效率。

特征提取

特征提取适用于复杂的原始数据集,通过提取重要特征,可以提高数据的可解释性和检索性能。

批量数据导入的注意点

注意点

  1. 数据格式:确保导入的数据格式与Milvus兼容,如向量数据应为浮点数。
  2. 数据量控制:根据系统资源合理控制单次导入的数据量,避免资源耗尽。
  3. 错误处理:设置错误处理机制,保证导入过程的稳定性和可靠性。

批量数据导入示例

直接插入示例
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import io.milvus.param.dml.InsertParam;
import io.milvus.grpc.MutationResult;

import java.util.Arrays;
import java.util.List;

public class MilvusBatchInsertExample {
    public static void main(String[] args) {
        // 连接到Milvus服务器
        MilvusClient client = new MilvusServiceClient(ConnectParam.newBuilder()
                .withHost("localhost")  // Milvus服务器地址
                .withPort(19530)  // Milvus服务器端口
                .build());

        // 准备插入的数据
        List<Long> idList = Arrays.asList(1L, 2L, 3L);
        List<List<Float>> vectorList = Arrays.asList(
                Arrays.asList(0.1f, 0.2f, 0.3f, 0.4f),
                Arrays.asList(0.5f, 0.6f, 0.7f, 0.8f),
                Arrays.asList(0.9f, 1.0f, 1.1f, 1.2f)
        );

        // 创建插入参数
        InsertParam insertParam = InsertParam.newBuilder()
                .withCollectionName("example_collection")  // 指定集合名称
                .withFields(Arrays.asList(
                        InsertParam.Field.newBuilder()
                                .withName("id")  // 字段名称
                                .withValues(idList)  // 字段值
                                .build(),
                        InsertParam.Field.newBuilder()
                                .withName("vector")  // 字段名称
                                .withValues(vectorList)  // 字段值
                                .build()
                ))
                .build();

        // 执行数据插入
        MutationResult insertResult = client.insert(insertParam);
        System.out.println("Data inserted successfully!");
    }
}

文件导入示例

假设我们有一个CSV文件data.csv,内容如下:

id,vector
1,0.1 0.2 0.3 0.4
2,0.5 0.6 0.7 0.8
3,0.9 1.0 1.1 1.2

可以使用以下Java代码读取CSV文件并导入数据:

import com.opencsv.CSVReader;
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import io.milvus.param.dml.InsertParam;
import io.milvus.grpc.MutationResult;

import java.io.FileReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MilvusCSVImportExample {
    public static void main(String[] args) {
        try {
            // 连接到Milvus服务器
            MilvusClient client = new MilvusServiceClient(ConnectParam.newBuilder()
                    .withHost("localhost")  // Milvus服务器地址
                    .withPort(19530)  // Milvus服务器端口
                    .build());

            // 读取CSV文件
            CSVReader reader = new CSVReader(new FileReader("data.csv"));
            String[] nextLine;
            List<Long> idList = new ArrayList<>();
            List<List<Float>> vectorList = new ArrayList<>();
            reader.readNext(); // 跳过头行
            while ((nextLine = reader.readNext()) != null) {
                idList.add(Long.parseLong(nextLine[0]));
                List<Float> vector = new ArrayList<>();
                for (String value : nextLine[1].split(" ")) {
                    vector.add(Float.parseFloat(value));
                }
                vectorList.add(vector);
            }
            reader.close();

            // 创建插入参数
            InsertParam insertParam = InsertParam.newBuilder()
                    .withCollectionName("example_collection")  // 指定集合名称
                    .withFields(Arrays.asList(
                            InsertParam.Field.newBuilder()
                                    .withName("id")  // 字段名称
                                    .withValues(idList)  // 字段值
                                    .build(),
                            InsertParam.Field.newBuilder()
                                    .withName("vector")  // 字段名称
                                    .withValues(vectorList)  // 字段值
                                    .build()
                    ))
                    .build();

            // 执行数据插入
            MutationResult insertResult = client.insert(insertParam);
            System.out.println("Data imported successfully from CSV file!");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

分布式导入示例

假设我们使用Apache Spark进行分布式数据处理和导入:

首先,引入依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>io.milvus</groupId>
    <artifactId>milvus-sdk-java</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>com.opencsv</groupId>
    <artifactId>opencsv</artifactId>
    <version>5.5.2</version>
</dependency>

然后,使用以下Java代码进行分布式

数据处理和导入:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import io.milvus.client.MilvusClient;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.ConnectParam;
import io.milvus.param.dml.InsertParam;
import io.milvus.grpc.MutationResult;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Iterator;

public class MilvusSparkImportExample {
    public static void main(String[] args) {
        // 创建SparkSession
        SparkSession spark = SparkSession.builder()
                .appName("MilvusBatchInsert")
                .master("local[*]") // 本地模式,适用于测试
                .getOrCreate();

        // 读取CSV文件
        Dataset<Row> df = spark.read().format("csv")
                .option("header", "true")
                .load("data.csv");

        // 分布式处理和导入
        df.foreachPartition((ForeachPartitionFunction<Row>) partition -> {
            // 连接到Milvus服务器
            MilvusClient client = new MilvusServiceClient(ConnectParam.newBuilder()
                    .withHost("localhost")  // Milvus服务器地址
                    .withPort(19530)  // Milvus服务器端口
                    .build());

            List<Long> idList = new ArrayList<>();
            List<List<Float>> vectorList = new ArrayList<>();

            // 处理分区数据
            while (partition.hasNext()) {
                Row row = partition.next();
                idList.add(Long.parseLong(row.getString(0)));
                List<Float> vector = new ArrayList<>();
                for (String value : row.getString(1).split(" ")) {
                    vector.add(Float.parseFloat(value));
                }
                vectorList.add(vector);
            }

            // 创建插入参数
            InsertParam insertParam = InsertParam.newBuilder()
                    .withCollectionName("example_collection")  // 指定集合名称
                    .withFields(Arrays.asList(
                            InsertParam.Field.newBuilder()
                                    .withName("id")  // 字段名称
                                    .withValues(idList)  // 字段值
                                    .build(),
                            InsertParam.Field.newBuilder()
                                    .withName("vector")  // 字段名称
                                    .withValues(vectorList)  // 字段值
                                    .build()
                    ))
                    .build();

            // 执行数据插入
            MutationResult insertResult = client.insert(insertParam);
            System.out.println("Data imported successfully from Spark partition!");

            client.close();
        });

        spark.stop();
    }
}

数据预处理技巧

数据清洗示例

数据清洗是数据预处理的重要步骤,通过去除无效数据和修复缺失值,可以提高数据质量和系统性能。以下是Java实现的数据清洗示例:

import java.io.*;
import java.util.*;
import java.util.stream.Collectors;

public class DataCleaningExample {
    public static void main(String[] args) {
        try {
            // 读取数据文件
            List<String> lines = new BufferedReader(new FileReader("data.csv"))
                    .lines()
                    .collect(Collectors.toList());

            // 去除无效数据和修复缺失值
            List<String> cleanedData = new ArrayList<>();
            cleanedData.add(lines.get(0)); // 保留头行
            for (String line : lines.subList(1, lines.size())) {
                String[] parts = line.split(",");
                if (parts.length == 2 && !parts[1].isEmpty()) {
                    cleanedData.add(line);
                }
            }

            // 保存清洗后的数据
            BufferedWriter writer = new BufferedWriter(new FileWriter("cleaned_data.csv"));
            for (String line : cleanedData) {
                writer.write(line);
                writer.newLine();
            }
            writer.close();

            System.out.println("Data cleaned successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

数据标准化示例

数据标准化是将数据转换到统一的尺度或分布,有助于提高算法的性能和稳定性。以下是Java实现的数据标准化示例:

import java.io.*;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;

public class DataStandardizationExample {
    public static void main(String[] args) {
        try {
            // 读取数据文件
            List<String> lines = new BufferedReader(new FileReader("cleaned_data.csv"))
                    .lines()
                    .collect(Collectors.toList());

            List<List<Double>> vectors = new ArrayList<>();
            for (String line : lines.subList(1, lines.size())) {
                String[] parts = line.split(",");
                List<Double> vector = Arrays.stream(parts[1].split(" "))
                        .map(Double::parseDouble)
                        .collect(Collectors.toList());
                vectors.add(vector);
            }

            // 计算均值和标准差
            int dim = vectors.get(0).size();
            double[] means = new double[dim];
            double[] stds = new double[dim];
            for (int i = 0; i < dim; i++) {
                DescriptiveStatistics stats = new DescriptiveStatistics();
                for (List<Double> vector : vectors) {
                    stats.addValue(vector.get(i));
                }
                means[i] = stats.getMean();
                stds[i] = stats.getStandardDeviation();
            }

            // 标准化数据
            List<String> standardizedData = new ArrayList<>();
            standardizedData.add(lines.get(0)); // 保留头行
            for (String line : lines.subList(1, lines.size())) {
                String[] parts = line.split(",");
                List<Double> vector = Arrays.stream(parts[1].split(" "))
                        .map(Double::parseDouble)
                        .collect(Collectors.toList());
                for (int i = 0; i < dim; i++) {
                    vector.set(i, (vector.get(i) - means[i]) / stds[i]);
                }
                standardizedData.add(parts[0] + "," + vector.stream()
                        .map(String::valueOf)
                        .collect(Collectors.joining(" ")));
            }

            // 保存标准化后的数据
            BufferedWriter writer = new BufferedWriter(new FileWriter("standardized_data.csv"));
            for (String line : standardizedData) {
                writer.write(line);
                writer.newLine();
            }
            writer.close();

            System.out.println("Data standardized successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

数据降维示例

数据降维是减少数据维度的一种技术,通过PCA等方法,可以降低数据的冗余,提高计算效率。以下是Java实现的数据降维示例:

import java.io.*;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.math3.linear.*;

public class DataDimensionalityReductionExample {
    public static void main(String[] args) {
        try {
            // 读取数据文件
            List<String> lines = new BufferedReader(new FileReader("standardized_data.csv"))
                    .lines()
                    .collect(Collectors.toList());

            // 转换为矩阵
            List<double[]> vectors = new ArrayList<>();
            for (String line : lines.subList(1, lines.size())) {
                String[] parts = line.split(",");
                double[] vector = Arrays.stream(parts[1].split(" "))
                        .mapToDouble(Double::parseDouble)
                        .toArray();
                vectors.add(vector);
            }
            RealMatrix matrix = new Array2DRowRealMatrix(vectors.toArray(new double[0][0]));

            // 计算协方差矩阵
            RealMatrix covarianceMatrix = new Covariance(matrix).getCovarianceMatrix();

            // 计算特征值和特征向量
            EigenDecomposition ed = new EigenDecomposition(covarianceMatrix);
            RealMatrix eigenVectors = ed.getV();

            // 选择前两个主成分
            RealMatrix pcaMatrix = matrix.multiply(eigenVectors.getSubMatrix(0, eigenVectors.getRowDimension() - 1, 0, 1));

            // 保存降维后的数据
            BufferedWriter writer = new BufferedWriter(new FileWriter("reduced_data.csv"));
            writer.write(lines.get(0)); // 保留头行
            writer.newLine();
            for (int i = 0; i < pcaMatrix.getRowDimension(); i++) {
                writer.write(i + 1 + "," + Arrays.stream(pcaMatrix.getRow(i))
                        .mapToObj(String::valueOf)
                        .collect(Collectors.joining(" ")));
                writer.newLine();
            }
            writer.close();

            System.out.println("Data reduced successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

特征提取示例

特征提取是从原始数据中提取重要特征的一种技术,通过特征提取,可以提高数据的可解释性和检索性能。以下是Java实现的特征提取示例:

import java.io.*;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.math3.linear.*;

public class DataFeatureExtractionExample {
    public static void main(String[] args) {
        try {
            // 读取数据文件
            List<String> lines = new BufferedReader(new FileReader("reduced_data.csv"))
                    .lines()


                    .collect(Collectors.toList());

            // 假设我们已经知道哪些特征是重要的(例如通过领域知识或特征选择算法确定)
            // 这里只是简单示例,选择前两个特征
            List<String> extractedFeatures = lines.stream()
                    .map(line -> {
                        String[] parts = line.split(",");
                        return parts[0] + "," + parts[1] + " " + parts[2];
                    })
                    .collect(Collectors.toList());

            // 保存提取后的特征数据
            BufferedWriter writer = new BufferedWriter(new FileWriter("extracted_features.csv"));
            for (String line : extractedFeatures) {
                writer.write(line);
                writer.newLine();
            }
            writer.close();

            System.out.println("Features extracted successfully!");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

完整的 pom.xml 文件依赖包

以下是一个完整的 pom.xml 文件示例,包含上述所有依赖:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>milvus-data-import</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- Milvus SDK -->
        <dependency>
            <groupId>io.milvus</groupId>
            <artifactId>milvus-sdk-java</artifactId>
            <version>2.0.0</version>
        </dependency>

        <!-- OpenCSV -->
        <dependency>
            <groupId>com.opencsv</groupId>
            <artifactId>opencsv</artifactId>
            <version>5.5.2</version>
        </dependency>

        <!-- Apache Commons Math -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
            <version>3.6.1</version>
        </dependency>

        <!-- Apache Spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>
</project>

其他注意事项

  1. 确保网络连接:下载这些依赖包需要网络连接,请确保Maven能够连接到中央仓库或配置了镜像。
  2. 环境配置:确保本地环境已配置好Java开发环境(JDK 8或更高版本)。
  3. 数据文件准备:确保本地有准备好的CSV文件(如data.csv)用于测试数据导入。

总结

通过这篇博客,我们详细介绍了Milvus的批量数据导入方式和数据预处理技巧。我们探讨了批量数据导入的多种方式,包括直接插入、文件导入和分布式导入,并详细讲解了数据预处理的各种技巧,如数据清洗、数据标准化、数据降维和特征提取。通过具体的Java代码示例,我们展示了如何在实际应用中实现这些技巧。

如果你喜欢这篇文章,别忘了收藏文章、关注作者、订阅专栏,感激不尽。

标签:11,java,示例,导入,new,import,数据,预处理,Milvus
From: https://blog.csdn.net/wjm1991/article/details/139639370

相关文章

  • python-11-def函数 好比是sop 调用函数可以让程序听令
    学习内容:《python编程:从入门到实践》第二版知识点:定义函数、调用函数、形参、实参练习内容:练习8-1:消息编写一个名为display_message()的函数,它打印一个句子,指出你在本章学的是什么。调用这个函数,确认显示的消息正确无误。练习8-2:喜欢的图书编写一个名为favorite_book()......
  • C++ 新特性 | C++ 11 | typename关键字
    文章目录一、typename关键字前言:在C++的模板编程中,typename关键字扮演着至关重要的角色。它主要用于指示编译器将一个特定的标识符解释为类型名称,而不是变量名或其他实体。本文将深入探讨typename的用法,帮助读者更好地理解其在模板编程中的作用。一、typename关......
  • 110.网络游戏逆向分析与漏洞攻防-装备系统数据分析-装备与技能描述信息的处理
    免责声明:内容仅供学习参考,请合法利用知识,禁止进行违法犯罪活动!如果看不懂、不知道现在做的什么,那就跟着做完看效果,代码看不懂是正常的,只要会抄就行,抄着抄着就能懂了内容参考于:易道云信息技术研究院上一个内容:109.商店与捨取窗口数据的处理码云版本号:4275a0966772e3fd4941ee......
  • 1188 有多少零-PAT乙级真题(2024夏季B-3)-极简代码-C++
    B-3有多少零给定 n 个正整数,请你数数它们的乘积的末尾有多少个零。例如26、225、48的乘积是280800,末尾有2个零。输入格式:输入给出一个不超过 10^6 的正整数 n,下一行给出 n 个不超过 10^6 的正整数。输出格式:在一行中输出给定的 n 个正整数的乘积末尾零的......
  • 8.11 矢量图层线要素单一符号使用七(爆炸线)
    文章目录前言爆炸线(Lineburst)QGis设置线符号为爆炸线(Lineburst)二次开发代码实现爆炸线(Lineburst)总结前言本章介绍矢量图层线要素单一符号中爆炸线(Lineburst)的使用说明:文章中的示例代码均来自开源项目qgis_cpp_api_apps爆炸线(Lineburst)沿着一条线垂直绘制渐变图案......
  • 11_1、多态性:概念及运算符重载
    多态性多态的概念和类型多态的类型多态的实现运算符重载运算符重载的概念和规则概念规则运算符重载为类的成员函数双目运算符单目运算符运算符重载为类的友元函数双目运算符重载单目运算符重载多态的概念和类型消息:消息在C++编程中指的是对类的成员函数的调用。......
  • dark1130_theme.xml sourceinsight 主题
    <ThemeList> <Theme Name="dark1130" > <DisplayColors> <ItemName="DefaultText"Color="#e0e0e0"/> <ItemName="WindowBackground"Color="#001515"/> <ItemName......
  • Arria 10 GX现场可编程门阵列10AX115N1F40I1SG、10AX115R2F40I2SG、10AX115R2F40I1SG
    Arria®10器件系列包括高性能,低功耗的20nm中端FPGA和SoC。Arria®10器件系列实现了:比上一代中高端FPGA更高的性能。通过一套综合节能技术来降低功耗。Arria®10器件专为各领域中高性能、功耗敏感的中端应用而设计。与竞争对手相比,利用公开的OpenCore设计,Arria®10F......
  • 【CMake系列】11-CMake Pack
    cmakepack用于将我们的写好的项目打包,发送给使用方;打包后产生的内容有源代码包二进制包平台原生的二进制安装Debian->.debredhat->.rpmmacOS->.dmgwindows->NSIS本专栏的实践代码全部放在github上,欢迎star!!!如有问题,欢迎留言、......
  • C51学习归纳11 --- PWM原理、应用案例
        本节进入到一个更加常用的环节PWM的产生与应用,日常生活中,电机的使用非常普及,如何控制转速?其实就可以应用我的PWM。一、PWM的产生原理    PWM(PulseWidthModulation)即脉冲宽度调制,在具有惯性的系统中,可以通过对一系列脉冲的宽度进行调制,来等效地获得所需......