首页 > 编程问答 >如何在 PySpark 中将二进制图像数据转换为 RGB 数组?

如何在 PySpark 中将二进制图像数据转换为 RGB 数组?

时间:2024-08-01 06:59:36浏览次数:12  
标签:python image pyspark binary byte

我有一个具有以下架构的 pyspark df:

root
 |-- array_bytes: binary (nullable = true)

我希望能够将其转换为图像数组。我可以使用以下代码在 Pandas 中完成此操作:

df_pandas = df.toPandas()
def bytes_to_array(byte_data):
    arr = np.frombuffer(byte_data, dtype=np.uint8)
    return arr.reshape((224, 224, 3)) 

df_pandas['image_array'] = df_pandas['array_bytes'].apply(bytes_to_array)

我似乎找不到在 PySpark 中执行此操作的方法。这是我尝试过的:

def convert_binary_to_array(binary_data: bytes) -> np.ndarray:
    arr = np.frombuffer(binary_data, dtype=np.uint8)
    return arr.reshape((224, 224, 3))


def convert_binary_in_df(df, binary_column: str = 'binary_data'):
    def convert_binary_udf(byte_data):
        return convert_binary_to_array(byte_data).tolist()
    
    # register and apply udf
    convert_binary_spark_udf = udf(convert_binary_udf, ArrayType(ArrayType(IntegerType())))
    df_output = df.withColumn("image_data", convert_binary_spark_udf(binary_column))
    return df_output

df_converted = convert_binary_in_df(df, binary_column='array_bytes')

但是, image_data 最终只是充满了空值。我对 PySpark 的了解不是很强,无法找出问题所在。预先感谢您的任何帮助。


遇到的问题与 PySpark UDF 中 NumPy 数组的处理方式有关。PySpark UDF 无法直接返回 NumPy 数组,因为它们不是 Spark 原生支持的数据类型。

为了解决这个问题,需要将 NumPy 数组转换为 Spark 可以理解的类型,例如列表或 DenseVector

以下是如何使用 DenseVector VectorAssembler 将二进制图像数据转换为 RGB 数组的改进 PySpark 代码:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler

def convert_binary_to_array(binary_data: bytes) -> Vectors.dense:
    arr = np.frombuffer(binary_data, dtype=np.uint8)
    return Vectors.dense(arr.reshape((224, 224, 3)).flatten().tolist()) 

convert_binary_udf = udf(convert_binary_to_array, VectorUDT())

df_converted = df.withColumn("image_vector", convert_binary_udf("array_bytes"))

# 如果需要一个单独的列来表示每个颜色通道:
assembler = VectorAssembler(inputCols=["image_vector"], outputCol="image_array")
df_converted = assembler.transform(df_converted).select("array_bytes", "image_array")

代码解释:

  1. convert_binary_to_array 函数:

    • 它接收二进制数据作为输入。
    • 使用 np.frombuffer 将二进制数据转换为 NumPy 数组。
    • 使用 reshape 将数组重塑为 (224, 224, 3) 的形状。
    • 关键更改: 将重塑后的数组展平,转换为 Python 列表,并使用 Vectors.dense 创建一个 DenseVector 对象。
  2. convert_binary_udf UDF:

    • 使用 udf 函数将 convert_binary_to_array 函数注册为 PySpark UDF,并指定返回类型为 VectorUDT()
  3. df.withColumn

    • 使用 convert_binary_udf UDF 创建一个名为 image_vector 的新列,其中包含转换后的图像数据作为 DenseVector
  4. VectorAssembler (可选):

    • 如果需要将 DenseVector 转换为单独的列,请使用 VectorAssembler image_vector 列转换为名为 image_array 的新列,其中包含所有颜色通道值。

注意:

  • 确保已将 numpy 导入为 np
  • 此代码假设的图像数据确实采用 224x224x3 的 RGB 格式。如果格式不同,则需要相应地调整代码。

标签:python,image,pyspark,binary,byte
From: 78818568

相关文章

  • 将 HTTP 分块编码数据流代码片段从 Node.js 转换为 Python
    我有一个Node.js客户端代码,它将请求发送到HTTP服务器,然后连续接收分块编码数据。这是带有一些流量数据输出的Node.js代码。consthttp=require('http');constoptions={hostname:'...',path:'...',port:...,...};constreq=http.request(......
  • Google Translate API 客户端没有“Translate_image”属性
    importosfromgoogle.cloudimporttranslate_v3beta1astranslatedefupload_and_translate(input_dir,output_dir,target_language):"""Uploadsallimagesinadirectory,translatesthemusingGoogleTranslate,anddownloadsthetrans......
  • vsc python 调试器和 pylance 无法识别已安装的包
    我最近使用snowflake-connector-python在我的虚拟环境中安装了pipinstallsnowflake-connector-python[pandas]==2.7.6,当我在激活虚拟环境的情况下从命令行运行我的脚本时,它工作正常。我设置了与VSC解释器相同的虚拟环境,但尝试运行python调试器会引发异常......
  • 如何从python读取matlab持续时间对象
    我创建一个matlab持续时间对象并将其保存到.mat文件:timeend=seconds(123);save('time.mat',timeend,'-v7.3');然后我从python读取它:withh5py.File('time.mat','r')asf:var=f['timeend'][:]print(list(var))......
  • 通过 python 连接到 Snowflake 时出错“UnpicklingError: invalid load key, '\x00'
    我在使用snowflake.connector.connect通过python连接到snowflake时遇到以下错误importsnowflake.connector#pipinstallsnowflake-connector-python#iamgettingtheenvfrom.envfileistoredlocallycnx=snowflake.connector.connect(user=os.getenv('USER'),pass......
  • Python Selenium 单击 webdriverwait 与 find_element
    我无法理解这两个代码块之间的区别。发送点击在webdriverwait和find_elements中都有效。代码1fromseleniumimportwebdriverfromselenium.webdriver.common.byimportByfromselenium.webdriver.support.uiimportWebDriverWaitfromselenium.webdriver.suppo......
  • Python 问题 如何创建在 PDF 中注册为剪切线的专色?
    我正在开发一个项目,需要我在图像周围创建一条剪切线,但在任何RIP程序(例如Versaworks或Flexi)上将其注册为实际剪切线时遇到困难。我尝试了很多不同的方法python库可以帮助解决这个问题,但我无法让它工作。我希望它像我们在Illustrator中所做的那样,创建一条名为CutConto......
  • 使用Python时如何避免`setattr`(和`getattr`)?以及是否有必要避免
    如果我想向协议缓冲区中的字段添加一个在编译时未知的值,我目前正在做setattr我通常不喜欢使用setattr,因为它看起来不太安全。但是当我知道该对象是protobuf时,我认为这很好,因为我设置它的值必须是protobuf允许的类型。所以也许它并不是真的不安全?让我举......
  • Java sshtools 生成的 EDDSA 签名与 Python 的 pycryptome 生成的签名不匹配
    我有一个python库,它使用pycryptodomelibrary使用openssh格式的ED25519私钥使用Ed25519算法对数据进行签名。然后需要使用sshtools库和相应的公钥在Java应用程序中验证签名。但是签名验证失败。约束:从文件中读取私钥/公钥很重要。我无法......
  • Elastic python请求超时错误:池达到最大大小,不允许更多连接
    我正在使用Elasticsearchpython模块。我正在尝试像这样建立到服务器的连接es=Elasticsearch([config.endpoint],api_key=config.key,request_timeout=config.request_timeout)服务器连接,然后我尝试执行丰富策略。es.enr......