首页 > 其他分享 >Flink计算TopN

Flink计算TopN

时间:2023-12-26 16:56:35浏览次数:34  
标签:Map 01 String Top Flink Sale TopN 计算 mutable

在 Apache Flink 中实现高效的 Top N 数据处理,尤其是涉及时间窗口和多条件排序时,需要精细地控制数据流和状态管理。


普通计算TopN:

1. 定义数据源(Source)

首先,我们需要定义数据源。这可能是 Kafka 流、文件、数据库或任何其他支持的数据源。

val stream: DataStream[YourType] = env.addSource(...)

2. 定义业务逻辑(Transformation)

接下来,我们需要根据业务需求对数据进行转换。这可能包括映射、过滤、聚合等操作。

val transformedStream: DataStream[YourTransformedType] = stream
  .map(...)       // 例如,映射操作
  .filter(...)    // 例如,过滤操作
  // 其他转换操作...

3. 计算 Top N

在 Flink 中,计算 Top N 可以通过使用 KeyedProcessFunction 或 Window(窗口)实现。以下是两种常见的方法:

方法 A:使用 KeyedProcessFunction

定义一个状态来存储当前 Top N 的元素。这通常是一个 ListState 或 MapState。

使用 keyBy 函数对数据进行分组。这是根据某个键(如用户 ID、产品类别等)进行分组。

使用 process 函数处理每个元素。在这个函数中,会去更新状态,并保留当前的 Top N 元素。

stream
  .keyBy(...)  // 分组键
  .process(new KeyedProcessFunction[KeyType, InputType, OutputType] {
    // 定义状态
    private var state: ListState[YourType] = ...

    override def processElement(value: InputType, ctx: Context, out: Collector[OutputType]): Unit = {
      // 更新状态
      // 计算 Top N
    }
  })

方法 B:使用 Time Window(时间窗口)

如果我们的 Top N 计算是基于某个时间范围内的数据(例如,每5分钟的 Top N),则可以使用时间窗口。

使用 window 函数定义窗口。这可以是滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)等。

在窗口上应用聚合或其他操作,以计算每个窗口的 Top N。

import org.apache.flink.streaming.api.windowing.time.Time

stream
  .keyBy(...)  // 分组键
  .timeWindow(Time.minutes(5))  // 定义时间窗口
  .process(new ProcessWindowFunction[...] {
    override def process(key: KeyType, context: Context, elements: Iterable[InputType], out: Collector[OutputType]): Unit = {
      // 计算每个窗口的 Top N
    }
  })

4. 输出结果(Sink)

最后,将计算得到的 Top N 结果输出到所需的目的地,如 Kafka、数据库、控制台等。

topNStream.addSink(...)

关于划分窗口计算TopN:

1. 数据预处理与窗口划分

首先,对数据流进行 keyBy 操作,根据特定的键(如分类ID、事件ID等)对数据进行分组。然后,定义一个滑动窗口来平滑数据,并在这个窗口内应用增量聚合函数。

val aggregatedStream = dataStream
  .keyBy(...)  // 分组键
  .timeWindow(Time.minutes(5), Time.minutes(1))  // 定义滑动窗口
  .aggregate(new MyAggregateFunction, new MyWindowFunction)

其中,MyAggregateFunction 是一个实现了 AggregateFunction 接口的类,用于增量聚合窗口内的数据。MyWindowFunction 是一个实现了 WindowFunction 接口的类,用于在窗口触发时获取窗口信息并输出中间结果。

2. 定义 ProcessFunction 以处理窗口数据

在得到每个窗口的聚合结果后,使用 ProcessFunction 来处理这些数据。在这个函数中,可以使用 ValueState 来存储每个窗口的数据,并注册一个定时器来控制何时输出 Top N 结果。

val topNStream = aggregatedStream
  .keyBy(...)  // 根据需要的键进行二次分组
  .process(new KeyedProcessFunction[KeyType, InputType, OutputType] {
    // 定义状态
    private var windowDataState: ValueState[List[YourType]] = ...

    override def processElement(value: InputType, ctx: Context, out: Collector[OutputType]): Unit = {
      // 更新状态
      // 注册定时器
      ctx.timerService().registerEventTimeTimer(value.windowEndTime + 1)
    }

    override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OutputType]): Unit = {
      // 排序并输出 Top N
    }
  })

3. 数据排序与 Top N 输出

在 onTimer 方法中,当定时器触发时,对存储在 ValueState 中的数据进行排序,并输出每个窗口的 Top N 结果。这一步骤确保了只有当一个窗口的所有数据都已经到达时,才进行排序和输出。

4. 状态和容错处理

由于在实时计算中,状态管理和容错是关键考虑因素,确保您的状态管理策略(如使用 ListState 或 ValueState)与您的容错需求(如检查点和保存点)相匹配。

5. 考虑 Watermarks 和数据延迟

由于 Flink 中的时间管理很重要,确保合理地设置 Watermarks,以处理乱序事件和数据延迟。这对于确保定时器准确触发和窗口正确计算至关重要。


两次 keyBy 解释:

在上面的思路中做了两次 keyBy ,主要是为了在不同维度上进一步细化数据处理逻辑,原因:

1. 针对不同维度的数据处理

初次 keyBy 通常是根据主要维度(如用户 ID、商品类别等)进行分组,以便在这些维度上进行聚合或其他处理。而在某些情况下,聚合后的数据可能需要根据额外的维度进行进一步的处理。例如,你可能需要根据聚合结果的时间窗口或其他业务逻辑相关的维度进行分析和处理。

2. 更精细的数据管理

二次分组允许对数据流进行更精细的切分,使得每个子流可以根据不同的业务逻辑进行独立处理。这样可以更灵活地应对复杂的业务需求,例如在不同时间窗口或不同事件类型上实施不同的逻辑。

3. 优化资源利用和性能

通过在不同的维度上分组,可以更有效地利用 Flink 的资源,比如任务的并行度和状态管理。这种方式有助于提高整体的处理效率,减少不必要的资源浪费。

4. Top N 计算的特殊需求

在进行 Top N 计算时,尤其是当需要根据多个维度(如时间窗口、分类ID、事件ID等)进行排序和选择时,二次分组变得尤为重要。这样可以确保每个独立的子流都有自己的 Top N 计算逻辑,更加精确地反映不同维度组合下的数据特性。

5. 提高容错性和可维护性

二次分组有助于隔离不同数据流的处理,使得系统更加容错,易于维护和调试。当处理复杂的数据流时,这种隔离可以使问题定位和解决变得更加容易。


现在我们来看一个例子:
假设我们要计算过去一小时内,每5分钟更新一次,每个类别中销售额最高的Top 3产品,并要求这些产品在不同地区的销售额进行对比。

1. 定义案例类

case class Sale(productId: String, category: String, amount: Double, region: String, timestamp: Long)

2. 环境配置

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

3. 使用集合来模拟数据源

    // 示例数据源
    class SampleSource extends SourceFunction[Sale] {
      override def run(ctx: SourceFunction.SourceContext[Sale]): Unit = {
        // 第一个小时的数据
        ctx.collect(Sale("product1", "category1", 100.0, "region1", 1609459200L)) // 2021-01-01 00:00:00
        ctx.collect(Sale("product2", "category1", 150.0, "region1", 1609459800L)) // 2021-01-01 00:10:00
        ctx.collect(Sale("product3", "category1", 120.0, "region2", 1609460400L)) // 2021-01-01 00:20:00
        ctx.collect(Sale("product4", "category2", 200.0, "region1", 1609461000L)) // 2021-01-01 00:30:00
        ctx.collect(Sale("product5", "category2", 250.0, "region2", 1609461600L)) // 2021-01-01 00:40:00
        ctx.collect(Sale("product6", "category3", 300.0, "region1", 1609462200L)) // 2021-01-01 00:50:00

        // 第二个小时的数据
        ctx.collect(Sale("product7", "category1", 130.0, "region3", 1609462800L)) // 2021-01-01 01:00:00
        ctx.collect(Sale("product8", "category2", 210.0, "region1", 1609463400L)) // 2021-01-01 01:10:00
        ctx.collect(Sale("product9", "category3", 350.0, "region2", 1609464000L)) // 2021-01-01 01:20:00
        ctx.collect(Sale("product10", "category3", 320.0, "region3", 1609464600L)) // 2021-01-01 01:30:00
        ctx.collect(Sale("product11", "category1", 140.0, "region1", 1609465200L)) // 2021-01-01 01:40:00
        ctx.collect(Sale("product12", "category2", 230.0, "region2", 1609465800L)) // 2021-01-01 01:50:00
      }

      override def cancel(): Unit = {}
    }

4. 设置 WatermarkStrategy

    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[Sale](Duration.ofSeconds(10))
      .withTimestampAssigner(new SerializableTimestampAssigner[Sale] {
        override def extractTimestamp(element: Sale, recordTimestamp: Long): Long = element.timestamp * 1000
      })

5. 添加数据源并应用时间戳和水印

    val timedSalesStream = env.addSource(new SampleSource())
      .assignTimestampsAndWatermarks(watermarkStrategy)

6. 处理数据流的数据

    val topSales = timedSalesStream
      .keyBy(_.category)
      .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
      .aggregate(new TopSalesAggregateFunction())
      .map { result =>
        result.map { case (category, salesList) =>
          val topSalesByRegion = salesList
            .groupBy(_.region) // 按地区分组
            .mapValues(_.sortBy(-_.amount).take(3)) // 每个地区取销售额最高的Top 3
            .toList
            .sortBy(_._1) // 按地区名称排序

          val formattedSales = topSalesByRegion.map { case (region, sales) =>
            val salesInfo = sales.map(sale => s"${sale.productId} (amount: ${sale.amount})").mkString(", ")
            s"Region: $region, Top Sales: $salesInfo"
          }.mkString("; ")

          s"Category: $category, $formattedSales"
        }.mkString("\n")
      }

7. 定义TopSalesAggregateFunction

class TopSalesAggregateFunction extends AggregateFunction[Sale, mutable.Map[String, mutable.PriorityQueue[Sale]], Map[String, List[Sale]]] {
  override def createAccumulator(): mutable.Map[String, mutable.PriorityQueue[Sale]] = mutable.Map[String, mutable.PriorityQueue[Sale]]()

  override def add(value: Sale, accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
    accumulator.getOrElseUpdate(value.category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount))).enqueue(value)
    if (accumulator(value.category).size > 3) accumulator(value.category).dequeue()
    accumulator
  }

  override def getResult(accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): Map[String, List[Sale]] = {
    accumulator.mapValues(_.toList.sortBy(-_.amount)).toMap
  }

  override def merge(a: mutable.Map[String, mutable.PriorityQueue[Sale]], b: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
    for ((category, sales) <- b) {
      val mergedQueue = a.getOrElseUpdate(category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount)))
      sales.foreach(mergedQueue.enqueue(_))
      if (mergedQueue.size > 3) mergedQueue.dequeue()
    }
    a
  }
}

8. 输出与执行作业

    topSales.print()

    env.execute("Sales Top 3 Analysis")

现在我们来逐行解释一下代码:

案例类定义

case class Sale(productId: String, category: String, amount: Double, region: String, timestamp: Long)

case class Sale 定义了一个样例类 Sale,用于表示销售数据。它包含以下字段:
productId:产品的ID。
category:产品所属的类别。
amount:销售额。
region:销售发生的地区。
timestamp:销售发生的时间戳。

环境配置

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

env 是 Flink 流执行环境的实例,它用于设置和执行流处理作业。
env.setParallelism(1) 设置作业的并行度为 1。这意味着作业将在单个任务槽中运行。

数据源定义

class SampleSource extends SourceFunction[Sale] {
  // ...
}

这段代码定义了一个名为 SampleSource 的自定义数据源,它实现了 SourceFunction 接口。这个数据源产生 Sale 类型的数据。

数据源的 run 方法

override def run(ctx: SourceFunction.SourceContext[Sale]): Unit = {
  // 示例数据
}

run 方法负责在流执行期间不断发送 Sale 数据。这里通过调用 ctx.collect 方法来发出数据。

示例数据生成

ctx.collect(Sale("product1", "category1", 100.0, "region1", 1609459200L))
// ...更多数据

这些行创建 Sale 对象,并通过 ctx.collect 发送它们。每个对象包含示例数据,例如产品ID、类别、销售额、地区和时间戳。

水印策略

val watermarkStrategy = WatermarkStrategy
  .forBoundedOutOfOrderness[Sale](Duration.ofSeconds(10))
  .withTimestampAssigner(new SerializableTimestampAssigner[Sale] {
    override def extractTimestamp(element: Sale, recordTimestamp: Long): Long = element.timestamp * 1000
  })

设置了一个水印策略,允许一定程度的乱序(最多10秒延迟)。这对于基于事件时间的窗口操作至关重要。
withTimestampAssigner 定义了如何从 Sale 对象中提取时间戳。

数据流配置

val timedSalesStream = env.addSource(new SampleSource())
  .assignTimestampsAndWatermarks(watermarkStrategy)

这里,env.addSource(new SampleSource()) 从自定义源 SampleSource 添加数据到流中。
.assignTimestampsAndWatermarks(watermarkStrategy) 将之前定义的水印策略应用于数据流。

聚合操作

val topSales = timedSalesStream
  .keyBy(_.category)
  .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
  .aggregate(new TopSalesAggregateFunction())

数据流根据 category 字段进行分组(keyBy(_.category))。
应用了滑动时间窗口(每小时一窗,每5分钟滑动一次)。
使用自定义的聚合函数 TopSalesAggregateFunction 来计算每个类别中销售额最高的 Top 3 产品。

聚合函数的定义

class TopSalesAggregateFunction extends AggregateFunction[Sale, mutable.Map[String, mutable.PriorityQueue[Sale]], Map[String, List[Sale]]] {
  // 方法实现
}

这个类扩展了 AggregateFunction,它是 Flink API 中用于自定义聚合逻辑的一部分。
泛型参数解释:
Sale:输入类型,表示每个元素的类型。
mutable.Map[String, mutable.PriorityQueue[Sale]]:累加器的类型,用于聚合中间结果。
Map[String, List[Sale]]:聚合结果的类型。

createAccumulator 方法

override def createAccumulator(): mutable.Map[String, mutable.PriorityQueue[Sale]] = mutable.Map[String, mutable.PriorityQueue[Sale]]()

此方法初始化累加器,它是存储中间聚合状态的数据结构。
在这里,累加器是一个映射,它将每个类别映射到一个优先队列(PriorityQueue)。优先队列用于保持每个类别销售额最高的产品。

add 方法

override def add(value: Sale, accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
  accumulator.getOrElseUpdate(value.category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount))).enqueue(value)
  if (accumulator(value.category).size > 3) accumulator(value.category).dequeue()
  accumulator
}

add 方法定义了如何将一个新的 Sale 元素添加到累加器中。
对于每个 Sale 元素,它首先检查累加器中是否已经有该类别的队列。如果没有,它会创建一个新的队列。
队列根据销售额进行排序,最高的销售额在队列前面。
元素被添加到相应类别的队列中。
如果队列的大小超过 3(即我们只关心销售额最高的前三个产品),则从队列中移除销售额最低的产品。

getResult 方法

override def getResult(accumulator: mutable.Map[String, mutable.PriorityQueue[Sale]]): Map[String, List[Sale]] = {
  accumulator.mapValues(_.toList.sortBy(-_.amount)).toMap
}

getResult 方法提取聚合的最终结果。
它将每个类别的 PriorityQueue 转换为一个列表,并按销售额降序排序。
最终结果是一个映射,将每个类别映射到其销售额最高的 Top 3 产品列表。

merge 方法

override def merge(a: mutable.Map[String, mutable.PriorityQueue[Sale]], b: mutable.Map[String, mutable.PriorityQueue[Sale]]): mutable.Map[String, mutable.PriorityQueue[Sale]] = {
  for ((category, sales) <- b) {
    val mergedQueue = a.getOrElseUpdate(category, mutable.PriorityQueue[Sale]()(Ordering.by(-_.amount)))
    sales.foreach(mergedQueue.enqueue(_))
    if (mergedQueue.size > 3) mergedQueue.dequeue()
  }
  a
}

merge 方法定义了如何合并两个累加器的结果,这在 Flink 的分布式计算环境中是必要的。
对于累加器 b 中的每个类别和其对应的销售记录队列,方法将这些记录合并到累加器 a 的对应队列中。
合并后,如果某个类别的队列大小超过 3,则移除多余的元素,确保队列只包含销售额最高的 Top 3 产品。
最后,返回合并后的累加器 a

以上就是本文全部内容啦५꒰۶⁎⁼̴̀ω⁼̴́⁎꒱۶

标签:Map,01,String,Top,Flink,Sale,TopN,计算,mutable
From: https://www.cnblogs.com/toycon/p/17927099.html

相关文章

  • 【Flink从入门到精通 05】Source&Sink
    【Flink从入门到精通05】Source&SinkFlink用于处理有状态的流式计算,需要对Source端的数据进行加工处理,然后写入到Sink端,下图展示了在Flink中数据所经历的过程,今天就根据这张图分别给大家分享下。01EnvironmentFlink所有的程序都从这一步开始,只有创建了执行环境,才能开......
  • Java版Flink(一)概述和入门案例
    一、概述1、Flink是什么ApacheFlinkisaframeworkanddistributedprocessingengineforstatefulcomputationsoverunboundedandboundeddatastreams.ApacheFlink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。官网地址2、Flink特点......
  • 【计算机网络中的冲突域、广播域和碰撞域】
    (文章目录)冲突域(CollisionDomain)冲突域是指在一个局域网(LAN)上,当两个或多个设备同时发送数据帧(数据包)时,可能发生冲突的区域。在以太网中,当两个设备同时发送数据帧到同一个物理网络(例如,使用同一根网线)时,它们的数据帧可能会在中途碰撞,导致数据帧损坏,需要重新传输。为什么有冲突域......
  • 隐私计算:数据匿名化的优点和缺点
    PrimiHub一款由密码学专家团队打造的开源隐私计算平台,专注于分享数据安全、密码学、联邦学习、同态加密等隐私计算领域的技术和内容。数据分析是如今商业社会业务运营的核心工具,节省成本的同时还可以深入了解用户偏好,通过定制产品来收获最大化收益。然而,企业持有的大量数据是用......
  • 【flink番外篇】6、flink的WaterMark(介绍、基本使用、kafka的水印以及超出最大允许延
    Flink系列文章一、Flink专栏Flink专栏系统介绍某一知识点,并辅以具体的示例进行说明。1、Flink部署系列本部分介绍Flink的部署、配置相关基础内容。2、Flink基础系列本部分介绍Flink的基础部分,比如术语、架构、编程模型、编程指南、基本的datastreamapi用法、四大基......
  • 分享一些linux云计算开发面试题
    近年来,随着云计算的快速发展,对于中高级Linux云计算开发人才的需求也越来越大。在面试过程中,面试官通常会提出一系列与Linux云计算开发相关的问题,以评估面试者的技术实力和解决问题的能力。本文将围绕中高级Linux云计算开发面试题展开讨论,并给出相关问答。 一、虚拟化技术 1.什么......
  • 量子优势:计算机的未来
    量子优势:计算机的未来发布日期:2023-11-30    浏览次数:2 量子计算具有里程碑式的优势量子计算具有里程碑式的优势。量子计算机可以解决最强大的非量子或经典计算机无法解决的问题。量子计算机利用它的奇异的性能来解决问题。量子指的是原子和分子或更小粒......
  • 计算机领域前沿技术(未来计算机科学中的十大新兴技术)
    计算机领域前沿技术(未来计算机科学中的十大新兴技术)发布日期:2023-07-29    浏览次数:3计算机科学技术是一股强大的力量。从提高生产力到弥合地理距离,它从各个方面影响着我们的生活。人工智能(AI)、机器学习(ML)、机器人和5G网络正在重塑行业,开辟新的应用。例如,“精准医疗”正......
  • R:计算相对丰度最简单的R代码
    rm(list=ls())#清除所有变量setwd("C:\\Users\\Administrator\\Desktop\\新建文件夹\\PCoA")#设置工作目录#加载必要的库library(dplyr)#读取数据,假设您的文件名是data.txt#请根据您的文件实际路径调整这里的文件名data<-read.table("otu_table.txt",header......
  • 【UniApp】-uni-app-动态计算字体大小(苹果计算器)
    前言本文主要介绍uni-app中动态计算字体大小的方法原因呢就是在上一篇文章当中我发现输入的内容已经超过了展示区域于是我就想到了动态计算字体大小的方法,这样就可以保证输入的内容不会超过展示区域正文首先要改造的是style="font-size:180rpx;"这里不能直接写......