我想知道是否有人知道如何跳过我们从 json 文件获取的记录
这是错误
[UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_RECORD_WITH_FILE_PATH] 在解析过程中遇到未知字段: 这是失败的代码
sent = spark.readStream.format('cloudFiles') \
.option('cloudFiles.format', 'json') \
.option('multiline', 'true') \
.option('cloudFiles.inferColumnTypes', 'true') \
.option('cloudFiles.schemaLocation', checkpoint_path) \
.load(raw_files) \
.withColumn('load_ts', F.current_timestamp()) \
.writeStream \
.format('delta') \
.option('checkpointLocation', checkpoint_path) \
.trigger(availableNow=True) \
.option('mergeSchema', 'true') \
.toTable(b_write_path)
谢谢!
我还没有看到任何有关如何修复此错误的文档。
可以使用该选项在 PySpark 中使用结构化流处理跳过错误的 JSON 记录
corrupt_record_policy
并设置它
ignore
。这将跳过导致架构不匹配的任何记录。
以下是如何修改代码的方法:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
spark = SparkSession.builder.appName("SkipBadRecords").getOrCreate()
# ... 的其他设置和路径
sent = spark.readStream \
.format('cloudFiles') \
.option('cloudFiles.format', 'json') \
.option('multiline', 'true') \
.option('cloudFiles.inferColumnTypes', 'true') \
.option('cloudFiles.schemaLocation', checkpoint_path) \
.option("cloudFiles.corruptRecordPolicy", "ignore")\
.load(raw_files) \
.withColumn('load_ts', current_timestamp()) \
.writeStream \
.format('delta') \
.option('checkpointLocation', checkpoint_path) \
.trigger(availableNow=True) \
.option('mergeSchema', 'true') \
.toTable(b_write_path)
说明:
-
option("cloudFiles.corruptRecordPolicy", "ignore")
: 此行已添加到的读取选项中。它告诉 Spark 忽略架构不匹配的损坏或不完整的 JSON 记录。 -
错误处理和监控: 虽然此方法允许的流继续运行,但捕捉被忽略记录的详细信息以进行调试和潜在的进一步处理非常重要。可以考虑实现以下方法:
-
日志记录: 配置 Spark 记录被忽略记录的详细信息。这可以帮助识别有问题的记录和潜在的数据问题。
- 错误队列: 将被忽略的记录写入一个单独的错误队列或主题,以便于进一步调查和潜在的重新处理。
通过添加此选项,Spark 将在遇到架构不匹配的记录时静默地跳过它们,从而允许的流式处理管道继续运行而不会中断。请记住,实施适当的错误处理和监控对于维护数据完整性和识别潜在问题至关重要。
标签:python,apache-spark,error-handling,databricks,databricks-autoloader From: 77448205