我有一个函数可以从路径读取 avro 文件(按日文件夹)并将其写入 相同的路径(聚合到按月的文件夹)。如果文件夹有 .avro 文件,该函数可以正常工作。
但如果文件夹为空,我会收到错误。
java.io.FileNotFoundException: No avro files found. If files don't have .avro extension, set ignoreExtension to true
我已经使用 try 和 except 块进行了处理。即使那样我也会收到错误。这是下面的函数。
def aggregate_data(path_to_source, input_format, output_format, output_table_format='avro'):
all_input_dates = [str(fileStatus.getPath()).split('/')[-1] for fileStatus in
lake.listStatus(fs.Path(path_to_source))
if 'latest' not in str(fileStatus.getPath())]
all_output_dates = list(set(datetime.datetime.strptime(i, input_format).strftime(output_format)
for i in all_input_dates if validate_date(i, input_format) - {datetime.date.today().strftime(output_format)})
print("***** {0} *****".format(path_to_source))
try:
for _partition in all_output_dates:
src_df = spark.read.option("ignoreExtension":"true"). format(output_table_format)\
.load(os.path.join(path_to_source, '{0}*'.format(_partition)))
agg_df = src_df.repartition(estimate_part_num(src_df)).write.mode('overwrite')\
.format(output_table_format).save(os.path.join(path_to_source, _partition))
command_exist = ['hadoop', 'fs', '-rm', '-r', os.path.join(path_to_source, '{0}[.]*'. format(_partition))]
subprocess.call(command_exist)
except FileNotFoundException as e:
print("**** Exception occured ****".format(path_to_source),e)
如何在 python 中处理 FileNotFoundException
看来你的问题不是 FileNotFoundException 本身,而是代码中其他的地方导致了这个异常的抛出。错误信息 "No avro files found." 表明这是 Spark 在尝试读取 Avro 文件时抛出的异常,而不是 Python 解释器抛出的 FileNotFoundException。
以下是改进你的代码并处理这种情况的步骤:
-
捕获 Spark 异常 : Python 的
FileNotFoundException
不会捕获 Spark 抛出的异常。你需要捕获更通用的异常类型,比如Py4JJavaError
,它包含 Spark 抛出的 Java 异常。 -
检查目录是否存在 : 在尝试读取数据之前,先检查目录是否存在。你可以使用
os.path.isdir
函数来检查目录是否存在。 -
处理空目录 : 如果目录存在但为空,你可以选择跳过该目录或者进行其他处理。
以下是修改后的代码:
def aggregate_data(path_to_source, input_format, output_format, output_table_format='avro'):
# ... (获取日期列表)
print("***** {0} *****".format(path_to_source))
for _partition in all_output_dates:
# 构建目录路径
dir_path = os.path.join(path_to_source, _partition)
# 检查目录是否存在
if os.path.isdir(dir_path):
try:
# 读取数据,如果目录为空,可能会抛出异常
src_df = spark.read.option("ignoreExtension", "true") \
.format(output_table_format) \
.load(os.path.join(dir_path, '*'))
# ... (处理数据)
except Py4JJavaError as e:
# 打印异常信息
print(f"**** Exception occured while processing {dir_path}: {e}")
# 检查是否为 "No avro files found" 异常
if "No avro files found" in str(e):
print(f"**** Directory {dir_path} is empty. Skipping...")
else:
# 其他 Spark 异常,可能需要进一步处理或抛出
raise e
else:
print(f"**** Directory {dir_path} does not exist. Skipping...")
# ... (删除旧数据)
这段代码会检查每个日期目录是否存在,如果存在但为空,则会打印一条信息并跳过该目录。如果遇到其他类型的异常,则会打印异常信息并抛出异常,以便你能进行调试。
希望这些改进可以帮助你解决问题。如果还有其他问题,请告诉我。
标签:python,exception,filenotfoundexception From: 78814397