首页 > 编程问答 >跳过 Python Spark Pyspark Databricks 未知字段异常中的不良记录

跳过 Python Spark Pyspark Databricks 未知字段异常中的不良记录

时间:2024-07-23 08:16:07浏览次数:13  
标签:python apache-spark error-handling databricks databricks-autoloader

我想知道是否有人知道如何跳过我们从 json 文件获取的记录

这是错误

[UNKNOWN_FIELD_EXCEPTION.NEW_FIELDS_IN_RECORD_WITH_FILE_PATH] 在解析过程中遇到未知字段: 这是失败的代码

sent = spark.readStream.format('cloudFiles') \
  .option('cloudFiles.format', 'json') \
  .option('multiline', 'true') \
  .option('cloudFiles.inferColumnTypes', 'true') \
  .option('cloudFiles.schemaLocation', checkpoint_path) \
  .load(raw_files) \
  .withColumn('load_ts', F.current_timestamp()) \
  .writeStream \
  .format('delta') \
  .option('checkpointLocation', checkpoint_path) \
  .trigger(availableNow=True) \
  .option('mergeSchema', 'true') \
  .toTable(b_write_path)

谢谢!

我还没有看到任何有关如何修复此错误的文档。


可以使用该选项在 PySpark 中使用结构化流处理跳过错误的 JSON 记录 corrupt_record_policy 并设置它 ignore 。这将跳过导致架构不匹配的任何记录。

以下是如何修改代码的方法:

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp

spark = SparkSession.builder.appName("SkipBadRecords").getOrCreate()

# ... 的其他设置和路径

sent = spark.readStream \
  .format('cloudFiles') \
  .option('cloudFiles.format', 'json') \
  .option('multiline', 'true') \
  .option('cloudFiles.inferColumnTypes', 'true') \
  .option('cloudFiles.schemaLocation', checkpoint_path) \
  .option("cloudFiles.corruptRecordPolicy", "ignore")\
  .load(raw_files) \
  .withColumn('load_ts', current_timestamp()) \
  .writeStream \
  .format('delta') \
  .option('checkpointLocation', checkpoint_path) \
  .trigger(availableNow=True) \
  .option('mergeSchema', 'true') \
  .toTable(b_write_path)

说明:

  1. option("cloudFiles.corruptRecordPolicy", "ignore") : 此行已添加到的读取选项中。它告诉 Spark 忽略架构不匹配的损坏或不完整的 JSON 记录。

  2. 错误处理和监控: 虽然此方法允许的流继续运行,但捕捉被忽略记录的详细信息以进行调试和潜在的进一步处理非常重要。可以考虑实现以下方法:

  3. 日志记录: 配置 Spark 记录被忽略记录的详细信息。这可以帮助识别有问题的记录和潜在的数据问题。

  4. 错误队列: 将被忽略的记录写入一个单独的错误队列或主题,以便于进一步调查和潜在的重新处理。

通过添加此选项,Spark 将在遇到架构不匹配的记录时静默地跳过它们,从而允许的流式处理管道继续运行而不会中断。请记住,实施适当的错误处理和监控对于维护数据完整性和识别潜在问题至关重要。

标签:python,apache-spark,error-handling,databricks,databricks-autoloader
From: 77448205

相关文章

  • 使用 python 截断 Databricks 中的增量表
    对于Python和SQL,这里给出了Delta表删除操作,并且给出了使用SQL的截断这里但我找不到Python截断表的文档。如何在Databricks中对增量表执行此操作?HowtodoitfordeltatableinDatabricks?虽然Databricks中没有直接使用PythonAPI截断De......
  • 从Python中的列表列表中提取随机列表选择
    我有一个从图像生成的RGB值列表:color_list=[(192,155,120),(132,81,65),(226,226,199),(76,94,117),(140,157,178),(17,34,54),(217,213,139),(134,171,144),(98,123,95),(109,145,96),(181,109,92),(71,47,39),......
  • 这段代码是否保证Python对象被立即删除?
    我正在将Redis异步客户端与Celery一起使用,但在两者之间的集成方面遇到了一些问题。上下文是我需要删除redis.Redis实例(在构造函数中创建)以便关闭连接(该对象有一个close方法,但当asyncio事件循环关闭时我无法使用它,heal_client方法仅在这些情况下才会使用。我的代码如......
  • python selenium 行为错误:AttributeError:“Context”对象没有属性“driver”
    我正在使用pythonselenium与Behavior包一起工作。这是代码:@given('theuserisontheloginpage')defstep_given_user_on_login_page(context):PATH='C:/Users/PycharmProjects/ui_test/chromedriver-win32/chromedriver.exe'context.driver=......
  • python 脚本中的路点用于处理大数据集
    我编写了一个脚本,将一堆来自api的请求写入csv文件。该api中有数千个请求,并且在结束循环/退出程序之前永远不会结束。如何合并航路点,以便如果再次发生停顿,它会自动继续前进并最终打印所有请求?我尝试了一些不起作用的方法,但我不知道下一步该怎么做。以下是使用航路点......
  • Python 中的 SSL 模块不可用(在 OSX 上)
    我在OSX10.13上的virtualenv中运行时遇到问题。我已经运行了pipinstall并且路径brewinstallopenssl指向/usr/local/include/openssl有谁知道如何解决这一问题?在我重新安装../opt/openssl/include/openssl使用python后,这种......
  • AWS Elastic Beanstalk chown PythonPath 错误
    我正在AWS的elasticbeanstalk上部署一个Web应用程序,遇到了同样的错误:[StageApplication].Stoprunningthecommand.Error:chown/var/app/staging/venv/bin/python:nosuchfileordirectory.我在我的环境配置中看到属性:PYTHONPATH:/var/......
  • Python:支持索引的内存对象数据库?
    我正在做一些数据整理,如果我可以将一堆字典放入内存数据库中,然后对其运行简单的查询,这会简单得多。例如,类似:people=db([{"name":"Joe","age":16},{"name":"Jane","favourite_color":"red"},])over_16=db.filter(age__g......
  • 如何构建一维数组的二维数组的特定 Python 结构?
    如何构建一维数组(即行向量)的二维数组的特定结构以满足特定我正在维护的遗留程序的结构?我可以在此结构中生成正确的内容all_measurements[:12]array([[0.,0.,0.,2.],[0.02,0.334,0.04,2.24],[0.04,0.668,0.08,2.48],...........
  • 如何使用 Python Flask 将新的咖啡馆(元素)添加到数据库(SQLite)?
    这是我的代码:@app.route("/add",methods=["POST"])defpost_new_cafe():new_cafe=Cafe(name=request.form.get("name"),map_url=request.form.get("map_url"),img_url=request.form.get("img......