Spark Video Series by Wang Jialin, Lesson 119: Spark Streaming Performance Tuning: How to Handle Sudden Peaks in Streaming Data in Production
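This lesson covers Spark Streaming performance tuning: how to handle sudden changes in streaming data peaks in a production environment. Unstable peaks and traffic show up at two levels: 1) The data itself is uneven. For example, traffic is especially high around 11 p.m., which looks unstable relative to the rest of the day. 2) The data is fine and arrives at a constant or near-constant rate, but a failure or a GC pause during processing delays the computation, and the web monitoring console shows a change in the peak as well. Even a stable system cannot avoid either case. There are many ways to deal with the problem; this lesson presents one that ships with Spark itself, the Backpressure mechanism.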
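The basic idea of Backpressure is to use information about the previous job to decide how fast data should be received for the next one. After a job finishes, statistics about it are collected, such as how long it was delayed. Based on these statistics, a Spark Streaming algorithm estimates the rate at which data should be received during the next batch duration. This raises a question: how is the receive rate actually limited? Receiving data is also consuming data, and Spark Streaming must finish receiving the current record before it can receive the next one. So, based on how the previous job ran, the algorithm decides the rate at which data is received in the next batch duration. That is the Backpressure mechanism. This section walks through how it is implemented.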
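The feedback idea can be sketched in a few lines. This is a deliberately simplified estimator, not Spark's own algorithm: it assumes the next receive rate should simply equal the throughput the previous batch sustained. The names BatchStats and nextRate are hypothetical and exist only for illustration.

```scala
object BackpressureSketch {
  // Hypothetical summary of one finished batch (not a Spark class).
  final case class BatchStats(numRecords: Long, processingDelayMs: Long)

  // Records per second that the last batch actually sustained.
  // Returns None when there is nothing to measure, so the old rate is kept.
  def nextRate(stats: BatchStats): Option[Double] =
    if (stats.numRecords > 0 && stats.processingDelayMs > 0)
      Some(stats.numRecords.toDouble / stats.processingDelayMs * 1000.0)
    else None
}
```

For example, a batch of 1000 records that took 2000 ms to process suggests a receive rate of about 500 records per second for the next batch.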
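In production, Spark Streaming is watched through a monitoring platform. The simplest option is the native Spark Web UI; third-party monitoring software is another. If monitoring shows Spark Streaming advancing at a steady pace, its processing is stable. If the platform tracks the amount of unconsumed data in Kafka and Kafka suddenly shows a spike, you need a way to respond. With Backpressure enabled, when the data volume jumps or a GC pause occurs, Backpressure dynamically adjusts the consumption rate, which is very valuable for Spark. We have enabled Spark Streaming Backpressure in many past projects.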
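As a rough illustration of switching the mechanism on, the sketch below collects the two relevant settings and renders them as spark-submit arguments. The keys spark.streaming.backpressure.enabled and spark.streaming.receiver.maxRate are Spark configuration properties; the value 10000 and the helper names are assumptions made for this example only.

```scala
object BackpressureSettings {
  // Assumed example values, not recommendations; size the cap to your own cluster.
  val settings: Map[String, String] = Map(
    "spark.streaming.backpressure.enabled" -> "true",
    "spark.streaming.receiver.maxRate" -> "10000" // max records per second per receiver
  )

  // Renders the settings as spark-submit arguments of the form: --conf key=value
  def asSubmitArgs: Seq[String] =
    settings.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
}
```

The same key/value pairs could equally be set on the application's SparkConf before the StreamingContext is created.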
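From the source code perspective, Backpressure starts on the Driver, so let us look at it systematically. RateController extends StreamingListener. Because it is a listener, it has to be registered, and once registered it is called back when a job completes. RateController has concrete subclasses that carry out the specific work. Its computeAndPublish method computes the new rate limit and publishes it asynchronously.
RateController.scala source code: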
    private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
        extends StreamingListener with Serializable {
      ……
      // Compute the new rate limit and publish it asynchronously.
      private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
        Future[Unit] {
          val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
          newRate.foreach { s =>
            rateLimit.set(s.toLong)
            publish(getLatestRate())
          }
        }
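First, when is RateController started and registered? In principle, where should that happen? It happens in JobScheduler, and the reason is simple: JobScheduler knows when a job completes, and the statistics of the previous job become available at that point.
To find where RateController is wired up, look in JobScheduler, which is notified every time a job completes. This is at the Driver level. Starting from the start method, each input stream returned by getInputStreams is associated with its RateController; every input stream carries an optional rateController.
The start method in JobScheduler.scala: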
    def start(): Unit = synchronized {
      if (eventLoop != null) return // scheduler has already been started

      logDebug("Starting JobScheduler")
      eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
        override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

        override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
      }
      eventLoop.start()

      // attach rate controllers of input streams to receive batch completion updates
      for {
        inputDStream <- ssc.graph.getInputStreams
        rateController <- inputDStream.rateController
      } ssc.addStreamingListener(rateController)

      listenerBus.start()
      receiverTracker = new ReceiverTracker(ssc)
      inputInfoTracker = new InputInfoTracker(ssc)

      val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
        case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
        case _ => null
      }

      executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
        executorAllocClient,
        receiverTracker,
        ssc.conf,
        ssc.graph.batchDuration.milliseconds,
        clock)
      executorAllocationManager.foreach(ssc.addStreamingListener)
      receiverTracker.start()
      jobGenerator.start()
      executorAllocationManager.foreach(_.start())
      logInfo("Started JobScheduler")
    }
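The for comprehension in start that attaches rate controllers combines a sequence with an Option, so streams without a rate controller are skipped silently. A minimal sketch of that pattern follows; SketchStream and Controller are hypothetical stand-ins, not Spark's classes, and the result is returned as a list instead of being registered on a listener bus.

```scala
object RateControllerRegistration {
  // Hypothetical stand-ins for an input stream and its rate controller.
  final case class Controller(streamUID: Int)
  final case class SketchStream(id: Int, rateController: Option[Controller])

  // Collects the controllers that would be registered as listeners.
  // A stream whose rateController is None contributes nothing.
  def controllersToRegister(streams: Seq[SketchStream]): Seq[Controller] =
    for {
      stream <- streams
      controller <- stream.rateController
    } yield controller
}
```

This is why a stream that has no rate controller never takes part in rate updates: nothing is registered for it in the first place.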
Where do the input streams come from? Look at the test code in InputStreamsSuite, which creates a networkStream through ssc.socketTextStream.
InputStreamsSuite.scala source code:
    val networkStream = ssc.socketTextStream(
      "localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
ssc.socketTextStream creates a socket text stream.
The socketTextStream method in StreamingContext.scala:
    def socketTextStream(
        hostname: String,
        port: Int,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
      socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
    }
The source of socketStream is as follows.
The socketStream method in StreamingContext.scala:
    def socketStream[T: ClassTag](
        hostname: String,
        port: Int,
        converter: (InputStream) => Iterator[T],
        storageLevel: StorageLevel
      ): ReceiverInputDStream[T] = {
      new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
    }
This creates a SocketInputDStream, which is where the input streams come from. SocketInputDStream has a getReceiver method that creates a SocketReceiver.
SocketInputDStream.scala source code:
    private[streaming]
    class SocketInputDStream[T: ClassTag](
        _ssc: StreamingContext,
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
      ) extends ReceiverInputDStream[T](_ssc) {

      def getReceiver(): Receiver[T] = {
        new SocketReceiver(host, port, bytesToObjects, storageLevel)
      }
    }
The onStart method of SocketReceiver starts a thread that runs receive(), which keeps looping.
The SocketReceiver class in SocketInputDStream.scala:
    private[streaming]
    class SocketReceiver[T: ClassTag](
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
      ) extends Receiver[T](storageLevel) with Logging {

      private var socket: Socket = _

      def onStart() {

        logInfo(s"Connecting to $host:$port")
        try {
          socket = new Socket(host, port)
        } catch {
          case e: ConnectException =>
            restart(s"Error connecting to $host:$port", e)
            return
        }
        logInfo(s"Connected to $host:$port")

        // Start the thread that receives data over a connection
        new Thread("Socket Receiver") {
          setDaemon(true)
          override def run() { receive() }
        }.start()
      }
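In receive, socket.getInputStream() supplies the incoming data, which is wrapped in an iterator. As long as the receiver is not stopped and the iterator has more elements, the loop keeps storing records; storing data is not a trivial matter. Note that receive runs on an Executor.
The receive method in SocketInputDStream.scala: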
    def receive() {
      try {
        val iterator = bytesToObjects(socket.getInputStream())
        while (!isStopped && iterator.hasNext) {
          store(iterator.next())
        }
        if (!isStopped()) {
          restart("Socket data stream had no more data")
        } else {
          logInfo("Stopped receiving")
        }
      } catch {
        case NonFatal(e) =>
          logWarning("Error receiving data", e)
          restart("Error receiving data", e)
      } finally {
        onStop()
      }
    }
The store method of Receiver stores a single received record into Spark's memory. Single records are aggregated into data blocks before being pushed into Spark's memory.
Receiver.scala source code:
    def store(dataItem: T) {
      supervisor.pushSingle(dataItem)
    }
The pushSingle method of ReceiverSupervisor pushes a single data item to the backend data store. It has no implementation here; see the concrete subclass ReceiverSupervisorImpl.
ReceiverSupervisor.scala source code:
    def pushSingle(data: Any): Unit
The pushSingle method of the subclass ReceiverSupervisorImpl adds the record to defaultBlockGenerator.
ReceiverSupervisorImpl.scala source code:
    def pushSingle(data: Any) {
      defaultBlockGenerator.addData(data)
    }
The addData method of defaultBlockGenerator pushes a single data item into the buffer. The key call inside addData is waitToPush().
BlockGenerator.scala source code:
    def addData(data: Any): Unit = {
      if (state == Active) {
        waitToPush()
        synchronized {
          if (state == Active) {
            currentBuffer += data
          } else {
            throw new SparkException(
              "Cannot add data as BlockGenerator has not been started or has been stopped")
          }
        }
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
waitToPush calls rateLimiter.acquire(). Until waitToPush returns, the synchronized block that follows it in addData does not run, so the data cannot be stored.
RateLimiter.scala source code:
    private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {

      // treated as an upper limit
      private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
      private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)

      def waitToPush() {
        rateLimiter.acquire()
      }
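RateLimiter.java is in the com.google.common.util.concurrent package. Its acquire method acquires a permit from the RateLimiter, blocking until the request can be granted. It calls acquire(1), which reserves the next ticket inside a synchronized (mutex) block. To store received data, the caller must first obtain a permit; without one the data cannot be stored, and that is what limits the receive rate. This is the concrete way the receiver throttles incoming data.
RateLimiter.java source code: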
    public void acquire() {
      acquire(1);
    }
    ......
    public void acquire(int permits) {
      checkPermits(permits);
      long microsToWait;
      synchronized (mutex) {
        microsToWait = reserveNextTicket(permits, readSafeMicros());
      }
      ticker.sleepMicrosUninterruptibly(microsToWait);
    }
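To make the permit idea concrete, here is a minimal pacing limiter. It is a simplified stand-in for Guava's RateLimiter, not its implementation: it stores no unused permits and takes the current time as an argument so that the reservation logic can be checked without sleeping. The names Pacer and reserve are hypothetical.

```scala
object PermitSketch {
  // Grants one permit every intervalMicros microseconds.
  final class Pacer(permitsPerSecond: Double) {
    private val intervalMicros: Long = (1000000.0 / permitsPerSecond).toLong
    private var nextFreeMicros: Long = 0L

    // Reserves the next permit and returns how long the caller must wait
    // (in microseconds) before it may store its record.
    def reserve(nowMicros: Long): Long = synchronized {
      val waitMicros = math.max(0L, nextFreeMicros - nowMicros)
      nextFreeMicros = math.max(nowMicros, nextFreeMicros) + intervalMicros
      waitMicros
    }
  }
}
```

At 1000 permits per second, two reservations made at the same instant get waits of 0 and 1000 microseconds. A blocking acquire would sleep for the returned duration, which is what slows a receiver down.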
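Back to JobScheduler on the Driver. When JobScheduler starts, a for comprehension walks over every inputDStream, takes its rateController if it has one, and registers it as a listener through ssc.addStreamingListener. The listenerBus involved is the scheduler-level listenerBus; when it receives an event, it notifies the registered listeners.
addStreamingListener in StreamingContext.scala, followed by the listeners field and addListener from the ListenerBus trait: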
    def addStreamingListener(streamingListener: StreamingListener) {
      scheduler.listenerBus.addListener(streamingListener)
    }
    ……
    private[spark] val listeners = new CopyOnWriteArrayList[L]
    ……
    final def addListener(listener: L): Unit = {
      listeners.add(listener)
    }
Here listenerBus is a StreamingListenerBus. When is it created? JobScheduler obtains a StreamingListenerBus instance when it is constructed.
JobScheduler.scala source code:
    val listenerBus = new StreamingListenerBus(ssc.sparkContext.listenerBus)
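StreamingListenerBus handles many events, including receiverStarted, receiverError, and receiverStopped. batchSubmitted marks a batch being submitted and batchStarted marks the start of batch processing; batchCompleted is the important one here.
StreamingListenerBus.scala source code: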
    private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
      extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
      ......
      protected override def doPostEvent(
          listener: StreamingListener,
          event: StreamingListenerEvent): Unit = {
        event match {
          case receiverStarted: StreamingListenerReceiverStarted =>
            listener.onReceiverStarted(receiverStarted)
          case receiverError: StreamingListenerReceiverError =>
            listener.onReceiverError(receiverError)
          case receiverStopped: StreamingListenerReceiverStopped =>
            listener.onReceiverStopped(receiverStopped)
          case batchSubmitted: StreamingListenerBatchSubmitted =>
            listener.onBatchSubmitted(batchSubmitted)
          case batchStarted: StreamingListenerBatchStarted =>
            listener.onBatchStarted(batchStarted)
          case batchCompleted: StreamingListenerBatchCompleted =>
            listener.onBatchCompleted(batchCompleted)
          case outputOperationStarted: StreamingListenerOutputOperationStarted =>
            listener.onOutputOperationStarted(outputOperationStarted)
          case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
            listener.onOutputOperationCompleted(outputOperationCompleted)
          case streamingStarted: StreamingListenerStreamingStarted =>
            listener.onStreamingStarted(streamingStarted)
          case _ =>
        }
      }
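The dispatch pattern in doPostEvent can be reproduced in a small, self-contained form. The sketch below assumes only two event types and uses hypothetical names (Event, Listener, Bus); it shows how a bus routes each event to the matching callback, and how a listener that overrides only onBatchCompleted ignores everything else.

```scala
import java.util.concurrent.CopyOnWriteArrayList

object ListenerBusSketch {
  sealed trait Event
  final case class BatchStarted(batchTime: Long) extends Event
  final case class BatchCompleted(batchTime: Long) extends Event

  // Callbacks default to no-ops, so a listener overrides only what it needs.
  trait Listener {
    def onBatchStarted(e: BatchStarted): Unit = {}
    def onBatchCompleted(e: BatchCompleted): Unit = {}
  }

  final class Bus {
    private val listeners = new CopyOnWriteArrayList[Listener]

    def addListener(l: Listener): Unit = { listeners.add(l); () }

    // Delivers one event to every registered listener, routing by event type.
    def post(event: Event): Unit = {
      val it = listeners.iterator()
      while (it.hasNext) {
        val l = it.next()
        event match {
          case e: BatchStarted => l.onBatchStarted(e)
          case e: BatchCompleted => l.onBatchCompleted(e)
        }
      }
    }
  }
}
```

A rate controller fits this shape: it is a listener that reacts only to the batch-completed event.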
For batchCompleted, StreamingListener is a trait whose onBatchCompleted method has an empty body. The real work is done in subclasses; StreamingListener has many of them, and RateController is one.
StreamingListener.scala source code:
    trait StreamingListener {
      ……
      def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
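The subclass RateController overrides onBatchCompleted, which is the key step. It takes streamIdToInputInfo from batchInfo as elements, then a for comprehension extracts processingEnd, workDelay, waitDelay, and the record count; these values also matter to third-party monitoring. It then calls computeAndPublish. In computeAndPublish, rateEstimator uses the time, elems, workDelay, and waitDelay to estimate a new, more suitable rate. If a new rate is produced, newRate.foreach stores it in rateLimit and calls publish. publish is abstract in RateController; the subclass ReceiverRateController implements it by handing the new rate to receiverTracker.
RateController.scala source code: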
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
        val elements = batchCompleted.batchInfo.streamIdToInputInfo

        for {
          processingEnd <- batchCompleted.batchInfo.processingEndTime
          workDelay <- batchCompleted.batchInfo.processingDelay
          waitDelay <- batchCompleted.batchInfo.schedulingDelay
          elems <- elements.get(streamUID).map(_.numRecords)
        } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
      }
    }
    ……
    protected def publish(rate: Long): Unit
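The for comprehension in onBatchCompleted runs computeAndPublish only when every value is present. The sketch below isolates that behavior with a hypothetical, trimmed-down Info class whose field names follow the excerpt. It returns the argument tuple instead of calling anything, so the skip-on-missing behavior is easy to see.

```scala
object BatchCompletedSketch {
  // Hypothetical, trimmed-down batch info (not Spark's BatchInfo).
  final case class Info(
      processingEndTime: Option[Long],
      processingDelay: Option[Long],
      schedulingDelay: Option[Long],
      numRecords: Map[Int, Long])

  // Returns (time, elems, workDelay, waitDelay) for the given stream,
  // or None if any of the four values is missing.
  def inputsFor(streamUID: Int, info: Info): Option[(Long, Long, Long, Long)] =
    for {
      processingEnd <- info.processingEndTime
      workDelay <- info.processingDelay
      waitDelay <- info.schedulingDelay
      elems <- info.numRecords.get(streamUID)
    } yield (processingEnd, elems, workDelay, waitDelay)
}
```

If a batch has no recorded processing delay, or the stream received no input info for that batch, no rate update is computed for it.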
ReceiverRateController, defined inside ReceiverInputDStream, hands the new rate to receiverTracker by calling its sendRateUpdate method. Because there are many RateControllers, each one carries a specific ID.
ReceiverInputDStream.scala source code:
    private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
        extends RateController(id, estimator) {
      override def publish(rate: Long): Unit =
        ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
    }
sendRateUpdate takes that ID, now called streamUID; each stream has one RateController attached. The endpoint then sends an UpdateReceiverRateLimit(streamUID, newRate) message.
ReceiverTracker.scala source code:
    def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
      if (isTrackerStarted) {
        endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
      }
    }
endpoint.send uses the RPC mechanism. Where is endpoint instantiated? The start method creates it through ssc.env.rpcEnv.setupEndpoint, registering a ReceiverTrackerEndpoint. ReceiverTrackerEndpoint receives the UpdateReceiverRateLimit message, looks up the info for streamUID in receiverTrackingInfos, and takes the endpoint reference from that info; this endpoint is the RPC reference stored in ReceiverTrackingInfo.
ReceiverTracker.scala source code:
    private var endpoint: RpcEndpointRef = null
    ……
    def start(): Unit = synchronized {
      if (isTrackerStarted) {
        throw new SparkException("ReceiverTracker already started")
      }

      if (!receiverInputStreams.isEmpty) {
        endpoint = ssc.env.rpcEnv.setupEndpoint(
          "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
        if (!skipReceiverLaunch) launchReceivers()
        logInfo("ReceiverTracker started")
        trackerState = Started
      }
    }
    ……
    private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
      ......
        case UpdateReceiverRateLimit(streamUID, newRate) =>
          for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
            eP.send(UpdateRateLimit(newRate))
          }
ReceiverTracker.scala looks up the ReceiverTrackingInfo by ID in a HashMap. receiverTrackingInfos tracks information for all receivers: the key is the receiver ID and the value is the receiver's info. It is accessed only by ReceiverTrackerEndpoint.
ReceiverTracker.scala source code:
    private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]
ReceiverTrackingInfo has an endpoint member of type Option[RpcEndpointRef].
ReceiverTrackingInfo.scala source code:
    private[streaming] case class ReceiverTrackingInfo(
        receiverId: Int,
        state: ReceiverState,
        scheduledLocations: Option[Seq[TaskLocation]],
        runningExecutor: Option[ExecutorCacheTaskLocation],
        name: Option[String] = None,
        endpoint: Option[RpcEndpointRef] = None,
        errorInfo: Option[ReceiverErrorInfo] = None) {
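In ReceiverSupervisorImpl.scala, look at the internal RPC endpoint. This endpoint is registered with the RPC environment and receives messages from the ReceiverTracker on the Driver, including UpdateRateLimit. On receiving UpdateRateLimit, it calls updateRate on each of the registeredBlockGenerators.
ReceiverSupervisorImpl.scala source code: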
    private[streaming] class ReceiverSupervisorImpl(
        receiver: Receiver[_],
        env: SparkEnv,
        hadoopConf: Configuration,
        checkpointDirOption: Option[String]
      ) extends ReceiverSupervisor(receiver, env.conf) with Logging {
      ……
      private val endpoint = env.rpcEnv.setupEndpoint(
        "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
          override val rpcEnv: RpcEnv = env.rpcEnv

          override def receive: PartialFunction[Any, Unit] = {
            case StopReceiver =>
              logInfo("Received stop signal")
              ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
            case CleanupOldBlocks(threshTime) =>
              logDebug("Received delete old batch signal")
              cleanupOldBlocks(threshTime)
            case UpdateRateLimit(eps) =>
              logInfo(s"Received a new rate limit: $eps.")
              registeredBlockGenerators.asScala.foreach { bg =>
                bg.updateRate(eps)
              }
          }
        })
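updateRate sets the new receive rate, but never above the maximum rate spark.streaming.receiver.maxRate. When enabling Spark Streaming Backpressure, also set spark.streaming.receiver.maxRate as a safety net: cluster capacity is sometimes genuinely limited, and although Backpressure can let the receive rate rise substantially, it should stay within what the cluster can handle. In addition, when Spark Streaming runs on YARN its resources may be capped, not by the physical hardware but because Spark Streaming may be allowed, say, only 50% of the resources. That is why maxRate is set. Once the new rate newRate arrives, it is applied to rateLimiter: if newRate is greater than 0 and maxRateLimit is greater than 0, the smaller of newRate and maxRateLimit becomes the new rate.
RateLimiter.scala source code: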
    private[receiver] def updateRate(newRate: Long): Unit =
      if (newRate > 0) {
        if (maxRateLimit > 0) {
          rateLimiter.setRate(newRate.min(maxRateLimit))
        } else {
          rateLimiter.setRate(newRate)
        }
      }
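The branching in updateRate is easy to check in isolation. The sketch below mirrors it as a pure function that returns the rate that would be applied, or None when the update is ignored. effectiveRate is a hypothetical helper written for this illustration; it is not part of Spark.

```scala
object RateClampSketch {
  // Mirrors the branches of updateRate without touching a real limiter.
  def effectiveRate(newRate: Long, maxRateLimit: Long): Option[Long] =
    if (newRate <= 0) None                                           // non-positive rates are ignored
    else if (maxRateLimit > 0) Some(math.min(newRate, maxRateLimit)) // capped by maxRate
    else Some(newRate)                                               // no usable cap configured
}
```

With a cap of 2000 records per second, an estimated rate of 5000 is reduced to 2000, while an estimate of 500 passes through unchanged.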
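setRate in RateLimiter.java takes a mutex lock, converts permits per second into the stable interval between permits, and then calls doSetRate to set the rate based on the existing state and the new value. doSetRate is left to concrete subclasses.
com.google.common.util.concurrent.RateLimiter.java source code: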
    public final void setRate(double permitsPerSecond) {
      Preconditions.checkArgument(permitsPerSecond > 0.0
          && !Double.isNaN(permitsPerSecond), "rate must be positive");
      synchronized (mutex) {
        resync(readSafeMicros());
        double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
        this.stableIntervalMicros = stableIntervalMicros;
        doSetRate(permitsPerSecond, stableIntervalMicros);
      }
    }

    abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);
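The arithmetic inside setRate is a single division: one second, expressed in microseconds, divided by the permitted rate. A small Scala sketch of that calculation follows; the function name is taken from the variable in the excerpt, and the enclosing object is an assumption for the example.

```scala
import java.util.concurrent.TimeUnit

object StableIntervalSketch {
  // Microseconds between two consecutive permits at the given rate.
  def stableIntervalMicros(permitsPerSecond: Double): Double = {
    require(permitsPerSecond > 0.0 && !permitsPerSecond.isNaN, "rate must be positive")
    TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond
  }
}
```

A rate of 1000 records per second gives an interval of 1000 microseconds per record, and a rate of 4 per second gives 250000 microseconds.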
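doSetRate is implemented by concrete subclasses; Bursty is one subclass of RateLimiter. com.google.common.util.concurrent.RateLimiter.java is provided by Google, and its source is in guava-14.0.1-sources. Data must be stored according to the current rate, and that is the Backpressure mechanism discussed in this section. The overall idea is simple: each time the job for a batch duration finishes, its completion statistics are sent back; a new rate is computed from them and sent to the Executor over RPC; the Executor resets its rate from the message; and every subsequent receive is paced by that rate, which determines how many records are received per second. The rate thus changes dynamically, recomputed from how the previous job ran.
com.google.common.util.concurrent.RateLimiter.java source code: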
    private static class Bursty extends RateLimiter {
      Bursty(SleepingTicker ticker) {
        super(ticker);
      }

      @Override
      void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
        double oldMaxPermits = this.maxPermits;
        /*
         * We allow the equivalent work of up to one second to be granted with zero waiting, if the
         * rate limiter has been unused for as much. This is to avoid potentially producing tiny
         * wait interval between subsequent requests for sufficiently large rates, which would
         * unnecessarily overconstrain the thread scheduler.
         */
        maxPermits = permitsPerSecond; // one second worth of permits
        storedPermits = (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
      }

      @Override
      long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
        return 0L;
      }
    }
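The storedPermits update in Bursty.doSetRate rescales the permits already saved up so that they keep the same proportion of the new maximum. As a worked illustration, the same formula is written below as a pure Scala function; rescale is a hypothetical name and the numbers in the note are made up.

```scala
object BurstySketch {
  // Keeps stored permits at the same fraction of the maximum after a rate change.
  def rescale(storedPermits: Double, oldMaxPermits: Double, newMaxPermits: Double): Double =
    if (oldMaxPermits == 0.0) 0.0 // initial state: nothing has been stored yet
    else storedPermits * newMaxPermits / oldMaxPermits
}
```

If 50 of a possible 100 permits were stored and the rate doubles to 200 per second, the stored amount becomes 100, which is still half of the maximum.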