我已经使用 Ultralytics Python 包成功训练了 YOLOv8 模型,现在的目标是对存储在 S3 存储桶中的 1 亿张图像进行推理。目前,我有一个具有 GPU 加速功能的 Databricks 笔记本,可以执行推理,但我不知道如何扩展它。
从 Databricks 文档中,我了解到使用 Databricks Autoloader 从 S3 和 MLflow 获取图像来管理模型可以帮助扩展批量推理过程。
如何有效地扩展 Databricks 中 1 亿张图像的批量推理过程? 我应该使用 MLflow 来管理和扩展推理作业吗?
当前设置正在运行多个具有专用计算的笔记本,这似乎效率低下。
可以使用 Databricks 有效地扩展 1 亿张图像的 YOLOv8 批量推理。以下是使用 Autoloader、MLflow 和 Spark 的分步指南:
1. 数据加载和预处理:
- 使用 Databricks Autoloader 从 S3 加载图像: Autoloader 简化了从 S3 数据湖中摄取数据的过程,并可以自动检测新文件并对其进行处理。
from pyspark.sql.functions import col, lit
from databricks import autoloader
spark.conf.set("spark.databricks.cloudFiles.schemaInference.enabled", "true")
image_df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "binaryFile") \
.option("cloudFiles.inferColumnTypes", "true") \
.load("s3://your-bucket/images/") \
.withColumn("input_file_name", input_file_name()) \
.withColumn("split_index", lit(0)) # 对于分布式推理,请将图像拆分为多个分区
- 对图像进行预处理: 在传递给 YOLO 模型之前,可能需要对图像执行一些预处理步骤,例如调整大小或规范化。可以使用 Spark UDF(用户定义函数)来完成此操作,以在图像数据上执行分布式预处理。
from torchvision import transforms
from pyspark.sql.functions import udf
# 定义预处理管道
preprocess = transforms.Compose([
transforms.ToTensor(),
transforms.Resize((640, 640)), # 将图像大小调整为 YOLOv8 输入大小
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# 创建一个 UDF 来预处理图像
@udf("array<float>")
def preprocess_image(image_data):
image = Image.open(io.BytesIO(image_data)).convert('RGB')
return preprocess(image).numpy().tolist()
# 将预处理步骤应用于图像数据帧
processed_image_df = image_df.withColumn("processed_image", preprocess_image(col("content")))
2. 模型加载和推理:
- 使用 MLflow 管理的 YOLOv8 模型: 将的训练模型记录到 MLflow,并从的 Databricks 群集加载它。MLflow 简化了模型版本控制、打包和部署。
import mlflow
# 将训练好的 YOLOv8 模型记录到 MLflow
logged_model = mlflow.pytorch.log_model(model, "yolo_model")
# 从 MLflow 加载模型
model_uri = logged_model.artifacts["model"]
loaded_model = mlflow.pytorch.load_model(model_uri)
- 使用 Pandas UDF 进行分布式推理: Pandas UDF(又名向量化 UDF)允许在 Spark 数据帧上高效地执行并行化推理。可以使用 Pandas UDF 将 YOLOv8 模型应用于图像数据帧的每一行。
import pandas as pd
from pyspark.sql.functions import pandas_udf
# 创建一个 Pandas UDF 来执行推理
@pandas_udf("array<array<float>>")
def predict_udf(image_batch: pd.Series) -> pd.Series:
results = []
for image_data in image_batch:
# 将图像数据转换为 YOLO 格式
image = ...
# 执行推理
predictions = loaded_model(image)
# 处理预测结果(例如,NMS)
processed_predictions = ...
results.append(processed_predictions)
return pd.Series(results)
# 在图像数据帧上执行推理
predictions_df = processed_image_df.withColumn("predictions", predict_udf(col("processed_image")))
3. 扩展和优化:
- 水平扩展的 Databricks 群集: 增加群集中工作节点的数量。这将允许并行处理更多图像并减少整体处理时间。
-
利用 GPU 加速:
在具有多个 GPU 的 GPU 加速 Databricks 群集上运行的 Spark 作业。YOLOv8 在 GPU 上运行时可以显著提高推理性能。在群集设置中选择适当的 GPU 实例类型并配置
spark.conf
以使用 GPU。 -
数据分区:
通过设置
spark.sql.shuffle.partitions
或使用 repartition()/coalesce() 对的数据进行分区以匹配的执行程序数量。适当的分区可以减少数据倾斜并提高并行度。 - 优化 UDF 性能: 尽可能减少 UDF 内的开销,并考虑使用 Apache Arrow 之类的技术来高效地在 Python 和 JVM 之间传输数据。
4. 后处理和保存预测结果:
- 对预测结果进行后处理: 可能需要对预测结果执行一些后处理步骤,例如非最大抑制 (NMS),以移除重叠的边界框。
- 将预测结果保存到持久存储: 可以将预测结果保存到 Delta Lake、S3 或选择的任何其他存储选项中。
# 将预测结果保存到 Delta Lake
predictions_df.write \
.format("delta") \
.mode("overwrite") \
.save("s3://your-bucket/predictions/")
使用 MLflow 进行作业调度和管理:
MLflow 可以通过以下方式帮助管理和扩展推理作业:
- 模型版本控制和跟踪: 轻松加载不同版本的 YOLOv8 模型并跟踪推理性能。
- 作业调度: 使用 MLflow 作业调度或 Databricks 作业调度程序安排的推理管道定期运行或在触发器上运行。
- 实验跟踪: 尝试不同的参数和配置,以优化推理管道的性能。
通过遵循这些步骤并在的 Databricks 群集中利用 Autoloader、MLflow 和 Spark 功能,可以有效地扩展 YOLOv8 批量推理,以处理 1 亿 张图像或更多图像。请记住监控性能并根据特定用例和需求调整配置。
标签:python,databricks,yolo,mlflow,yolov8 From: 78821744