首页 > 其他分享 >sparkstreaming转换算子--窗口函数

sparkstreaming转换算子--窗口函数

时间:2022-09-02 12:55:51浏览次数:74  
标签:10 窗口 val -- sparkstreaming Seconds 算子 DStream ssc

window

  • 画图理解
  • 说明
    countByWindow 对每个滑动窗口的数据执行count操作
    reduceByWindow 对每个滑动窗口的数据执行reduce操作
    reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作
    countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作
    都需要传入两个核心参数
    windowDuration: Duration, 窗口时间长度--一般是batchSize(批次时间)的整数倍
    slideDuration: Duration: 滑动时间长度----一般是batchSize(批次时间)的整数倍
  • 案例
package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

/**
 * 滑动窗口算子
 *    每隔一段时间,对原始DStream中多个批次数据整合  成为新的DStream中一个批次数据
 *
 * Spark Streaming中,一个批次执行一次,不会积攒当前批次的数据。滑动窗口算子可以实现将多个批次数据积攒下来,然后再去做统一的运算
 *   窗口算子最为基础核心的算子 window 会给我们返回一个新的DStream,但是这个DStream包含多个未被处理的批次数据
 *      窗口函数中需要传递核心参数
 *      windowDuration: Duration,  窗口时间长度--一般是batchSize(批次时间)的整数倍
 *      slideDuration: Duration:  滑动时间长度----一般是batchSize(批次时间)的整数倍
 */object ByWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[3]").setAppName("transform3")
    val ssc = new StreamingContext(conf, Milliseconds(10000))

    val ds: DStream[String] = ssc.socketTextStream("node1", 44444, StorageLevel.MEMORY_ONLY)
    // 窗口长度 30s  滑动间隔 10s  每个10s时间将DStream中前30秒的数据 整合为一个批次数据处理
    val ds1 = ds.window(Seconds(10), Seconds(10))
    ds1.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}

/**
 * 使用window窗口函数算子 严格意义上只负责去对原始DStream进行窗口检测,形成窗口批次数据的DStream,如果我们要对窗口批次数据
 * 进行处理的话,还得需要对窗口批次数据的DStream使用转换算子和行动算子计算逻辑
 *
 * windows函数也有一些变种的窗口函数算子:既可以实现窗口批次数据的检测,也可以实现一些相关的计算功能
 *   countByWindow 对每个滑动窗口的数据执行count操作
 *   reduceByWindow 对每个滑动窗口的数据执行reduce操作
 *   reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作
 *   countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作
 */
object ByWindow2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("state02").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf,Milliseconds(10000))
    ssc.checkpoint("hdfs://node1:9000/sparkstreaming")

    val ds:DStream[String] = ssc.socketTextStream("node1", 44444)
    val ds1 = ds.map((_, 1))
    val ds2 = ds1.reduceByKeyAndWindow((a: Int, b: Int)=>(a+b), Seconds(10), Seconds(10))
    ds2.print()
    println(",,,,,,,,,,")
    val ds3: DStream[Long] = ds1.countByWindow(Seconds(10), Seconds(10))
    ds3.print()
    println(",,,,,,,,,,")
    val ds4 = ds1.reduceByWindow((a, b) => (a._1+b._1, 0), Seconds(10), Seconds(10))
    ds4.print()
    println(",,,,,,,,,,")
    // 需要设置检查点
    val ds5 = ds1.countByValueAndWindow(Seconds(10), Seconds(10))
    ds5.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

应用场景:黑名单

package SparkStreaming.trans

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Milliseconds, Minutes, Seconds, StreamingContext}

/**
 * 黑名单统计
 *   有市场就有竞争,有竞争就少不来邪门外道
 *   A厂家投放了广告,广告每点击一次都是有记录的,但是不排初竞争对手的恶意点击
 *
 *   实时统计黑名单用户
 *   网站每隔3秒记录一批次用户的点击行为,记录的时候,认定如果在1分钟之内 用户点击次数超过10次 认定这个用户是一个黑名单用户
 *   需要把用户IP封掉
 *
 *   Spark Streaming去连接端口数据源:
 *     端口模拟用户的点击行为  发送数字 数字就代表某一个用户id
 */
object BlackUser {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("state01").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf,Milliseconds(3000))
    val ds:DStream[String] = ssc.socketTextStream("node1", 44444)

    val ds1:DStream[String] = ds.window(Minutes(1),Minutes(1))
    val ds2:DStream[(String,Int)] = ds1.map((_, 1)).reduceByKey(_ + _)
    //保留黑名单用户
    val ds3:DStream[(String,Int)] = ds2.filter(tuple=>{
      if(tuple._2>=10){
        true
      }else{
        false
      }
    })
    ds3.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

标签:10,窗口,val,--,sparkstreaming,Seconds,算子,DStream,ssc
From: https://www.cnblogs.com/jsqup/p/16649421.html

相关文章

  • letcode算法--6.字符串转换整数 (atoi)
    请你来实现一个 myAtoi(strings) 函数,使其能将字符串转换成一个32位有符号整数(类似C/C++中的atoi函数)。函数 myAtoi(strings)的算法如下:读入字符串并丢弃无......
  • codeforces极简题解
    CF1713F利用lucas定理,\(b_S\)表示下标\(T\)与\(S\)无交的\(a_T\)的异或,由于部分\(b_S\)未知,不能直接iFWT。回顾容斥:\([S=\emptyset]=\sum_{T\subseteqS}(-1)^|T|\),\([n=0......
  • SpringBoot整合Redis
    14、SpringBoot整合Redis14.1、概述SpringBoot操作数据库:spring-data,jpa,jdbc,mongodb,redisSpringData也是和SpringBoot齐名的项目!说明:在SpringBoot2.x之后,原来使用的jed......
  • springboot项目使用jsp
    异常问题场景提示:这里简述项目相关背景springboot课堂学习问题详情提示:这里描述项目中遇到的问题jsp无法访问原因分析提示:这里填写问题的分析没有jsp解......
  • 开放容器倡议 | Open Container Initiative (OCI)
    开放容器倡议(OCI)是一个轻量级的开放治理结构(项目),在Linux基金会的主持下形成,其明确目的是围绕容器格式和运行时创建开放的行业标准。OCI于2015年6月22日由D......
  • ubuntu20.04开启ipv6连接SSH
      所有操作均在root权限下进行1sudo-i  1、打开配置文件1vi/etc/ssh/sshd_config  2、编辑取消原有注释“#”,保存退出1#原有2#Port223......
  • C#|在List集合为NULL时需要添加数据的处理方法
    最近写了一个循环往字典的Value值添加数据的程序(字典的Value为一个List),经常碰到“Objectreferencenotsettoaninstanceofanobject”,就自己去了解了一下空集合和......
  • 紧急除颤
    #include<iostream>#include<stdio.h>#include<graphics.h>#include<conio.h>#include<stdlib.h>#include<Windows.h>#include<string>#include<iomanip>#pragmac......
  • JAVA进阶--常用时间API、包装类、正则表达式、Array类、Lambda表达式、常见算法--202
    第一节 Date日期对象1、日期对象如何创建,如何获取时间毫秒值Datedate=newDate();Longtime=date.getTime();2、时间毫秒值怎么恢复成......
  • 【C++】断言、likely等
    断言assert就是对表达式进行判断,如果条件不成立就会调用abort()中止程序运行,对于debug空指针有奇效,但是release版本不会用是一个宏而非函数五个要点:1.在函数开始时,监测......