UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。
通俗的讲:由于spark streaming是微批次的流式处理框架,并不是严格的实时流处理框架,还是一个批次一个批次的处理数据,只是间隔时间比较短,无状态处理的话,这个批次的数据无法与上个批次的数据一起处理,调用updateStateByKey的话(需要配合检查点,spark中checkpoint在此处有优化,并非保存所有的历史数据,而是将当前计算结果保存),该算子能够记录流的累计状态,就可以操作多个批次的数据。
注意: updateStateByKey操作,要求必须开启Checkpoint机制
示例:
object UpstateByKeyDemo {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount")
val sc = new StreamingContext(conf, Duration(3000))
sc.checkpoint("ck/")
val lineDs: ReceiverInputDStream[String] = sc.socketTextStream("localhost", 8888)
val wordMap: DStream[(String, Int)] = lineDs.flatMap(_.split(" ")).map((_, 1))
// 到了这里,就不一样了,之前的话,是不是直接就是wordMap.reduceByKey
// 然后,就可以得到每个时间段的batch对应的RDD,计算出来的单词计数
// 然后,可以打印出那个时间段的单词计数
// 但是,有个问题,你如果要统计每个单词的全局的计数呢?
// 就是说,统计出来,从程序启动开始,到现在为止,一个单词出现的次数,那么就之前的方式就不好实现
// 就必须基于redis这种缓存,或者是mysql这种db,来实现累加
// 但是,我们的updateStateByKey,就可以实现直接通过Spark维护一份每个单词的全局的统计次数
val Sum: DStream[(String, Int)] = wordMap.updateStateByKey[Int]((value: Seq[Int], state: Option[Int]) => {
//状态更新函数
val current: Int = value.sum
val preValue: Int = state.getOrElse(0)
Some(current + preValue)// 更新后的状态返回
})
Sum.print()
sc.start()
sc.awaitTermination()
}
}
标签:状态,updateStateByKey,val,Int,转化,批次,UpdateStateByKey,sc,DStream From: https://www.cnblogs.com/huifeidezhuzai/p/17984106