设计管道
假设我们有一个简单的场景:事件正在流向Kafka,我们希望使用管道中的事件,进行一些转换并将结果写入BigQuery表,以使数据可用于分析。
可以在作业开始之前创建BigQuery表,或者Beam本身可以创建它。
代码看起来很简单:
EventsProcessingOptions options = PipelineOptionsFactory。fromArgs(args)。withValidation()
。作为(EventsProcessingOptions。类);
管道 p = 管道。创造(选项);
PCollection tableRows =
//阅读kafka主题
p。apply(“kafka-topic-read”,kafkaReader)
。申请(“海边的卡夫卡值”,MapElements。到(TypeDescriptors。字符串())
。通过(记录 - > 记录。getKV。()的getValue()))
//将值转换为JsonNode
。申请(“字符串到JSON” ,ParseJsons。的(JsonNode。类))
//创建TableRow
。申请(“建设-表行”,帕尔多。的(新 EventsRowFn()))
//将表格行保存到BigQuery
。apply(“BQ-write”,BigQueryIO。< TableRowWithEvent > write()
。到(tableSpec)
。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)
。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND);
少了什么东西?
在现实世界中,可能会发生错误,在大多数情况下,我们将需要处理它们。
在上面的管道中,当我们尝试将事件从Kafka解析为JsonNode,转换期间以及BigQuery插入阶段时,可能会发生错误。
错误处理计划
对于每个错误,我们将在不同的BigQuery表中创建一行,其中包含更多信息,例如来自Kafka的origin事件。
一旦发生错误,我们就可以分析错误记录并全面了解它。
然后,我们可以修复管道代码,重置/更改Kafka使用者组偏移,并再次使用固定代码重播事件。
我们还可以修复事件本身(例如,在JSON解析错误中)并将其重新发送到Kafka。
处理转换错误
让我们快速浏览一下我们的转换函数:
@ProcessElement
public void processElement(@Element JsonNode > element,OutputReceiver < TableRowWithEvent > out){
TableRow convertedRow = new TableRow();
insertLong(元件。得到(“server_time” ),“server_time” ,convertedRow);
insertFloat(元件。得到(“screen_dpi” ),“screen_dpi” ,convertedRow);
//更多转变来
背景。输出(输出);
}
private void insertLong(JsonNode value,String key,TableRow convertedRow){
String valueToInsert = value。asText();
如果(valueToInsert != 空 && !valueToInsert。的isEmpty()){
long longValue = Long。parseLong(valueToInsert);
convertedRow。set(key,longValue);
}
}
private void insertFloat(JsonNode value,String key,TableRow convertedRow){
String valueToInsert = getStringValue(value);
if(valueToInsert != null){
float floatValue = Float。parseFloat(valueToInsert);
convertedRow。set(key,floatValue);
}
}
是的,我们可能在解析过程中失败,因为我们将字符串解析为Float / Long,并且这对无法转换的数据失败。
我们需要从主函数输出中排除失败的数据并将这些数据发送到管道中的不同路径,然后我们将它们保存到BigQuery中的错误表中。
怎么样?让我们使用标签
当我们在ParDo 函数末尾输出一个元素时 ,我们可以在一个标签内输出它。然后我们可以获取所有标记为特定名称的元素,并对它们执行一些处理。
这里我们将使用两个标签,一个是MAIN标签,它包含所有成功的记录,另一个包含所有错误和一些上下文,例如 DEADLETTER_OUT。
该主标记必须与ParDo 函数本身的OUTPUT类型相同,并且 所有其他标记可以是不同类型。
现在,我们的 ParDo 函数将如下所示(注意标记添加):
@ProcessElement
public void processElement(@Element JsonNode > element,OutputReceiver < TableRowWithEvent > out){
public static final TupleTag < JsonNode > MAIN_OUT = new TupleTag < JsonNode >(){};
public static final TupleTag < BigQueryProcessError > DEADLETTER_OUT = new TupleTag < BigQueryProcessError >(){};
TableRow convertedRow = new TableRow();
尝试 {
insertLong(元件。得到(“server_time” ),“server_time” ,convertedRow);
insertFloat(元件。得到(“screen_dpi” ),“screen_dpi” ,convertedRow);
//更多转变来
背景。输出(输出);
} catch(例外 e){
记录器。误差(“失败变换” + ë。的getMessage(),ê);
背景。输出(DEADLETTER_OUT,新 BigQueryProcessError(convertedRow。的toString(),ê。的getMessage(),ERROR_TYPE。BQ_PROCESS,originEvent));
}
}
我们如何通过标签处理元素?让我们改变管道,并进行拆分。该 MAIN 元素将大量查询表和 DEADLETTER_OUT 内容将被发送到错误表。
EventsProcessingOptions options = PipelineOptionsFactory。fromArgs(args)。withValidation()
。作为(EventsProcessingOptions。类);
管道 p = 管道。创造(选项);
PCollectionTuple tableRows =
//阅读kafka主题
p。apply(“kafka-topic-read”,kafkaReader)
。申请(“海边的卡夫卡值”,MapElements。到(TypeDescriptors。字符串())
。通过(记录 - > 记录。getKV。()的getValue()))
//将值转换为JsonNode
。申请(“字符串到JSON” ,ParseJsons。的(JsonNode。类))
//创建TableRow
。申请(“建设-表行”,帕尔多。的(新 EventsRowFn())。withOutputTags(MAIN_OUT,TupleTagList。的(DEADLETTER_OUT)));
//将MAIN标签保存到BQ
tableRows
。得到(MAIN_OUT)
。apply(“BQ-write”,BigQueryIO。< TableRowWithEvent > write()
。到(tableSpec)
。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)
。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND);
//将DEADLETTER_OUT保存到BQ错误表
tableRows
。得到(DEADLETTER_OUT)
。申请(“BQ-进程的错误提取物”,帕尔多。的(新 BigQueryProcessErrorExtracFn()))
。申请(“BQ-进程的错误写”,BigQueryIO。writeTableRows()
。to(errTableSpec)
。withJsonSchema(errSchema)
。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_IF_NEEDED)
。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND));
p。run();
处理BigQuery插入错误
为了在BigQuery插入期间处理错误,我们必须使用BiqQueryIO API。
让我们放大写入阶段。并稍微改变一下:
WriteResult writeResult = tableRowToInsertCollection
。申请(“BQ-写”,BigQueryIO。写()
//指定将返回失败的行及其错误
。withExtendedErrorInfo()
。到(tableSpec)
。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_NEVER)
。withWriteDisposition(BigQueryIO。写。writeDisposition会。WRITE_APPEND)
//指定处理失败插入的策略。
。withFailedInsertRetryPolicy(InsertRetryPolicy。retryTransientErrors()));
//将失败的行及其错误写入错误表
写结果
。getFailedInsertsWithErr()
。申请(窗口。到(FixedWindows。的(持续时间。standardMinutes(5))))
。申请(“BQ-插入错误提取物”,帕尔多。的(新 BigQueryInsertErrorExtractFn(tableRowToInsertView))。withSideInputs(tableRowToInsertView))
。申请(“BQ-插入错误写”,BigQueryIO。writeTableRows()
。to(errTableSpec)
。withJsonSchema(errSchema)
。withCreateDisposition(BigQueryIO。写。createDisposition会。CREATE_IF_NEEDED)
。withWriteDisposition(BigQueryIO。收件。writeDisposition会。WRITE_APPEND));
在上面的代码片段中,我们从BigQueryIO获取失败的TableRows及其错误。现在我们可以将它们转换为另一个 TableRow 并将它们写入错误表。在这种情况下,我们让作业在需要时创建表。