A streaming job needs to run 24/7, but sometimes it must be stopped deliberately, for example to deploy upgraded code. Since the program is distributed, you cannot go and kill its processes one by one, so configuring a graceful shutdown is essential. The approach here is to use an external file system (HDFS) to signal the running program to shut itself down.
In essence, you start a dedicated thread that periodically checks whether a stop flag exists and, if it does, stops the StreamingContext gracefully.
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

class MonitorStop(ssc: StreamingContext) extends Runnable {
  override def run(): Unit = {
    val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "atguigu")
    while (true) {
      try Thread.sleep(5000)
      catch {
        case e: InterruptedException => e.printStackTrace()
      }
      val state: StreamingContextState = ssc.getState
      // Check whether the external stop-flag file exists on HDFS
      val bool: Boolean = fs.exists(new Path("hdfs://linux1:9000/stopSpark"))
      if (bool) {
        if (state == StreamingContextState.ACTIVE) {
          // Stop gracefully: finish processing the data already received, then exit
          ssc.stop(stopSparkContext = true, stopGracefully = true)
          System.exit(0)
        }
      }
    }
  }
}
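To actually trigger a shutdown, create the flag file from outside the job, for example with hdfs dfs -touchz /stopSpark on the command line. Below is a minimal programmatic sketch of the same idea; the object name TriggerStop is made up here, and the HDFS address, user and flag path are simply reused from the MonitorStop example above.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper, not part of the original post: creates the stop-flag file
// that MonitorStop polls for, using the same HDFS address and user as above.
object TriggerStop {
  def main(args: Array[String]): Unit = {
    val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "atguigu")
    // Create an empty marker file; MonitorStop notices it within its 5-second poll interval
    fs.create(new Path("/stopSpark")).close()
    fs.close()
  }
}

Once the file exists, the monitor thread calls ssc.stop with stopGracefully = true, so batches that were already received are processed before the JVM exits.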
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkTest {

  def createSSC(): StreamingContext = {
    // State update function: add the current batch's count to the previous state
    val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => {
      // Sum for the current batch
      val sum: Int = values.sum
      // Previous state, defaulting to 0 on the first batch
      val lastStatus: Int = status.getOrElse(0)
      Some(sum + lastStatus)
    }

    val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkTest")
    // Enable graceful shutdown
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("./ck")

    val line: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)
    val word: DStream[String] = line.flatMap(_.split(" "))
    val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
    val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
    wordAndCount.print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover the context from the checkpoint if one exists, otherwise create a new one
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())
    // Start the monitoring thread that watches for the external stop flag
    new Thread(new MonitorStop(ssc)).start()
    ssc.start()
    ssc.awaitTermination()
  }
}
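Before resubmitting the upgraded job, the stop flag has to be removed again (for example with hdfs dfs -rm /stopSpark), otherwise the new MonitorStop thread will shut the job down within one polling cycle. A programmatic sketch of the same cleanup, again reusing the address and user from above; ClearStopFlag is not from the original post.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: removes the stop-flag file so a freshly submitted job keeps running.
object ClearStopFlag {
  def main(args: Array[String]): Unit = {
    val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "atguigu")
    // Non-recursive delete; returns false if the flag file is not there
    fs.delete(new Path("/stopSpark"), false)
    fs.close()
  }
}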