首页 > 编程语言 >Spark Streaming程序优雅关闭

Spark Streaming程序优雅关闭

时间:2024-01-24 16:35:23浏览次数:33  
标签:val Int 优雅 Streaming new apache org Spark ssc

流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。使用外部文件系统来控制内部程序关闭。

其实就是单独起一个线程专门去专门查找程序是否停止的标志

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
class MonitorStop(ssc: StreamingContext) extends Runnable {
 override def run(): Unit = {
 val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new 
Configuration(), "atguigu")
 while (true) {
 try
 Thread.sleep(5000)
 catch {
 case e: InterruptedException =>
 e.printStackTrace()
 }
 val state: StreamingContextState = ssc.getState
 val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))
 if (bool) {
 if (state == StreamingContextState.ACTIVE) {
 ssc.stop(stopSparkContext = true, stopGracefully = true)
 System.exit(0)
 }
 }
 }
 }
}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkTest {
 def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
 val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status:
Option[Int]) => {
 //当前批次内容的计算
 val sum: Int = values.sum
 //取出状态信息中上一次状态
  val lastStatu: Int = status.getOrElse(0)
 Some(sum + lastStatu)
 }
 val sparkConf: SparkConf = new
SparkConf().setMaster("local[4]").setAppName("SparkTest")
 //设置优雅的关闭
 sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
 val ssc = new StreamingContext(sparkConf, Seconds(5))
 ssc.checkpoint("./ck")
 val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
 val word: DStream[String] = line.flatMap(_.split(" "))
 val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
 val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
 wordAndCount.print()
 ssc
 }
 def main(args: Array[String]): Unit = {
 val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () =>
createSSC())
 new Thread(new MonitorStop(ssc)).start()
 ssc.start()
 ssc.awaitTermination()
 }
}

 

标签:val,Int,优雅,Streaming,new,apache,org,Spark,ssc
From: https://www.cnblogs.com/huifeidezhuzai/p/17984971

相关文章

  • 如何在word中优雅地插入代码
    如何在word中优雅地插入代码呢?网上的方法大致有这么几种:利用notepad++来实现(操作路径有点长,比较麻烦)自己在word做模版(这个模版折腾下来倒是可以一劳永逸,但是不支持不同语言的高亮)利用word的宏来实现,但需要写宏脚本(比较麻烦)国外的 www.planetb.ca 网站进去太慢了,体验很不......
  • HttpRetryException: cannot retry due to redirection, in streaming mode
     failcannotretryduetoredirection,instreamingmodeexecutingPOSThttps://vsp.allinpay.com/apiweb/gateway/payfeign.RetryableException:cannotretryduetoredirection,instreamingmodeexecutingPOSThttps://vsp.allinpay.com/apiweb/gateway/pay......
  • 别再混淆事件源(Event Sourcing)和消息流(Message Streaming)了!
    0前言Kafka不适合事件溯源,Kafka适合消息流。这两种事物需要不同存储机制。事件溯源(EventSourcing),需DB充当事件日志,为事件溯源存储的事件必须以某种方式编写,以便将来的读取能够快速组装属于单个聚合的较小(更小的)事件流最初发射它们的。这需要随机访问索引消息流(MessageS......
  • Spark介绍
    ApacheSpark是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UCBerkeleyAMPlab(加州大学伯克利分校的AMP实验室)所开源的类HadoopMapReduce的通用并行计算框架,Spark拥有HadoopMapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不......
  • 实验 3 Spark 和 Hadoop 的安装
    (1)启动Hadoop,在HDFS中创建用户目录“/user/hadoop”;  (2)在Linux系统的本地文件系统的“/home/hadoop”目录下新建一个文本文件test.txt,并在该文件中随便输入一些内容,然后上传到HDFS的“/user/hadoop”目录下;  (3)把HDFS中“/user/hadoop”目录下的test.txt......
  • Spark SQL五大关联策略
    1、五种连接策略选择连接策略的核心原则是尽量避免shuffle和sort的操作,因为这些操作性能开销很大,比较吃资源且耗时,所以首选的连接策略是不需要shuffle和sort的hash连接策略。◦BroadcastHashJoin(BHJ):广播散列连接◦ShuffleHashJoin(SHJ):洗牌散列连接◦ShuffleSortMergeJoi......
  • springboot中优雅的个性定制化错误页面+源码解析
    boot项目的优点就是帮助我们简化了配置,并且为我们提供了一系列的扩展点供我们使用,其中不乏错误页面的个性化开发。理解错误响应流程我们来到org.springframework.boot.autoconfigure.web.servlet.error下的ErrorMvcAutoConfiguration这里面配置了错误响应的规则。主要介绍里面注册......
  • SpringBoot引入SpEL,优雅控制复杂权限!
    对于在Springboot中,利用自定义注解+切面来实现接口权限的控制这个大家应该都很熟悉,整体来说思路如下:自定义一个权限校验的注解,包含参数value配置在对应的接口上定义一个切面类,指定切点在切入的方法体里写上权限判断的逻辑然而,在实际的开发中,对于权限校验的需求场景是很多的,比如:傻眼......
  • Spark SQL的运行原理
    DataFrame、DataSet和SparkSQL的实际执行流程都是相同的:1.进行DataFrame/Dataset/SQL编程;2.如果是有效的代码,即代码没有编译错误,Spark会将其转换为一个逻辑计划;3.Spark将此逻辑计划转换为物理计划,同时进行代码优化;4.Spark然后在集群上执行这个物理计划(基于RDD操作)......
  • Spark Streaming工作原理
         说起SparkStreaming,玩大数据的没有不知道的,但对于小白来说还是有些生疏,所以本篇文章就来介绍一下SparkStreaming,以期让同行能更清楚地掌握SparkStreaming的原理。   一:什么是SparkStreaming   官方对于SparkStreaming的介绍是这样的(翻译过来的):Sp......