首页 > 编程问答 >Spark EOF 错误(从 S3 读取 Parquet)- Spark 到 Pandas 的转换

Spark EOF 错误(从 S3 读取 Parquet)- Spark 到 Pandas 的转换

时间:2024-07-25 08:02:54浏览次数:12  
标签:python pandas apache-spark amazon-s3 pyspark

我正在将存储在 S3 中的近 100 万行作为 parquet 文件读取到数据帧中(存储桶中的数据大小为 900mb)。根据值过滤数据帧,然后转换为 pandas 数据帧。涉及2个UDF(classify和transformDate)。 我在运行此代码片段时收到错误 eof 。这段代码有什么问题?是我缺少一些火花设置还是 UDF 使用不当?下面的代码片段

#Import headers skipped for simplicity

findspark.init()

def classify(value):
    if float(value) < 0.0 or float(value) >= 10.0:
        return -1
    return int(float(value) * 2) + 1

def transformDate(dateStr):
    date_format = '%d-%b-%Y:%H:%M:%S %Z'
    datetime_obj = datetime.datetime.strptime("{} {}".format(dateStr, 'UTC'), date_format)
    return datetime_obj

def read_from_s3():
    conf = SparkConf()
    conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4')
    conf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    conf.set('spark.hadoop.fs.s3a.connection.maximum', 100)
    conf.set('fs.s3a.threads.max', 50)
    conf.set('spark.default.parallelism', 2048)
    conf.set('spark.sql.shuffle.partitions',4096)
    conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
    conf.set('spark.sql.streaming.schemaInference','true')

    conf.set('spark.rpc.message.maxSize', '1024')
    conf.set('spark.executor.memory', '32g')
    conf.set('spark.shuffle.file.buffer', '64k')
    conf.set('spark.eventLog.buffer.kb', '200k')
    conf.set('spark.executor.cores', '8')
    conf.set('spark.cores.max', '8')
    conf.set('spark.driver.memory', '32g')
    conf.set('spark.driver.maxResultSize', '21G')
    conf.set('spark.worker.cleanup.enabled', True)
    conf.set('spark.executor.heartbeatInterval', '43200s')
    conf.set('spark.network.timeout', '3000000s')


    conf.set('spark.hadoop.fs.s3a.access.key', '<<>>')
    conf.set('spark.hadoop.fs.s3a.secret.key', '<<>>')
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    spark.sparkContext.setLogLevel('DEBUG')
    classify_udf = udf(lambda x:classify(x),IntegerType())
    transform_date_udf = udf(lambda x:transformDate(x),TimestampType())
    paths = 's3a://<<mybucket>>/'
    df = spark.read.parquet(paths,
        inferSchema=True)
    df = df.withColumn('colA_encoded',classify_udf('colA'))
    df = df.withColumn('date_transformed',transform_date_udf('train_date'))
    return df

def do_filter(df: DataFrame):
    df = df.fillna({'colA':'0'})
    df= df.withColumn('colA_float',df['colA'].cast("float").alias('colA_float'))
    df2 = (df.filter(col('colA_float') > 3.0).select('colZ').distinct())
    df=df.join(df2, 'colZ')
    return df


def do_proper_group(df: DataFrame):
    window = Window.partitionBy('colZ').orderBy('colA')
    df =df.select('*', dense_rank().over(window).alias('myIdx'))
    return df

def plot_data_frame(df: DataFrame =None) :
    fig, ax = plt.subplots()
    print(df.count())
    pandas_df = df.toPandas()
    print(pandas_df.columns)
    pandas_df.set_index('myIdx', inplace=True)
    pandas_df.groupby('colZ')['colA_float'].plot(legend=True,x='myIdx', xlabel="My Group Number", ylabel="Value")
    plt.show()

if __name__ == '__main__':
    start = time.time()
    print(start)
    plot_data_frame(do_proper_group(do_filter(read_from_s3())))
    end = time.time()
    print(end - start)

下面的错误日志

24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1225, in main
  File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 596, in read_int
    raise EOFError
EOFError

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
    at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
    at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
24/07/24 06:47:31 DEBUG Invoker: Starting: read
24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1225, in main
  File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 596, in read_int
    raise EOFError
EOFError

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
    at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
    at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
24/07/24 06:47:31 DEBUG Invoker: Starting: lazySeek

这个 EOF 错误来自哪里? S3存储桶中共有760个文件,大小总计900M。所有镶木地板文件。

不确定问题是否集中在 Spark 配置或我过滤数据帧并加入它的方式。任何帮助表示赞赏。


遇到的 EOFError 错误通常表明 Spark 的执行器 Python 进程与驱动程序进程之间的通信中断。这可能是由几个因素造成的,包括:

  • 网络问题: S3 和的 Spark 集群之间的网络连接不稳定或带宽有限,可能会导致数据提取和混洗操作期间出现问题。
  • 执行器资源不足: 如果的执行器没有足够的内存或 CPU 资源来处理数据量,它们可能会崩溃或运行缓慢,从而导致 EOFError
  • 数据倾斜: 如果的数据在特定键上出现倾斜(即,某些键具有比其他键多得多的记录),则可能会导致某些执行器执行比其他执行器更多的工作,从而导致它们失败。
  • Spark 配置: 一些 Spark 配置设置,例如 spark.network.timeout spark.executor.heartbeatInterval ,如果设置不正确,可能会导致通信超时。
  • UDF 代码中的错误: 虽然不太可能,但的 UDF 代码中也可能存在导致执行器崩溃的错误。

为了解决此问题,可以尝试以下步骤:

1. 调查网络问题: * 检查 S3 和的 Spark 集群之间的网络连接是否稳定可靠。 * 考虑增加 S3 连接的超时值,例如 spark.hadoop.fs.s3a.connection.timeout spark.hadoop.fs.s3a.request.timeout

2. 调整执行器资源: * 增加执行器内存 ( spark.executor.memory ) 和核心数 ( spark.executor.cores )。 * 尝试增加执行器数量 ( spark.executor.instances ) 以分配工作负载。

3. 解决数据倾斜: * 分析的数据以确定是否存在任何倾斜的键。 * 考虑使用诸如盐化之类的技术重新分区的数据,或在连接条件中使用广播连接来减少数据倾斜。

4. 检查 Spark 配置: * 确保 spark.network.timeout spark.executor.heartbeatInterval 设置足够高以防止超时。 * 可以尝试增加 spark.sql.shuffle.partitions 的值,以减少每个分区的混洗数据量。

5. 调试 UDF 代码: * 仔细检查的 UDF 代码是否存在任何错误,这些错误可能会导致执行器崩溃。 * 考虑在单独的环境中测试的 UDF 代码,以确保它们按预期工作。

此外,请确保的 S3 凭证配置正确且的 Spark 集群具有访问 S3 存储桶的必要权限。

如果在实施这些步骤后仍然遇到问题,请提供以下附加信息,以便我进一步帮助:

  • Spark 版本
  • 集群配置(执行器数量、每个执行器的内存和核心)
  • 完整的错误日志
  • 的 Spark 应用程序的任何其他相关代码片段

标签:python,pandas,apache-spark,amazon-s3,pyspark
From: 78790994

相关文章