首页 > 其他分享 >有状态转化操作WindowOperations

有状态转化操作WindowOperations

时间:2024-01-24 11:22:45浏览次数:31  
标签:状态 窗口 WindowOperations val reduce 转化 滑动 DStream 函数

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。

➢ 窗口时长:计算内容的时间范围;

➢ 滑动步长:隔多久触发一次计算。

注意:这两者都必须为采集周期大小的整数倍。

object WindowOperationsDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val sc = new StreamingContext(conf, Duration(3000))
sc.checkpoint("ck/")
val lineDs: ReceiverInputDStream[String] = sc.socketTextStream("localhost", 8888)
val wordMap: DStream[(String, Int)] = lineDs.flatMap(_.split(" ")).map((_, 1))
//加上新进入窗口的批次中的元素 //移除离开窗口的老批次中的元素 //窗口时长// 滑动步长
val Sum: DStream[(String, Int)] = wordMap.reduceByKeyAndWindow((x,y)=>x+y,(x,y)=>x-y,Duration(6000),Duration(3000))
Sum.print()
sc.start()
sc.awaitTermination()
}
}

 关于 Window 的操作还有如下方法:

(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream; (2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数; (3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流; (4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。 (5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的 reduce 函数”,也就是这些 reduce 函数有相应的”反 reduce”函数(以参数 invFunc 形式传入)。如前述函数,reduce 任务的数量通过可选参数来配置。

countByWindow()和 countByValueAndWindow()作为对数据进行计数操作的简写。

countByWindow()返回一个表示每个窗口中元素个数的 DStream,而 countByValueAndWindow()返回的 DStream 则包含窗口中每个值的个数。
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), 
Seconds(10)) 
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))

 

 

 

标签:状态,窗口,WindowOperations,val,reduce,转化,滑动,DStream,函数
From: https://www.cnblogs.com/huifeidezhuzai/p/17984209

相关文章

  • 有状态转化操作UpdateStateByKey
    UpdateStateByKey原语用于记录历史记录,有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的DStream,并传递一个指定如何根据新的事件......
  • React 状态管理 valtio 解析
    valtio是什么valtio是一个很轻量级的响应式状态管理库,它基于Proxy实现,类似于vue的数据驱动视图的理念,使用外部状态代理去驱动React视图更新,不管在react组件内部还是外面都可以使用。下面提供valtio基本用法例子:https://codesandbox.io/embed/5x592g?view=Editor+%2B+Pr......
  • 无状态转化操作
    无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。注意,针对键值对的DStream转化操作(比如reduceByKey())要添加importStreamingContext._才能在Scala中使用。 需要记住的是,尽管这些函数看起......
  • 使用递归解决嵌套页面的状态改变
    场景一个注销页,里面有四种状态。注销说明页输入手机号码和图形验证码输入短信验证码注销处理中在每一个状态中,都需要被APP调用window.jumpOther()返回到上一个状态<template><divv-if="pageStatus.isDelete"></div><divv-if="pageStatus.isInputPhone"></div......
  • 虚拟机出现未知状态
    虚拟机出现未知状态虚拟机出错信息 方法一:首先将出现未知状态的虚拟机关机(ssh或远程桌面关机),找到虚拟机的存储位置:  找到后缀为.vmx文件后,将虚拟机移除清单  在指定的主机创建新的虚拟机,->自定义->虚拟机name->选择主机->选择磁盘(最好选原先的磁盘)->虚拟机版本->系统及版......
  • 主机提示IPMI 系统事件日志状态告警
    登陆vCenter连接到一台ESX主机(Dell服务器,很久之前机器的小屏幕上就有告警,内容为日志满了,因为机器不能重启,所以一直没有机会去清除日志)时,得到一条警报:主机IPMI系统事件日志状态,这种警报通常是由于系统事件日志满了导致的,必须清除IPMI系统日志后重置传感器。1.Client登陆vCenter控......
  • VUE框架CLI组件化配置Router切换路由和保持路由状态和路由组件的销毁------VUE框架
    <template><div><MyHeader></MyHeader><div><h1>省份</h1><!--在默认的情况下,我们切换组件会导致原组件被销毁--><button@click="forward()">前进</button>......
  • LOJ3990/LG9602 IOI2023 足球场 题解 (区间DP+精简无用状态)
    首先考虑一个足球场长啥样才是合法的。发现一个点能只拐弯一次到达另一个点,可以分为两种情况:先左右走,再上下走或先上下走,后左右走。无论哪种情况,都要求我们走一步使得和目标点一个轴相同,再走一步使得另一个轴也相同,所以加入把每一行选择的格子看成一个区间(因为如果不连续显然是......
  • MySQL线程状态详解
    前言:我们常用showprocesslist或showfullprocesslist查看数据库连接状态,其中比较关注的是State列,此列表示该连接此刻所在的状态。那么你真的了解不同State值所表示的状态吗?下面我们参考官方文档来一探究竟。以MySQL5.7版本为例官方文档地址:https://dev.my......
  • js中的bigint类型转化为json字符串时报无法序列化的问题
    网上查了一下,解决这个问题的思路就是将bigint类型的数据转化为字符串,这样就能正确转化为json字符串了。对于一个是bigint的变量,直接使用toString方法就可以转化为字符串了,但是bigint变量在一个对象中,那么我们就需要一个更加通用的方法,网上看到一个很好的封装好的方法,如下。expor......