首页 > 其他分享 >有状态转化操作UpdateStateByKey

有状态转化操作UpdateStateByKey

时间:2024-01-24 10:56:39浏览次数:24  
标签:状态 updateStateByKey val Int 转化 批次 UpdateStateByKey sc DStream

UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。

通俗的讲:由于spark streaming是微批次的流式处理框架,并不是严格的实时流处理框架,还是一个批次一个批次的处理数据,只是间隔时间比较短,无状态处理的话,这个批次的数据无法与上个批次的数据一起处理,调用updateStateByKey的话(需要配合检查点,spark中checkpoint在此处有优化,并非保存所有的历史数据,而是将当前计算结果保存),该算子能够记录流的累计状态,就可以操作多个批次的数据。

注意: updateStateByKey操作,要求必须开启Checkpoint机制

示例:


object UpstateByKeyDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val sc = new StreamingContext(conf, Duration(3000))
sc.checkpoint("ck/")
val lineDs: ReceiverInputDStream[String] = sc.socketTextStream("localhost", 8888)
val wordMap: DStream[(String, Int)] = lineDs.flatMap(_.split(" ")).map((_, 1))
// 到了这里,就不一样了,之前的话,是不是直接就是wordMap.reduceByKey
// 然后,就可以得到每个时间段的batch对应的RDD,计算出来的单词计数
// 然后,可以打印出那个时间段的单词计数
// 但是,有个问题,你如果要统计每个单词的全局的计数呢?
// 就是说,统计出来,从程序启动开始,到现在为止,一个单词出现的次数,那么就之前的方式就不好实现
// 就必须基于redis这种缓存,或者是mysql这种db,来实现累加
// 但是,我们的updateStateByKey,就可以实现直接通过Spark维护一份每个单词的全局的统计次数
val Sum: DStream[(String, Int)] = wordMap.updateStateByKey[Int]((value: Seq[Int], state: Option[Int]) => {
//状态更新函数
val current: Int = value.sum
val preValue: Int = state.getOrElse(0)
Some(current + preValue)// 更新后的状态返回
})
Sum.print()
sc.start()
sc.awaitTermination()
}
}
 

 

标签:状态,updateStateByKey,val,Int,转化,批次,UpdateStateByKey,sc,DStream
From: https://www.cnblogs.com/huifeidezhuzai/p/17984106

相关文章

  • React 状态管理 valtio 解析
    valtio是什么valtio是一个很轻量级的响应式状态管理库,它基于Proxy实现,类似于vue的数据驱动视图的理念,使用外部状态代理去驱动React视图更新,不管在react组件内部还是外面都可以使用。下面提供valtio基本用法例子:https://codesandbox.io/embed/5x592g?view=Editor+%2B+Pr......
  • 无状态转化操作
    无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。注意,针对键值对的DStream转化操作(比如reduceByKey())要添加importStreamingContext._才能在Scala中使用。 需要记住的是,尽管这些函数看起......
  • 使用递归解决嵌套页面的状态改变
    场景一个注销页,里面有四种状态。注销说明页输入手机号码和图形验证码输入短信验证码注销处理中在每一个状态中,都需要被APP调用window.jumpOther()返回到上一个状态<template><divv-if="pageStatus.isDelete"></div><divv-if="pageStatus.isInputPhone"></div......
  • 虚拟机出现未知状态
    虚拟机出现未知状态虚拟机出错信息 方法一:首先将出现未知状态的虚拟机关机(ssh或远程桌面关机),找到虚拟机的存储位置:  找到后缀为.vmx文件后,将虚拟机移除清单  在指定的主机创建新的虚拟机,->自定义->虚拟机name->选择主机->选择磁盘(最好选原先的磁盘)->虚拟机版本->系统及版......
  • 主机提示IPMI 系统事件日志状态告警
    登陆vCenter连接到一台ESX主机(Dell服务器,很久之前机器的小屏幕上就有告警,内容为日志满了,因为机器不能重启,所以一直没有机会去清除日志)时,得到一条警报:主机IPMI系统事件日志状态,这种警报通常是由于系统事件日志满了导致的,必须清除IPMI系统日志后重置传感器。1.Client登陆vCenter控......
  • VUE框架CLI组件化配置Router切换路由和保持路由状态和路由组件的销毁------VUE框架
    <template><div><MyHeader></MyHeader><div><h1>省份</h1><!--在默认的情况下,我们切换组件会导致原组件被销毁--><button@click="forward()">前进</button>......
  • LOJ3990/LG9602 IOI2023 足球场 题解 (区间DP+精简无用状态)
    首先考虑一个足球场长啥样才是合法的。发现一个点能只拐弯一次到达另一个点,可以分为两种情况:先左右走,再上下走或先上下走,后左右走。无论哪种情况,都要求我们走一步使得和目标点一个轴相同,再走一步使得另一个轴也相同,所以加入把每一行选择的格子看成一个区间(因为如果不连续显然是......
  • MySQL线程状态详解
    前言:我们常用showprocesslist或showfullprocesslist查看数据库连接状态,其中比较关注的是State列,此列表示该连接此刻所在的状态。那么你真的了解不同State值所表示的状态吗?下面我们参考官方文档来一探究竟。以MySQL5.7版本为例官方文档地址:https://dev.my......
  • js中的bigint类型转化为json字符串时报无法序列化的问题
    网上查了一下,解决这个问题的思路就是将bigint类型的数据转化为字符串,这样就能正确转化为json字符串了。对于一个是bigint的变量,直接使用toString方法就可以转化为字符串了,但是bigint变量在一个对象中,那么我们就需要一个更加通用的方法,网上看到一个很好的封装好的方法,如下。expor......
  • 字符串转化为toList
    certNameList="certNameList":"消防设施工程专业承包二级,地基基础工程专业承包一级,电子与智能化工程专业承包一级,建筑装修装饰工程专业承包一级,建筑幕墙工程专业承包一级",for(EntQualificationVOvo:entQualificationVOList){StringcertNameList......