我有一个具有以下架构的 pyspark df:
root
|-- array_bytes: binary (nullable = true)
我希望能够将其转换为图像数组。我可以使用以下代码在 Pandas 中完成此操作:
df_pandas = df.toPandas()
def bytes_to_array(byte_data):
arr = np.frombuffer(byte_data, dtype=np.uint8)
return arr.reshape((224, 224, 3))
df_pandas['image_array'] = df_pandas['array_bytes'].apply(bytes_to_array)
我似乎找不到在 PySpark 中执行此操作的方法。这是我尝试过的:
def convert_binary_to_array(binary_data: bytes) -> np.ndarray:
arr = np.frombuffer(binary_data, dtype=np.uint8)
return arr.reshape((224, 224, 3))
def convert_binary_in_df(df, binary_column: str = 'binary_data'):
def convert_binary_udf(byte_data):
return convert_binary_to_array(byte_data).tolist()
# register and apply udf
convert_binary_spark_udf = udf(convert_binary_udf, ArrayType(ArrayType(IntegerType())))
df_output = df.withColumn("image_data", convert_binary_spark_udf(binary_column))
return df_output
df_converted = convert_binary_in_df(df, binary_column='array_bytes')
但是,
image_data
最终只是充满了空值。我对 PySpark 的了解不是很强,无法找出问题所在。预先感谢您的任何帮助。
遇到的问题与 PySpark UDF 中 NumPy 数组的处理方式有关。PySpark UDF 无法直接返回 NumPy 数组,因为它们不是 Spark 原生支持的数据类型。
为了解决这个问题,需要将 NumPy 数组转换为 Spark 可以理解的类型,例如列表或
DenseVector
。
以下是如何使用
DenseVector
和
VectorAssembler
将二进制图像数据转换为 RGB 数组的改进 PySpark 代码:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
def convert_binary_to_array(binary_data: bytes) -> Vectors.dense:
arr = np.frombuffer(binary_data, dtype=np.uint8)
return Vectors.dense(arr.reshape((224, 224, 3)).flatten().tolist())
convert_binary_udf = udf(convert_binary_to_array, VectorUDT())
df_converted = df.withColumn("image_vector", convert_binary_udf("array_bytes"))
# 如果需要一个单独的列来表示每个颜色通道:
assembler = VectorAssembler(inputCols=["image_vector"], outputCol="image_array")
df_converted = assembler.transform(df_converted).select("array_bytes", "image_array")
代码解释:
-
convert_binary_to_array
函数:- 它接收二进制数据作为输入。
-
使用
np.frombuffer
将二进制数据转换为 NumPy 数组。 -
使用
reshape
将数组重塑为 (224, 224, 3) 的形状。 -
关键更改:
将重塑后的数组展平,转换为 Python 列表,并使用
Vectors.dense
创建一个DenseVector
对象。
-
convert_binary_udf
UDF:-
使用
udf
函数将convert_binary_to_array
函数注册为 PySpark UDF,并指定返回类型为VectorUDT()
。
-
使用
-
df.withColumn
:-
使用
convert_binary_udf
UDF 创建一个名为image_vector
的新列,其中包含转换后的图像数据作为DenseVector
。
-
使用
-
VectorAssembler
(可选):-
如果需要将
DenseVector
转换为单独的列,请使用VectorAssembler
将image_vector
列转换为名为image_array
的新列,其中包含所有颜色通道值。
-
如果需要将
注意:
-
确保已将
numpy
导入为np
。 - 此代码假设的图像数据确实采用 224x224x3 的 RGB 格式。如果格式不同,则需要相应地调整代码。