我正在将存储在 S3 中的近 100 万行作为 parquet 文件读取到数据帧中(存储桶中的数据大小为 900mb)。根据值过滤数据帧,然后转换为 pandas 数据帧。涉及2个UDF(classify和transformDate)。 我在运行此代码片段时收到错误 eof 。这段代码有什么问题?是我缺少一些火花设置还是 UDF 使用不当?下面的代码片段
#Import headers skipped for simplicity
findspark.init()
def classify(value):
if float(value) < 0.0 or float(value) >= 10.0:
return -1
return int(float(value) * 2) + 1
def transformDate(dateStr):
date_format = '%d-%b-%Y:%H:%M:%S %Z'
datetime_obj = datetime.datetime.strptime("{} {}".format(dateStr, 'UTC'), date_format)
return datetime_obj
def read_from_s3():
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4')
conf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
conf.set('spark.hadoop.fs.s3a.connection.maximum', 100)
conf.set('fs.s3a.threads.max', 50)
conf.set('spark.default.parallelism', 2048)
conf.set('spark.sql.shuffle.partitions',4096)
conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
conf.set('spark.sql.streaming.schemaInference','true')
conf.set('spark.rpc.message.maxSize', '1024')
conf.set('spark.executor.memory', '32g')
conf.set('spark.shuffle.file.buffer', '64k')
conf.set('spark.eventLog.buffer.kb', '200k')
conf.set('spark.executor.cores', '8')
conf.set('spark.cores.max', '8')
conf.set('spark.driver.memory', '32g')
conf.set('spark.driver.maxResultSize', '21G')
conf.set('spark.worker.cleanup.enabled', True)
conf.set('spark.executor.heartbeatInterval', '43200s')
conf.set('spark.network.timeout', '3000000s')
conf.set('spark.hadoop.fs.s3a.access.key', '<<>>')
conf.set('spark.hadoop.fs.s3a.secret.key', '<<>>')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel('DEBUG')
classify_udf = udf(lambda x:classify(x),IntegerType())
transform_date_udf = udf(lambda x:transformDate(x),TimestampType())
paths = 's3a://<<mybucket>>/'
df = spark.read.parquet(paths,
inferSchema=True)
df = df.withColumn('colA_encoded',classify_udf('colA'))
df = df.withColumn('date_transformed',transform_date_udf('train_date'))
return df
def do_filter(df: DataFrame):
df = df.fillna({'colA':'0'})
df= df.withColumn('colA_float',df['colA'].cast("float").alias('colA_float'))
df2 = (df.filter(col('colA_float') > 3.0).select('colZ').distinct())
df=df.join(df2, 'colZ')
return df
def do_proper_group(df: DataFrame):
window = Window.partitionBy('colZ').orderBy('colA')
df =df.select('*', dense_rank().over(window).alias('myIdx'))
return df
def plot_data_frame(df: DataFrame =None) :
fig, ax = plt.subplots()
print(df.count())
pandas_df = df.toPandas()
print(pandas_df.columns)
pandas_df.set_index('myIdx', inplace=True)
pandas_df.groupby('colZ')['colA_float'].plot(legend=True,x='myIdx', xlabel="My Group Number", ylabel="Value")
plt.show()
if __name__ == '__main__':
start = time.time()
print(start)
plot_data_frame(do_proper_group(do_filter(read_from_s3())))
end = time.time()
print(end - start)
下面的错误日志
24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1225, in main
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
24/07/24 06:47:31 DEBUG Invoker: Starting: read
24/07/24 06:47:31 DEBUG PythonUDFRunner: Exception thrown after task interruption
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\worker.py", line 1225, in main
File "D:\Downloads\spark-3.5.1-bin-hadoop3\spark-3.5.1-bin-hadoop3\python\lib\pyspark.zip\pyspark\serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(generated.java:28)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:179)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
24/07/24 06:47:31 DEBUG Invoker: Starting: lazySeek
这个 EOF 错误来自哪里? S3存储桶中共有760个文件,大小总计900M。所有镶木地板文件。
不确定问题是否集中在 Spark 配置或我过滤数据帧并加入它的方式。任何帮助表示赞赏。
遇到的
EOFError
错误通常表明 Spark 的执行器 Python 进程与驱动程序进程之间的通信中断。这可能是由几个因素造成的,包括:
- 网络问题: S3 和的 Spark 集群之间的网络连接不稳定或带宽有限,可能会导致数据提取和混洗操作期间出现问题。
-
执行器资源不足:
如果的执行器没有足够的内存或 CPU 资源来处理数据量,它们可能会崩溃或运行缓慢,从而导致
EOFError
。 - 数据倾斜: 如果的数据在特定键上出现倾斜(即,某些键具有比其他键多得多的记录),则可能会导致某些执行器执行比其他执行器更多的工作,从而导致它们失败。
-
Spark 配置:
一些 Spark 配置设置,例如
spark.network.timeout
或spark.executor.heartbeatInterval
,如果设置不正确,可能会导致通信超时。 - UDF 代码中的错误: 虽然不太可能,但的 UDF 代码中也可能存在导致执行器崩溃的错误。
为了解决此问题,可以尝试以下步骤:
1. 调查网络问题:
* 检查 S3 和的 Spark 集群之间的网络连接是否稳定可靠。
* 考虑增加 S3 连接的超时值,例如
spark.hadoop.fs.s3a.connection.timeout
和
spark.hadoop.fs.s3a.request.timeout
。
2. 调整执行器资源:
* 增加执行器内存 (
spark.executor.memory
) 和核心数 (
spark.executor.cores
)。
* 尝试增加执行器数量 (
spark.executor.instances
) 以分配工作负载。
3. 解决数据倾斜: * 分析的数据以确定是否存在任何倾斜的键。 * 考虑使用诸如盐化之类的技术重新分区的数据,或在连接条件中使用广播连接来减少数据倾斜。
4. 检查 Spark 配置:
* 确保
spark.network.timeout
和
spark.executor.heartbeatInterval
设置足够高以防止超时。
* 可以尝试增加
spark.sql.shuffle.partitions
的值,以减少每个分区的混洗数据量。
5. 调试 UDF 代码: * 仔细检查的 UDF 代码是否存在任何错误,这些错误可能会导致执行器崩溃。 * 考虑在单独的环境中测试的 UDF 代码,以确保它们按预期工作。
此外,请确保的 S3 凭证配置正确且的 Spark 集群具有访问 S3 存储桶的必要权限。
如果在实施这些步骤后仍然遇到问题,请提供以下附加信息,以便我进一步帮助:
- Spark 版本
- 集群配置(执行器数量、每个执行器的内存和核心)
- 完整的错误日志
- 的 Spark 应用程序的任何其他相关代码片段