首页 > 编程语言 >Apache Beam和BigQuery的错误处理(Java SDK)

Apache Beam和BigQuery的错误处理(Java SDK)

时间:2023-06-15 12:34:27浏览次数:42  
标签:JsonNode Java 错误 convertedRow BigQuery BigQueryIO BQ 错误处理 OUT

设计管道

假设我们有一个简单的场景:事件正在流向Kafka,我们希望使用管道中的事件,进行一些转换并将结果写入BigQuery表,以使数据可用于分析。


可以在作业开始之前创建BigQuery表,或者Beam本身可以创建它。


代码看起来很简单:


EventsProcessingOptions  options  =  PipelineOptionsFactory。fromArgs(args)。withValidation()
                    。作为(EventsProcessingOptions。类);
 
管道 p  =  管道。创造(选项);
 
PCollection  tableRows  =
                  //阅读kafka主题
                   p。apply(“kafka-topic-read”,kafkaReader)
                    。申请(“海边的卡夫卡值”,MapElements。到(TypeDescriptors。字符串())
                            。通过(记录 - > 记录。getKV。()的getValue()))
                    //将值转换为JsonNode
                    。申请(“字符串到JSON” ,ParseJsons。的(JsonNode。类))
                    //创建TableRow
                    。申请(“建设-表行”,帕尔多。的(新 EventsRowFn()))
 
                    //将表格行保存到BigQuery
                    。apply(“BQ-write”,BigQueryIO。< TableRowWithEvent > write()
                            。到(tableSpec)
                            。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)
                            。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND);

少了什么东西?

在现实世界中,可能会发生错误,在大多数情况下,我们将需要处理它们。


在上面的管道中,当我们尝试将事件从Kafka解析为JsonNode,转换期间以及BigQuery插入阶段时,可能会发生错误。


错误处理计划

对于每个错误,我们将在不同的BigQuery表中创建一行,其中包含更多信息,例如来自Kafka的origin事件。


一旦发生错误,我们就可以分析错误记录并全面了解它。


然后,我们可以修复管道代码,重置/更改Kafka使用者组偏移,并再次使用固定代码重播事件。


我们还可以修复事件本身(例如,在JSON解析错误中)并将其重新发送到Kafka。


处理转换错误

让我们快速浏览一下我们的转换函数:


@ProcessElement
public  void  processElement(@Element  JsonNode >  element,OutputReceiver < TableRowWithEvent >  out){
 
  TableRow  convertedRow  =  new  TableRow();
 
  insertLong(元件。得到(“server_time” ),“server_time” ,convertedRow);
  insertFloat(元件。得到(“screen_dpi” ),“screen_dpi” ,convertedRow);
 
  //更多转变来
 
  背景。输出(输出);
 
}  
 
 
private  void  insertLong(JsonNode  value,String  key,TableRow  convertedRow){
        String  valueToInsert  =  value。asText();
        如果(valueToInsert  !=  空 &&  !valueToInsert。的isEmpty()){
            long  longValue  =  Long。parseLong(valueToInsert);
            convertedRow。set(key,longValue);
        }
}
 
private  void  insertFloat(JsonNode  value,String  key,TableRow  convertedRow){
        String  valueToInsert  =  getStringValue(value);
        if(valueToInsert  !=  null){
            float  floatValue  =  Float。parseFloat(valueToInsert);
            convertedRow。set(key,floatValue);
        }
}

是的,我们可能在解析过程中失败,因为我们将字符串解析为Float / Long,并且这对无法转换的数据失败。


我们需要从主函数输出中排除失败的数据并将这些数据发送到管道中的不同路径,然后我们将它们保存到BigQuery中的错误表中。


怎么样?让我们使用标签


当我们在ParDo 函数末尾输出一个元素时  ,我们可以在一个标签内输出它。然后我们可以获取所有标记为特定名称的元素,并对它们执行一些处理。


这里我们将使用两个标签,一个是MAIN标签,它包含所有成功的记录,另一个包含所有错误和一些上下文,例如  DEADLETTER_OUT。


该主标记必须与ParDo 函数本身的OUTPUT类型相同,并且  所有其他标记可以是不同类型。


现在,我们的  ParDo 函数将如下所示(注意标记添加):


@ProcessElement
public  void  processElement(@Element  JsonNode >  element,OutputReceiver < TableRowWithEvent >  out){
 
  public  static  final  TupleTag < JsonNode >  MAIN_OUT  =  new  TupleTag < JsonNode >(){};
  public  static  final  TupleTag < BigQueryProcessError >  DEADLETTER_OUT  =  new  TupleTag < BigQueryProcessError >(){};
 
  TableRow  convertedRow  =  new  TableRow();
 
  尝试 {
 
      insertLong(元件。得到(“server_time” ),“server_time” ,convertedRow);
      insertFloat(元件。得到(“screen_dpi” ),“screen_dpi” ,convertedRow);
 
      //更多转变来
 
      背景。输出(输出);
 
  } catch(例外 e){
      记录器。误差(“失败变换” + ë。的getMessage(),ê);
      背景。输出(DEADLETTER_OUT,新 BigQueryProcessError(convertedRow。的toString(),ê。的getMessage(),ERROR_TYPE。BQ_PROCESS,originEvent));
 
  }  
 
}
我们如何通过标签处理元素?让我们改变管道,并进行拆分。该  MAIN 元素将大量查询表和  DEADLETTER_OUT 内容将被发送到错误表。

EventsProcessingOptions  options  =  PipelineOptionsFactory。fromArgs(args)。withValidation()
                    。作为(EventsProcessingOptions。类);
 
管道 p  =  管道。创造(选项);
 
PCollectionTuple  tableRows  =
                  //阅读kafka主题
                   p。apply(“kafka-topic-read”,kafkaReader)
                    。申请(“海边的卡夫卡值”,MapElements。到(TypeDescriptors。字符串())
                            。通过(记录 - > 记录。getKV。()的getValue()))
                    //将值转换为JsonNode
                    。申请(“字符串到JSON” ,ParseJsons。的(JsonNode。类))
                    //创建TableRow
                    。申请(“建设-表行”,帕尔多。的(新 EventsRowFn())。withOutputTags(MAIN_OUT,TupleTagList。的(DEADLETTER_OUT)));
 
 
                  //将MAIN标签保存到BQ
                  tableRows
                      。得到(MAIN_OUT)
                      。apply(“BQ-write”,BigQueryIO。< TableRowWithEvent > write()
                      。到(tableSpec)
                      。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)
                      。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND);
 
                  //将DEADLETTER_OUT保存到BQ错误表
                  tableRows
                    。得到(DEADLETTER_OUT)
                    。申请(“BQ-进程的错误提取物”,帕尔多。的(新 BigQueryProcessErrorExtracFn()))
                    。申请(“BQ-进程的错误写”,BigQueryIO。writeTableRows()
                            。to(errTableSpec)
                            。withJsonSchema(errSchema)
                            。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_IF_NEEDED)
                            。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND));
  p。run();

处理BigQuery插入错误

为了在BigQuery插入期间处理错误,我们必须使用BiqQueryIO API。


让我们放大写入阶段。并稍微改变一下:


WriteResult  writeResult  =  tableRowToInsertCollection
        。申请(“BQ-写”,BigQueryIO。写()
        //指定将返回失败的行及其错误
                。withExtendedErrorInfo()
                。到(tableSpec)
                。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)
                。withWriteDisposition(BigQueryIO。写。writeDisposition会。WRITE_APPEND)
        //指定处理失败插入的策略。
                。withFailedInsertRetryPolicy(InsertRetryPolicy。retryTransientErrors()));
 
 
//将失败的行及其错误写入错误表                
写结果
        。getFailedInsertsWithErr()
        。申请(窗口。到(FixedWindows。的(持续时间。standardMinutes(5))))
        。申请(“BQ-插入错误提取物”,帕尔多。的(新 BigQueryInsertErrorExtractFn(tableRowToInsertView))。withSideInputs(tableRowToInsertView))
        。申请(“BQ-插入错误写”,BigQueryIO。writeTableRows()
                。to(errTableSpec)
                。withJsonSchema(errSchema)
                。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_IF_NEEDED)
                。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND));

在上面的代码片段中,我们从BigQueryIO获取失败的TableRows及其错误。现在我们可以将它们转换为另一个  TableRow 并将它们写入错误表。在这种情况下,我们让作业在需要时创建表。


标签:JsonNode,Java,错误,convertedRow,BigQuery,BigQueryIO,BQ,错误处理,OUT
From: https://blog.51cto.com/u_16145034/6486220

相关文章

  • Java 多线程同步问题的探究(二、给我一把锁,我能创造一个规矩)
    在上一篇中,我们讲到了多线程是如何处理共享资源的,以及保证他们对资源进行互斥访问所依赖的重要机制:对象锁。本篇中,我们来看一看传统的同步实现方式以及这背后的原理。很多人都知道,在Java多线程编程中,有一个重要的关键字,synchronized。但是很多人看到这个东西会感到困惑:“都说同......
  • Java中的WeakHashMap与类示例
    在本文中,我们将WeakHashMap 通过示例从java.util包中学习  类。我们将学到什么?WeakHashMap 课程概述WeakHashMap 类构造方法摘要WeakHashMap 类构造方法WeakHashMap 类示例1.WeakHashMap类概述WeakHashMap 是一个基于Hash表的Map接口实现的弱键。当其密钥不再正常使用......
  • Java正则表达式详解
    如果你曾经用过Perl或任何其他内建正则表达式支持的语言,你一定知道用正则表达式处理文本和匹配模式是多么简单。如果你不熟悉这个术语,那么“正则表达式”(RegularExpression)就是一个字符构成的串,它定义了一个用来搜索匹配字符串的模式。许多语言,包括Perl、PHP、Python、JavaScript......
  • JavaScript内存限制
    JavaScriptmemorylimitJavaScript应用程序可以存储的最大数据量是多少?我猜这是由浏览器处理的,每个浏览器都有其局限性吗?如果没有限制,将创建页面文件吗?如果是这样,那不安全吗? 相关讨论  有一些限制,尽管这些取决于浏览器。例如,Firefox对堆栈空间以及过多的CPU消......
  • Java_Dom4j_解析xml
    via:http://blog.163.com/kewangwu@126/blog/static/8672847120126261033594/ 1、DOM4J简介DOM4J是dom4j.org出品的一个开源XML解析包。DOM4J应用于Java平台,采用了Java集合框架并完全支持DOM,SAX和JAXP。Dom:把整个文档作为一个对象。DOM4J最大的特色是使用大量的接口......
  • ajax + java 实现类似网易邮箱邮件地址自动完成功能
    ajax+java实现类似网易邮箱邮件地址自动完成功能2008-04-0218:30********************************************************************源代码下载链接:http://www.javaeye.com/topic/150778***************************************************************......
  • 您必须知道的重要Java关键字
    什么是Java中的关键字?Java关键字是一个保留字,具有与之关联的特殊含义。为便于识别,它们通常以Java格式突出显示。在50个关键字中,有48个正在使用,而有两个不在。让我们更详细地研究一些重要的Java关键字。重要的Java关键字列表摘要: 它用于完成 抽象。它是一种与类和方法相关的非访......
  • java服务器更换jdk版本后报错:javax.net.ssl.SSLHandshakeException: No appropriate p
    java,服务器更换jdk版本后报错:Causedby:javax.net.ssl.SSLHandshakeException:Noappropriateprotocol(protocolisdisabledorciphersuitesareinappropriate)然后数据库出现:###Errorqueryingdatabase.Cause:java.lang.reflect.UndeclaredThrowableExc......
  • javascript现代编程系列教程之七——字符集(七)
    Unicode:Unicode是一个字符集(Charset),包含了世界上所有的字符。每个字符在Unicode中都有其唯一对应的数字编号,这就是我们常说的Unicode码。UTF-8:UTF-8是Unicode的实现方式之一。UTF-8使用一至四个字节为每个字符编码,英文字符通常使用一个字节,西欧其他语言的部分字符使用......
  • 用JavaScript绘制树状图(具有分支合并功能)的一种方法(其一)
    需求分析在很多模拟经营游戏中,科技树是一项重要的内容,其为玩家提供了各项技术与其前后置科技间的拓扑关系。这些科技树在表现形式上和普通树状图很相似,但由于其频繁的分支合并,为科技树的绘制带来困难。因此,我们需要一种简单的方法来绘制科技树。比如,当用户输入:<!--为了降低用户......