首页 > 其他分享 >Flink侧输出流解析

Flink侧输出流解析

时间:2023-12-30 17:23:44浏览次数:23  
标签:输出 String val Flink 订单 数据流 解析 Order

在实时数据处理领域,Apache Flink 已成为一个不可或缺的工具。它以其高吞吐量和低延迟处理能力而闻名。而在 Flink 的众多特性中,侧输出流(Side Outputs)提供了一种灵活的方式来处理复杂的数据流。本文将探讨如何在 Flink 的 Scala API 中有效使用侧输出流。

1. 侧输出流的基本概念

侧输出流是一种特殊类型的输出流,它允许您从主数据流中分离出特定的事件或数据。与主流相比,侧输出流用于处理异常数据、监控事件或分流特殊数据,从而使主数据流保持清晰和高效。

2. Scala API中实现侧输出流

让我们通过一个简单的例子来了解如何在 Flink 的 Scala API 中实现侧输出流:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.OutputTag

object SideOutputExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val mainDataStream: DataStream[String] = env.socketTextStream("localhost", 9999)

    // 创建一个侧输出标签
    val sideOutputTag = new OutputTag[String]("side-output")

    // 处理主数据流
    val processedStream = mainDataStream.process(new ProcessFunction[String, String] {
      override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
        if (value.contains("特殊事件")) {
          ctx.output(sideOutputTag, value)
        } else {
          out.collect(value)
        }
      }
    })

    // 获取侧输出流
    val sideOutputStream = processedStream.getSideOutput(sideOutputTag)
    sideOutputStream.print()

    env.execute("Side Output Example")
  }
}

在这个例子中,我们定义了一个侧输出标签 sideOutputTag,用于从主数据流中分离包含“特殊事件”的数据。主数据流继续处理其他数据,而被标记的数据则通过侧输出流进行处理。

4. 实际案例分析

想象一个电商平台的实时数据处理场景,我们需要从用户活动流中分离出异常交易行为。使用侧输出流,我们可以轻松地将这些异常事件分流,进行进一步的分析和处理,而不干扰主流程的处理。

让我们通过一个处理订单数据的例子,展示如何在Flink 中使用 Scala API 和侧输出流来识别和处理异常交易。在这个场景中,我们假设有一个实时订单数据流,我们的目标是从中识别出异常订单(例如金额过大或过小的订单)并将其重定向到侧输出流以便进一步分析。

1. 数据流和订单模型

首先,我们定义一个订单的数据模型:

case class Order(orderId: String, userId: String, amount: Double, timestamp: Long)

假设我们有一个实时的订单数据流,每个订单都是一个 Order 对象。

2. 定义侧输出流

接着,我们定义一个侧输出流,专门用于处理异常订单。这些异常订单可以是金额过大或过小的订单:

val abnormalOrdersOutputTag = new OutputTag[Order]("abnormal-orders")

3. 定义环境

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

4. 处理订单流并分离异常订单

然后,我们对主数据流进行处理,将异常订单重定向到侧输出流:

val orders = List(
  Order("order1", "user1", 150.00, 1672382910000L), // 正常订单
  Order("order2", "user2", 5000.00, 1672382915000L), // 正常订单
  Order("order3", "user3", 20000.00, 1672382920000L), // 异常订单(金额过大)
  Order("order4", "user4", 50.00, 1672382925000L), // 异常订单(金额过小)
  Order("order5", "user5", 300.00, 1672382930000L) // 正常订单
)

// 模拟一个实时的订单数据流
val orderStream: DataStream[Order] = env.fromCollection(orders)

val processedOrderStream = orderStream.process(new ProcessFunction[Order, Order] {
  override def processElement(order: Order, ctx: ProcessFunction[Order, Order]#Context, out: Collector[Order]): Unit = {
    if (order.amount > 10000 || order.amount < 100) {
      // 如果订单金额异常,将订单发送到侧输出流
      ctx.output(abnormalOrdersOutputTag, order)
    } else {
      // 正常订单继续在主数据流中处理
      out.collect(order)
    }
  }
})

5. 获取并处理侧输出流

最后,我们获取侧输出流并对异常订单进行进一步的处理:

val abnormalOrdersStream = processedOrderStream.getSideOutput(abnormalOrdersOutputTag)
abnormalOrdersStream.map(order => s"异常订单: ${order.orderId}").print()

在这个例子中,我们将异常订单的订单ID打印出来,但在实际应用中,这个侧输出流可能被用于触发警报、进行深入分析或与其他系统集成。

6. 执行

env.execute("Order Side Output Example")

结果

异常订单: order3
异常订单: order4

通过使用侧输出流,我们能够在不干扰主数据流的情况下,有效地识别和处理异常订单。这种方法提高了数据处理的灵活性和效率,特别适合于复杂或多变的数据处理场景。

标签:输出,String,val,Flink,订单,数据流,解析,Order
From: https://www.cnblogs.com/toycon/p/17936541.html

相关文章

  • GPT-2(small)架构推理解析
    1、有字符串BBCAD2、为字符串中的每个字母添加index索引以进行排序,A、B、C、D的索引下标分别是0、1、2、3,因此排序的数字结果为011233、将01123中的每个数字转换为c个元素的向量(这个过程称为embedding,其中c是一个超参数)4、将每个字母的索引信息分别嵌入到tokenembedding矩阵的......
  • IP: dns-lookup : 查询域名的公网IP地址 解决 DNS域名解析绑架的问题例如访问 raw.git
    示例:https://github.com/orgs/community/discussions/42655https://github.com/mwaskom/seaborn-data/blob/2b29313169bf8dfa77d8dc930f7bd3eba559a906/dataset_names.txthttps://www.ip-lookup.org/dns-lookup/raw.githubusercontent.comIPDetailsDomain:Raw.githubuser......
  • 【flink番外篇】7、flink的State(Keyed State和operator state)介绍及示例(2) - operator
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • 深入解析 C 语言中的 for 循环、break 和 continue
    C语言中的for循环当您确切地知道要循环执行代码块的次数时,可以使用 for 循环而不是 while 循环for (语句1; 语句2; 语句3){  //要执行的代码块}语句1在执行代码块之前执行(一次)。语句2定义执行代码块的条件。语句3在执行代码块后执行(每次)。下面的......
  • 深入解析 C 语言中的 for 循环、break 和 continue
    C语言中的for循环当您确切地知道要循环执行代码块的次数时,可以使用 for 循环而不是 while 循环for (语句1; 语句2; 语句3){  //要执行的代码块}语句1在执行代码块之前执行(一次)。语句2定义执行代码块的条件。语句3在执行代码块后执行(每次)。下面的......
  • CountDownLatch源码解析
    CountDownLatch源码解析countdown是倒计时的意思,latch是门闩的意思,也有门锁的意思,合起来字面意思就是一个倒计树计锁器的意思,先来看一个具体的案例分析大致了解importjava.util.concurrent.CountDownLatch;publicclassMain{publicstaticvoidmain(String[]args)......
  • MES喷码机联动:MES实时下发设备生产参数及信息,实现从上层系统控制设备输出
    随着工厂数字化的不断转型,设备单机工作已逐渐无法满足工业工厂互联网信息化数字化升级需求,从上层工单拉动设备生产参数的变化以及信息输出已经成为必然趋势。开发工具:C#WPF数据库:sqlite3MES喷码机联动下发工具:1.自动读取文本文件内容发送至喷码机,对接上层系统,如mes等。2.可......
  • 29.capability 配置参数解析与 分布式运行
    目录capability概述capability配置SeleniumGrid简介分布式运行capability概述Capabilities是WebDriver支持的标准命令之外的扩展命令(配置信息)配置web驱动的属性,如浏览器名称、浏览器平台等。结合SeleniumGrid完成分布式、兼容性等测试官网地址:https://ww......
  • 【Flink系列二十一】深入理解 JVM的类型加载约束,解决 Flink 类型加载冲突问题的通用方
    classByteArrayDeserializerisnotaninstanceoforg.apache.kafka.common.serialization.DeserializerDebuggingClassloading类似的XcannotbecasttoXexceptions如何理解这类异常?这类异常可以归纳为类型异常,按个人有限经验,现象分为两种常见情况:类型赋值检查:不能......
  • CRM公司管理系统能为中小企业做哪些事情?CRM功能解析
    巴菲特曾说:“设计出的工具越多,使用工具的人就得越聪明。“”如果您是中小企业主,想要企业更好地发展,您都可以考虑使用CRM管理系统。它可以帮助中小企业有效地管理客户,提高业务效率,实现快、稳、准的发展。本文将详细的描述公司管理系统CRM对中小型公司意味着什么???一、什么是CRM系......