首页 > 编程问答 >如何在 PySpark 中将二进制图像数据转换为 RGB 数组?

如何在 PySpark 中将二进制图像数据转换为 RGB 数组?

时间:2024-08-01 06:59:36浏览次数:17  
标签:python image pyspark binary byte

我有一个具有以下架构的 pyspark df:

root
 |-- array_bytes: binary (nullable = true)

我希望能够将其转换为图像数组。我可以使用以下代码在 Pandas 中完成此操作:

df_pandas = df.toPandas()
def bytes_to_array(byte_data):
    arr = np.frombuffer(byte_data, dtype=np.uint8)
    return arr.reshape((224, 224, 3)) 

df_pandas['image_array'] = df_pandas['array_bytes'].apply(bytes_to_array)

我似乎找不到在 PySpark 中执行此操作的方法。这是我尝试过的:

def convert_binary_to_array(binary_data: bytes) -> np.ndarray:
    arr = np.frombuffer(binary_data, dtype=np.uint8)
    return arr.reshape((224, 224, 3))


def convert_binary_in_df(df, binary_column: str = 'binary_data'):
    def convert_binary_udf(byte_data):
        return convert_binary_to_array(byte_data).tolist()
    
    # register and apply udf
    convert_binary_spark_udf = udf(convert_binary_udf, ArrayType(ArrayType(IntegerType())))
    df_output = df.withColumn("image_data", convert_binary_spark_udf(binary_column))
    return df_output

df_converted = convert_binary_in_df(df, binary_column='array_bytes')

但是, image_data 最终只是充满了空值。我对 PySpark 的了解不是很强,无法找出问题所在。预先感谢您的任何帮助。


遇到的问题与 PySpark UDF 中 NumPy 数组的处理方式有关。PySpark UDF 无法直接返回 NumPy 数组,因为它们不是 Spark 原生支持的数据类型。

为了解决这个问题,需要将 NumPy 数组转换为 Spark 可以理解的类型,例如列表或 DenseVector

以下是如何使用 DenseVector VectorAssembler 将二进制图像数据转换为 RGB 数组的改进 PySpark 代码:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler

def convert_binary_to_array(binary_data: bytes) -> Vectors.dense:
    arr = np.frombuffer(binary_data, dtype=np.uint8)
    return Vectors.dense(arr.reshape((224, 224, 3)).flatten().tolist()) 

convert_binary_udf = udf(convert_binary_to_array, VectorUDT())

df_converted = df.withColumn("image_vector", convert_binary_udf("array_bytes"))

# 如果需要一个单独的列来表示每个颜色通道:
assembler = VectorAssembler(inputCols=["image_vector"], outputCol="image_array")
df_converted = assembler.transform(df_converted).select("array_bytes", "image_array")

代码解释:

  1. convert_binary_to_array 函数:

    • 它接收二进制数据作为输入。
    • 使用 np.frombuffer 将二进制数据转换为 NumPy 数组。
    • 使用 reshape 将数组重塑为 (224, 224, 3) 的形状。
    • 关键更改: 将重塑后的数组展平,转换为 Python 列表,并使用 Vectors.dense 创建一个 DenseVector 对象。
  2. convert_binary_udf UDF:

    • 使用 udf 函数将 convert_binary_to_array 函数注册为 PySpark UDF,并指定返回类型为 VectorUDT()
  3. df.withColumn

    • 使用 convert_binary_udf UDF 创建一个名为 image_vector 的新列,其中包含转换后的图像数据作为 DenseVector
  4. VectorAssembler (可选):

    • 如果需要将 DenseVector 转换为单独的列,请使用 VectorAssembler image_vector 列转换为名为 image_array 的新列,其中包含所有颜色通道值。

注意:

  • 确保已将 numpy 导入为 np
  • 此代码假设的图像数据确实采用 224x224x3 的 RGB 格式。如果格式不同,则需要相应地调整代码。

标签:python,image,pyspark,binary,byte
From: 78818568

相关文章