首页 > 其他分享 >06_sparkStreaming

06_sparkStreaming

时间:2024-07-26 15:10:57浏览次数:17  
标签:map 06 val sparkStreaming RDD new DStream ssc

SparkStreaming

sparkStreaming 用于处理流式数据,其中输入数据源包括 Kafka、Flume、HDFS 等;结果输出目的地址包括 HDFS、数据库。

SparkCore 对应 RDD;SparkSQL 对应 DataFrame/DataSet;SparkStreaming 对应 DStream(离散化数据流),DStream 是对 RDD 在实时数据处理场景的一种封装。

img

SparkStreaming 主要由以下几个部分组成:

Receiver

数据源接收器,负责接收数据的 Executor 组件,负责接收 Executor 并转发给其它 Executor 做计算;

img

  • ReceiverAPI:专门用于接收数据的 Executor,但是接收速率和消费速率可能不匹配,导致计算节点内存溢出;
  • DirectAPI:由计算 Executor 直接主动消费 Kafka 等数据源数据,速度可由自身控制。

Micro-batch Generator

微批次生成器,将接收到的数据划分为小的微批次,每个批次包含一段时间范围内的数据,它可以控制生成速率。

img

DStream

离散化流,每个微批次数据可以转化为 DStream 对象。简单来说 DStream 就是对 RDD 在实时数据处理场景的一种抽象。

img

Transformations

转换操作,在 DStream 上执行一系列的转换操作(映射、过滤、聚合等);
转换操作是在微批次级别上进行的,对每个微批次的数据执行相同的转换操作。

RDD Generator

转换操作生成的 DStream 会被转化为相应 RDD 等待处理;
RDD 为分布式弹性数据集,可以由 Spark 并行处理。

Compute Engine

Spark引擎会根据RDD的依赖关系和转换操作构建执行计划,并将计算任务分配给集群中的工作节点执行。

Output Operations

计算执行完毕后,结果写入外部系统或者存储介质中;
输出可以是保存到文件系统、数据库、消息队列等组件;

Fault Tolerance

SparkStreaming 内置容错处理机制,可以处理节点故障或者数据丢失情况;

DStream 创建

RDD 队列

为了模拟测试,可以使用 RDD 队列接收 RDD 来模拟 DStream 创建。

  def test01(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val ssc = new StreamingContext(conf, Seconds(3))

    // 创建RDD队列
    val rddQueue = new mutable.Queue[RDD[Int]]
    // 获取 QueueInputStream
    val inputDStream = ssc.queueStream(rddQueue, oneAtATime = false)

    val sumDStream = inputDStream.reduce(_ + _)
    sumDStream.print()
    ssc.start()

    // 循环向 RDD 放入数据
    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 5)
      Thread.sleep(2000)
    }
    ssc.awaitTermination()
  }

自定义数据源接收器

同样可以自定义数据源,实现接收数据,数据传递到 Spark 等待处理。
继承 Receiver,实现 onStart、onStop 方法实现数据接收方法,单独启动一个线程负责接收数据并保存到 Spark 内存。

  def test02(): Unit = {
    //1.初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkstreaming")

    //2.初始化SparkStreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    //3.创建自定义receiver的Streaming
    val lineDStream = ssc.receiverStream(new CustomerReceiver("192.168.56.153", 9999))

    //4.将每一行数据做切分,形成一个个单词
    val wordDStream = lineDStream.flatMap(_.split(" "))

    //5.将单词映射成元组(word,1)
    val wordToOneDStream = wordDStream.map((_, 1))

    //6.将相同的单词次数做统计
    val wordToSumDStream = wordToOneDStream.reduceByKey(_ + _)

    //7.打印
    wordToSumDStream.print()

    //8.启动SparkStreamingContext
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * @param host : 主机名称
   * @param port : 端口号
   * Receiver[String] :返回值类型:String
   * StorageLevel.MEMORY_ONLY: 返回值存储级别
   */
  class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    // receiver刚启动的时候,调用该方法,作用为:读数据并将数据发送给Spark
    override def onStart(): Unit = {
      //在onStart方法里面创建一个线程,专门用来接收数据
      new Thread("Socket Receiver") {
        override def run() {
          receive()
        }
      }.start()
    }

    // 读数据并将数据发送给Spark
    def receive(): Unit = {
      // 创建一个Socket
      var socket: Socket = new Socket(host, port)
      // 字节流读取数据不方便,转换成字符流buffer,方便整行读取
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      // 读取数据
      var input: String = reader.readLine()
      //当receiver没有关闭并且输入数据不为空,就循环发送数据给Spark
      while (!isStopped() && input != null) {
        store(input)
        input = reader.readLine()
      }

      // 如果循环结束,则关闭资源
      reader.close()
      socket.close()
      //重启接收任务
      restart("restart")
    }

    override def onStop(): Unit = {}
  }

Kafka 数据源

https://dblab.xmu.edu.cn/blog/3158/

Kafka 或者 Flume 作为流式数据源,将数据发送到 SparkStreaming 进行计算,可以实现简单的流计算过程。

object KafkaWordCount{
  def main(args:Array[String]){
    // 初始化 Spark 环境配置
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc,Seconds(10))
    
    // 对于 Kafka/Flume 流式数据,可能会发生丢失,需要通过 ssc.checkPoint() 建立检查点机制
    ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoop
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topics = Array("wordsender")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    stream.foreachRDD(rdd => {
      val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))
      val lines = maped.map(_._2)
      val words = lines.flatMap(_.split(" "))
      val pair = words.map(x => (x,1))
      val wordCounts = pair.reduceByKey(_+_)
      wordCounts.foreach(println)
    })
    ssc.start
    ssc.awaitTermination
  }
}

DStream 转换

DStream 的操作与 RDD 类似,分为转化操作、输出操作,此外转换操作还有一些比较特殊的方法:updateStateByKeytransform

DStream 的转换操作可以分为有状态转换操作、无状态转换操作两种:

无状态转换操作

将 RDD 操作运用到 DStream 的每个批次上,每个批次相互独立。常见的 RDD 操作比如 map、flatMap、filter、repartition、reduceByKey、groupByKey 等。

img

// 以黑名单过滤为例
  def method02(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("transform")
    val ssc = new StreamingContext(conf, Seconds(8))
    ssc.sparkContext.setLogLevel("WARN")

    // 构建黑名单数据
    val blackList = Array(("spark", true), ("scala", true))
    val blackListRDD = ssc.sparkContext.makeRDD(blackList)

    // 生成测试 DStream,zipWithIndex 转换字符串为 (word, index) 元组格式数据
    val strArray = ("Spark java scala hadoop kafka hive hbase zookeeper").split("\\s+").zipWithIndex.map { case (word, idx) => s"$idx $word" }
    val rdd = ssc.sparkContext.makeRDD((strArray))
    val clickStream = new ConstantInputDStream(ssc, rdd)

    // 流式数据处理
    val clickStreamFormatted = clickStream.map(value => (value.split(" ")(1), value))
    clickStreamFormatted.transform(clickRDD => {
      val jointedBlackListRdd = clickRDD.leftOuterJoin(blackListRDD)
      jointedBlackListRdd.filter{
        case (word, (streamingLine, flag)) =>
          if (flag.getOrElse(false)) false
          else true
      }.map {case (word, (streamingLine, flag)) => streamingLine}
    }).print()

    // 启动流式作业
    ssc.start()
    ssc.awaitTermination()
  }

或者直接过滤,考虑到 Driver 端数据在多个 Executor 间重复使用,设置为广播变量:

  def method03(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("transform")
    val ssc = new StreamingContext(conf, Seconds(8))
    ssc.sparkContext.setLogLevel("WARN")
    // 构建黑名单数据,构建广播变量
    val blackList = Array(("spark", true), ("scala", true))
    val blackListBC = ssc.sparkContext.broadcast(blackList.filter(_._2).map(_._1))

    // 生成测试 DStream,zipWithIndex 转换字符串为 (word, index) 元组格式数据
    val strArray = ("Spark java scala hadoop kafka hive hbase zookeeper").split("\\s+").zipWithIndex.map { case (word, idx) => s"$idx $word" }
    val rdd = ssc.sparkContext.makeRDD((strArray))
    val clickStream = new ConstantInputDStream(ssc, rdd)

    // 流式数据处理
    clickStream.map(value => (value.split(" ")(1), value))
      .filter { case (word, _) =>
        !blackListBC.value.contains(word)
      }
      .map(_._2)
      .print()

    // 启动流式作业
    ssc.start()
    ssc.awaitTermination()
  }

有状态转换操作

各个批次数据间存在关联性,包括基于滑动窗口的转化操作、追踪状态变化的转换操作。

窗口操作

可以设置当前窗口大小和滑动窗口间隔来动态获取当前 Streaming 的状态。
基于窗口的操作会比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次结果来计算出整个窗口的结果。
窗口依赖于两个参数:窗口长度(控制每次计算最近的多少个批次数据)、滑动步长(控制对新的 DStream 的计算间隔),注意这两个参数都必须为采集批次大小的整数倍。

核心窗口函数:

  • window(windowLength, slideInterval): 基于对源DStream窗口的批次进行计算返回一个新的DStream;
  • reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):在一个 (K,V) 对的 DStream 上调用此函数,返回一个新的 (K,V) DStream,并且统计窗口中批次数据的总和;
  • countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
  • reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的离散化数据流;

例如下面的例子,采集批次时间间隔为5s,采集时间窗口大小为 20s,采集窗口间隔为 10s.

  def window_demo01(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("transform")

    // 每 5s 生成一个 RDD
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    val lines = ssc.socketTextStream("localhost", 1521)
    lines.foreachRDD { (rdd, time) =>
      println(s"rdd = ${rdd.id}; time = $time")
      rdd.foreach(value => println(value))
    }

    // 设置 20s 窗口长度、10s 滑动间隔
    val res1 = lines.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
    res1.print()
    val res2 = lines.window(Seconds(20), Seconds(10))
    res2.print()

    // 窗口元素求和
    val res3 = lines.map(_.toInt).reduceByWindow(_ + _, Seconds(20), Seconds(10))
    res3.print()

    val res4 = res2.map(_.toInt).reduce(_ + _)
    res4.print()
    ssc.start()
    ssc.awaitTermination()
  }

updateStateByKey 状态跟踪操作

该函数为每一个 Key 维护了一份 state 状态,并且定义了更新函数来对该 Key 的状态不断更新;
对于每个新的 batch 而言, Spark Streaming 会在使用 updateStateByKey 的时候为已经存在的 key 进行 state 的状态更新;
使用 updateStateByKey 时要注意开启 checkpoint 功能。

这里演示一个对流式数据源的 wordCount 累计求和,并分批次保存结果:

  def window_demo03(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("transform")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("datas/checkpoint/")
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordStream = words.map((_, 1))

    // 定义状态更新函数
    val updateFunc = (values :Seq[Int], state:Option[Int])=> {
      // 计算当前批次总和
      val currentSum = values.sum
      // 计算历史状态总和
      val previousSum = state.getOrElse(0)
      // 计算当前+历史状态的总和
      Some(currentSum + previousSum)
    }


    val stateStream = wordStream.updateStateByKey(updateFunc)
    stateStream.print()

    stateStream.repartition(1).saveAsTextFiles("datas/output")
    ssc.start()
    ssc.awaitTermination()
  }

DStream 输出

DStream 支持的输出操作API包括:

  • saveAsTextFiles(prefix, [suffix]):以文本文件格式存储该 DStream 的内容;
  • saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 DStream 中数据保存为序列化文件数据;
  • print():简单输出到控制台,但只打印批次的最开始10个元素;
  • foreachRDD(func):将函数 func 作用到 DStream 的每一个 RDD。其中函数 func 可以执行任意复杂的逻辑,例如写入数据库、发送消息到消息队列、保存到文件系统等。
  def dstream_output(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("transform")
    val ssc = new StreamingContext(conf, Seconds(3))

    val lineDStream = ssc.socketTextStream("192.168.56.153", 9999)
    val wordToOneStream = lineDStream.flatMap(_.split(" ")).map((_, 1))

    wordToOneStream.foreachRDD(
      rdd => {
        // Driver 端执行
        println("222222:" + Thread.currentThread().getName)
        rdd.foreachPartition(
          // Executor 端执行
          iter => iter.foreach(println)
          // 这里获取连接
          // 操作数据写数据库
          // 关闭连接....
        )
      }
    )

    ssc.start()
    ssc.awaitTermination()
  }

注意如果 DStream 数据写入到数据库,获取连接的操作必须实现在 Executor 端,而不是 Driver 层面。

DStream 优雅停机

在 HDFS 中根据配置决定当前 SparkStreaming 的生命周期。

  def stop_gracefully(): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("transform")
    conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val ssc = new StreamingContext(conf, Seconds(3))
    val lineDStream = ssc.socketTextStream("192.168.56.151", 9999)
    lineDStream.flatMap(_.split(" "))
      .map((_, 1))
      .print()

    // 开启监控程序
    new Thread(new MonitorStop(ssc)).start()
    ssc.start()
    ssc.awaitTermination()
  }

// 定时监控 hdfs 文件状态
class MonitorStop(ssc : StreamingContext) extends Runnable {
  override def run(): Unit = {
    val fs = FileSystem.get(new URI("hdfs://192.168.56.151:9000"), new Configuration(), "root")

    while (true) {
      Thread.sleep(5000)
      val result = fs.exists(new Path("hdfs://192.168.56.151:9000/stopSpark"))
      if (result) {
        val state =  ssc.getState()
        if (state == StreamingContextState.ACTIVE) {
          ssc.stop(true, true)
          System.exit(0)
        }
      }
    }
  }
}

标签:map,06,val,sparkStreaming,RDD,new,DStream,ssc
From: https://www.cnblogs.com/istitches/p/18325353

相关文章

  • 006-绕过web检查,传输sql语句的功能
    importorg.apache.commons.text.StringEscapeUtils;/***作用:*实现绕过web检查,传输sql语句的功能**pom:*org.apache.commons:commons-lang3:3.12.0*org.apache.commons:commons-text:1.10.0*/publicclassMain{publicstaticvoidmain(String[]arg......
  • 题解:P10721 [GESP202406 六级] 计算得分(未成功)
    博客食用更佳:Myblog题目传送门分析:这道题是一个标准的dp。我们可以先预处理多个\(\texttt{abc}\)连成的字符串的最大值,之后可以按最长平台的方法处理。步骤:初值:这题不需要赋值,因为题目保证得分是正的,故初值为\(0\)。状态:\(dp_i\)表示连续\(i\)个\(\texttt{abc......
  • [lnsyoj2210/luoguP5069]纵使日薄西山
    来源原题链接2024.7.25校内测验T3题意给定序列\(a\),\(m\)次查询,每次查询修改一个数,然后查询:每次操作选定最大且下标最小的数\(a_i\),使\(a_{i-1},a_i,a_{i+1}\)的值都减\(1\),查询将整个序列变为全非正数序列的操作次数.赛时50pts由于每次都会连带着相邻两个元素一......
  • CPT206 Computer Programming for Financial
    CPT206ComputerProgrammingforFinancialMathematics:CourseworkResitTaskSpecificationThomasSeligSet:Monday,22July,2024Duedate:Sunday,4August,2024,23:59ThisisthespecificationtasksheetfortheCourseworkresitassessmentofyourC......
  • CF906D Power Tower
    感觉没啥好说的,只要你知道扩展欧拉定理的式子就很trivial的一个题幂塔类的问题都考虑用扩展欧拉定理降幂,则每往指数上操作一层复杂度模数就会从\(m\)变为\(\phi(m)\)根据经典结论可知,该过程在大约\(\logm\)次操作后就会让模数变为\(1\),此时后面的部分就无需再计算了不......
  • servlet化学实验交流平台-计算机毕业设计源码13069
    目录摘要Abstract第一章绪论1.1选题背景及意义1.2国内外研究现状1.3研究方法第二章相关技术介绍2.1Java编程语言2.2B/S模式2.3MySQL简介2.4Serrvlet框架2.5Vue介绍第三章化学实验交流平台分析3.1系统目标3.2可行性分析3.2.1技术......
  • 代码随想录算法训练营第三天 | Leetcode 203 移除链表元素 Leetcode 206 翻转链表
    前言今天的两道题目都不难,但细节需要注意。如移除链表元素用到的虚拟头节点,翻转链表的思路。翻转链表真是写了忘,忘了写,希望这次能记住。除此之外我决定每天的记录里面增加一个总结八股的部分,将来二刷再翻看文章的时候顺便也能复习八股知识点。Leetcode203移除链表元素题目......
  • UNS0881a-P,V1 3BHB006338R0001 可编程控制器PLC
    产品型号:UNS0881a-P,V13BHB006338R0001产品类别:可编程控制器PLC产品成色:全新、非全新质量保障:365天原产地;美国库存;有货品牌;ABBUNS0881a-P,V13BHB006338R0001控制板是一种电子设备,主要用于控制和管理各种电气设备。它通常由主控芯片、外设接口、电源模块、存储模......
  • Java基础-学习笔记06
    **06访问修饰符封装继承多态**访问修饰符public公开级别,对外公开protected受保护级别,对子类和同一个包中的类公开default默认级别,无修饰符,向同一个包的类公开private私有级别,只有类本身可以访问,不对外公开修饰符可以用来修饰类中的属性,成员方法以及类只有默认......
  • ##笔记day06-C语言基础:随机数、一维、二维数组、字符数组
    day07笔记1)rand生成随机数1)rand()随机函数头文件:#include<stdlib.h>函数原型:intrand(void);函数功能:生成大于等于0的随机整数参数:void返回值:生成的随机整数2)srand更新随机数种子(srand()函数用于给rand()函数设定种子)头文件:......