首页 > 其他分享 >Spark视频王家林第119课: Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?

Spark视频王家林第119课: Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?

时间:2023-04-03 15:05:41浏览次数:34  
标签:case scala RateController Streaming 下应 Spark 源代码 方法 def


Spark视频王家林第119课: Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?

本节讲解Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?数据峰值及流量变化的不稳定有2个层面:1)第一个层面就是数据确实不稳定,例如晚上11点的时候访问流量特别高,相对其他时间而言表现为不稳定。2)第二个层面:数据是没问题的,数据流动的速度是匀速或接近于匀速,但是在处理的过程中发生了故障或者GC的时候耽误了时间,导致计算延迟,Web监控台上也会看到峰值的变化。这两种情况无论哪种情况,都不是一个稳定的系统能够避免的问题,解决这个问题有很多方式,本节提一个Spark本身自带的一个Backpressure机制,即数据的反压机制。

Backpressure机制的基本思想是根据上一次计算的Job信息来评估决定下一个Job数据接受的速度。根据上一个Job执行结束之后对Job的统计信息,例如Job延迟了多长时间等,基于这个统计信息,SparkStreaming的算法会评估下一个Batch Duration的时间窗口应该以什么样的速度接收数据。这里有一个问题,接收数据的速度怎么进行限制?接收数据的过程也是消费数据的过程,SparkStreaming在接收数据的时候必须把当前的数据接收完毕,才能接收下一条的数据,根据上一个作业执行的情况,根据自己的算法进行一个判断,下一个BatchDuration必须以什么样的速度接收数据,这就是所谓的Backpressure反压机制。本章节将透彻的讲解Backpressure反压机制如何实现。

在生产环境下,Spark Streaming有一个监控的平台,最简单的是使用SparkWeb View的原生页面,或者借助第三方的监控软件去监控,如监测到Spark Streaming以匀速的速度前进,表明SparkStreaming的处理是稳定的。如果在监控平台上监控Kafka剩余数据的情况,Kafka突然出现一个波峰,这个时候需考虑怎么去解决,如果SparkStreaming开启了Backpressure的机制,数据突然变大或者发生了GC,Backpressure能够动态改变消费的速度,对于Spark而言是非常有意义的。在过去的很多项目中,我们都会开启SparkStreaming的Backpressure反压机制。

从源码的角度,Backpressure反压机制从Driver来考虑,我们系统的看一下反压机制的内容。RateController继承至StreamingListener监听器,当一个Job完成的时候,StreamingListener是监听器的一种,肯定会进行注册,注册的时候会调用RateController。RateController有自己的子类,会调自己具体的业务。其中computeAndPublish方法计算新的限制速率并异步发布。

RateController.scala源代码:

1.         private[streaming] abstractclass RateController(val streamUID: Int, rateEstimator:

2.         RateEstimator) extendsStreamingListener with Serializable {

3.         ……

4.           private def computeAndPublish(time: Long,elems: Long, workDelay: Long, waitDelay: Long): Unit =

5.             Future[Unit] {

6.               val newRate = rateEstimator.compute(time,elems, workDelay, waitDelay)

7.               newRate.foreach { s =>

8.                 rateLimit.set(s.toLong)

9.                 publish(getLatestRate())

10.            }

11.          },



我们先看一下RateController在什么时候启动和注册的?从原理的角度考虑什么时候启动RateController?RateController是在JobScheduler中启动的,为什么是在JobScheduler中,原因非常简单,JobScheduler知道作业什么时候完成,作业完成之后获得上一次作业的统计信息。

在Jobscheduler中查找RateController的实例化,每次作业完成的时候,Jobscheduler知道。这是Driver级别的,从start方法开始查找,从getInputStreams中把InputStreams关联上RateController,InputStreams有RateController。

JobScheduler.scala的start方法源代码:

1.          def start(): Unit = synchronized {

2.             if (eventLoop != null) return // schedulerhas already been started

3.          

4.             logDebug("Starting JobScheduler")

5.             eventLoop = newEventLoop[JobSchedulerEvent]("JobScheduler") {

6.               override protected def onReceive(event:JobSchedulerEvent): Unit = processEvent(event)

7.          

8.               override protected def one rror(e:Throwable): Unit = reportError("Error in job scheduler", e)

9.             }

10.          eventLoop.start()

11.       

12.          // attach rate controllers of input streamsto receive batch completion updates

13.          for {

14.            inputDStream <-ssc.graph.getInputStreams

15.            rateController <-inputDStream.rateController

16.          } ssc.addStreamingListener(rateController)

17.       

18.          listenerBus.start()

19.          receiverTracker = new ReceiverTracker(ssc)

20.          inputInfoTracker = newInputInfoTracker(ssc)

21.       

22.          val executorAllocClient:ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {

23.            case b: ExecutorAllocationClient =>b.asInstanceOf[ExecutorAllocationClient]

24.            case _ => null

25.          }

26.       

27.          executorAllocationManager =ExecutorAllocationManager.createIfEnabled(

28.            executorAllocClient,

29.            receiverTracker,

30.            ssc.conf,

31.            ssc.graph.batchDuration.milliseconds,

32.            clock)

33.          executorAllocationManager.foreach(ssc.addStreamingListener)

34.          receiverTracker.start()

35.          jobGenerator.start()

36.          executorAllocationManager.foreach(_.start())

37.          logInfo("Started JobScheduler")

38.        }



 

我们先看一下InputStreams怎么来的,先看一下InputStreamsSuite的测试代码,通过ssc.socketTextStream创建一个networkStream:

InputStreamsSuite.scala源代码:

1.          val networkStream = ssc.socketTextStream(

2.                   "localhost",testServer.port, StorageLevel.MEMORY_AND_DISK)

 

ssc.socketTextStream创建的是socketTextStream。

StreamingContext.scala的socketTextStream源代码:

1.            defsocketTextStream(

2.               hostname: String,

3.               port: Int,

4.               storageLevel: StorageLevel =StorageLevel.MEMORY_AND_DISK_SER_2

5.             ): ReceiverInputDStream[String] =withNamedScope("socket text stream") {

6.             socketStream[String](hostname, port,SocketReceiver.bytesToLines, storageLevel)

7.           }

 

   其中socketStream的源代码如下。

   StreamingContext.scala的socketStream源代码:

1.            defsocketStream[T: ClassTag](

2.               hostname: String,

3.               port: Int,

4.               converter: (InputStream) =>Iterator[T],

5.               storageLevel: StorageLevel

6.             ): ReceiverInputDStream[T] = {

7.             new SocketInputDStream[T](this, hostname,port, converter, storageLevel)

8.           }

 

这里创建出SocketInputDStream,就是InputStreams的来源。SocketInputDStream中有一个getReceiver,在getReceiver方法中创建SocketReceiver。

SocketInputDStream.scala源代码:

1.          private[streaming]

2.         class SocketInputDStream[T:ClassTag](

3.             _ssc: StreamingContext,

4.             host: String,

5.             port: Int,

6.             bytesToObjects: InputStream =>Iterator[T],

7.             storageLevel: StorageLevel

8.           ) extends ReceiverInputDStream[T](_ssc) {

9.          

10.        def getReceiver(): Receiver[T] = {

11.          new SocketReceiver(host, port,bytesToObjects, storageLevel)

12.        }

13.      }

 

SocketReceiver的onStart方法会一直不断的循环,循环进行 receive()。

SocketInputDStream.scala的SocketReceiver源代码:

1.          private[streaming]

2.         class SocketReceiver[T:ClassTag](

3.             host: String,

4.             port: Int,

5.             bytesToObjects: InputStream =>Iterator[T],

6.             storageLevel: StorageLevel

7.           ) extends Receiver[T](storageLevel) withLogging {

8.          

9.           private var socket: Socket = _

10.       

11.        def onStart() {

12.       

13.          logInfo(s"Connecting to$host:$port")

14.          try {

15.            socket = new Socket(host, port)

16.          } catch {

17.            case e: ConnectException =>

18.              restart(s"Error connecting to$host:$port", e)

19.              return

20.          }

21.          logInfo(s"Connected to$host:$port")

22.       

23.          // Start the thread that receives data overa connection

24.          new Thread("Socket Receiver") {

25.            setDaemon(true)

26.            override def run() { receive() }

27.          }.start()

28.        }

 

receive方法中socket.getInputStream()是接收到的数据,放入到iterator中,只要iterator不停止就会一直循环,将数据存储起来,数据存储不是一件简单的事情。注意:receive方法运行在Executor上。

SocketInputDStream.scala的receive方法源代码:

1.            defreceive() {

2.             try {

3.               val iterator =bytesToObjects(socket.getInputStream())

4.               while(!isStopped &&iterator.hasNext) {

5.                 store(iterator.next())

6.               }

7.               if (!isStopped()) {

8.                 restart("Socket data stream had nomore data")

9.               } else {

10.              logInfo("Stopped receiving")

11.            }

12.          } catch {

13.            case NonFatal(e) =>

14.              logWarning("Error receivingdata", e)

15.              restart("Error receivingdata", e)

16.          } finally {

17.            onStop()

18.          }

19.        }

 

 Receiver的store方法将单个接收到的数据存储到Spark内存中,这些单个数据在被放入Spark内存前将被合并到数据块。

 Receiver.scala的源代码:

1.            defstore(dataItem: T) {

2.             supervisor.pushSingle(dataItem)

3.           }

 

ReceiverSupervisor的pushSingle方法将单个数据项推到后端数据存储,这里无具体实现,需看ReceiverSupervisor具体子类ReceiverSupervisorImpl的实现。

ReceiverSupervisor.scala源代码:

1.          def pushSingle(data: Any): Unit

 

子类ReceiverSupervisorImpl的pushSingle方法将一条记录放入defaultBlockGenerator中。

ReceiverSupervisorImpl.scala的源代码:

1.            defpushSingle(data: Any) {

2.             defaultBlockGenerator.addData(data)

3.           }

 

defaultBlockGenerator的addData方法将单个数据项推入缓冲区。addData中有个关键的方法 waitToPush(), waitToPush()是关键点。

BlockGenerator.scala源代码:

1.          def addData(data: Any): Unit = {

2.             if (state == Active) {

3.               waitToPush()

4.               synchronized {

5.                 if (state == Active) {

6.                   currentBuffer += data

7.                 } else {

8.                   throw new SparkException(

9.                     "Cannot add data asBlockGenerator has not been started or has been stopped")

10.              }

11.            }

12.          } else {

13.            throw new SparkException(

14.              "Cannot add data as BlockGeneratorhas not been started or has been stopped")

15.          }

16.        }

 

waitToPush方法里面调用rateLimiter.acquire()方法,如果addData的waitToPush方法不执行,则addData方法中在waitToPush方法之后的synchronized同步块代码将都不执行,数据不能存储。

RateLimiter.scala源代码:

1.           private[receiver] abstract classRateLimiter(conf: SparkConf) extends Logging {

2.          

3.           // treated as an upper limit

4.           private val maxRateLimit =conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)

5.           private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)

6.          

7.           def waitToPush() {

8.             rateLimiter.acquire()

9.           }



 

RateLimiter.java位于com.google.common.util.concurrent包中,acquire方法从{@code RateLimiter}申请获取一个许可证,一直阻塞直到请求被授予。 其中调用acquire(1)方法,acquire(1)方法中reserve有个同步互斥信号量synchronized (mutex()),如果接收数据进行存储,需拿到一个许可证Ticket,如果没有这个令牌,就无法存储数据,也就限定了接收数据的速度。在receiver中有具体限定接收数据的方式。

RateLimiter.java的源代码:

1.         public void acquire() {

2.                 acquire(1);

3.             }

4.         ......

5.             public void acquire(int permits) {

6.                 checkPermits(permits);

7.                 long microsToWait;

8.                 synchronized (mutex) {

9.                     microsToWait =reserveNextTicket(permits, readSafeMicros());

10.              }

11.              ticker.sleepMicrosUninterruptibly(microsToWait);

12.          }



 回到Driver端的JobScheduler,JobScheduler在Start的时候,每个inputDStream都有一个rateController,for循环遍历获得rateController,然后将rateController交给上下文的ssc.addStreamingListener监听器,进行注册。这里listenerBus是scheduler级别的listenerBus,listenerBus收到相关的信息肯定会告诉监听器。

 

StreamingContext.scala的addStreamingListener源代码:

1.            defaddStreamingListener(streamingListener: StreamingListener) {

2.             scheduler.listenerBus.addListener(streamingListener)

3.           }

4.         ……

5.         private[spark] val listeners =new CopyOnWriteArrayList[L]

6.         …….

7.           final def addListener(listener: L): Unit = {

8.             listeners.add(listener)

9.           }

 

其中listenerBus是StreamingListenerBus,StreamingListenerBus在什么时候启动的?在构建JobScheduler的时候会获取StreamingListenerBus的实例。

JobScheduler.scala源代码:

1.         val listenerBus = newStreamingListenerBus(ssc.sparkContext.listenerBus)

 

StreamingListenerBus中有很多事件,包括receiverStarted、receiverError、receiverStopped等事件。其中batchStarted是batch处理开始,batchSubmitted是batch提交,而batchCompleted比较关键。

StreamingListenerBus.scala源代码:

1.          private[streaming] classStreamingListenerBus(sparkListenerBus: LiveListenerBus)

2.           extends SparkListener withListenerBus[StreamingListener, StreamingListenerEvent] {

3.         ......

4.         protected override defdoPostEvent(

5.               listener: StreamingListener,

6.               event: StreamingListenerEvent): Unit = {

7.             event match {

8.               case receiverStarted:StreamingListenerReceiverStarted =>

9.                 listener.onReceiverStarted(receiverStarted)

10.            case receiverError:StreamingListenerReceiverError =>

11.              listener.onReceiverError(receiverError)

12.            case receiverStopped:StreamingListenerReceiverStopped =>

13.              listener.onReceiverStopped(receiverStopped)

14.            case batchSubmitted:StreamingListenerBatchSubmitted =>

15.              listener.onBatchSubmitted(batchSubmitted)

16.            case batchStarted:StreamingListenerBatchStarted =>

17.              listener.onBatchStarted(batchStarted)

18.            case batchCompleted: StreamingListenerBatchCompleted=>

19.              listener.onBatchCompleted(batchCompleted)

20.            case outputOperationStarted:StreamingListenerOutputOperationStarted =>

21.              listener.onOutputOperationStarted(outputOperationStarted)

22.            case outputOperationCompleted: StreamingListenerOutputOperationCompleted=>

23.              listener.onOutputOperationCompleted(outputOperationCompleted)

24.            case streamingStarted:StreamingListenerStreamingStarted =>

25.              listener.onStreamingStarted(streamingStarted)

26.            case _ =>

27.          }

28.        }


batchCompleted的时候,从StreamingListener的角度讲,StreamingListener是一个trait,onBatchCompleted方法中无具体实现。具体方法在子类实现,StreamingListener有很多具体的子类,其中一个子类是RateController。

StreamingListener.scala源代码:

1.         trait StreamingListener {

2.         ……

3.         defonBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }



子类RateController会调用onBatchCompleted方法,onBatchCompleted方法是个关键点。获取batchInfo的streamIdToInputInfo作为elements,for循环遍历获取workDelay、waitDelay等信息,这些信息对于第三方的监控也是非常重要的,然后通过computeAndPublish方法发布信息。在computeAndPublish方法中,rateEstimator根据传入的时间、elems、workDelay、waitDelay等信息评估新的更加合适的Rate,在newRate.foreach循环遍历中,将值赋值到rateLimit,然后publish进行发布,这里publish方法无具体实现,RateController的子类是ReceiverRateController,其publish方法将新的速率交给receiverTracker。

RateController.scala源代码:

1.            override def onBatchCompleted(batchCompleted:StreamingListenerBatchCompleted) {

2.             val elements =batchCompleted.batchInfo.streamIdToInputInfo

3.          

4.             for {

5.               processingEnd <-batchCompleted.batchInfo.processingEndTime

6.               workDelay <-batchCompleted.batchInfo.processingDelay

7.               waitDelay <- batchCompleted.batchInfo.schedulingDelay

8.               elems <-elements.get(streamUID).map(_.numRecords)

9.             } computeAndPublish(processingEnd, elems,workDelay, waitDelay)

10.        }

11.      }

12.      ……

13.      protected def publish(rate:Long): Unit

 

在ReceiverInputDStream子类中,新的控制速率交给receiverTracker,然后receiverTracker调用sendRateUpdate方法。因为有很多RateController,因此这里会设置一个具体的ID。

ReceiverInputDStream.scala源代码

1.           private[streaming] classReceiverRateController(id: Int, estimator: RateEstimator)

2.               extends RateController(id, estimator) {

3.             override def publish(rate: Long): Unit =

4.               ssc.scheduler.receiverTracker.sendRateUpdate(id,rate)

5.           }

 

在sendRateUpdate方法中,将ID传进去,这里变成了streamUID,每个Stream会attach一个RateController,然后endpoint调用send方法发送UpdateReceiverRateLimit(streamUID,newRate)信息。

ReceiverTracker.scala源代码:

1.            defsendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {

2.             if (isTrackerStarted) {

3.               endpoint.send(UpdateReceiverRateLimit(streamUID,newRate))

4.             }

5.           }



 

endpoint.send是RPC的通信机制,endpoint在哪里实例化的?在start方法中通过ssc.env.rpcEnv.setupEndpoint方法创建endpoint实例,创建一个ReceiverTrackerEndpoint。然后在ReceiverTrackerEndpoint中接收UpdateReceiverRateLimit消息 ,从receiverTrackingInfos中根据streamUID获取info的相关信息,然后基于info获取endpoint 的通信句柄,endpoint就是ReceiverTrackingInfo的Rpc通信实体。

ReceiverTracker.scala源代码:

1.          private var endpoint: RpcEndpointRef = null

2.         ……

3.         def start(): Unit =synchronized {

4.             if (isTrackerStarted) {

5.               throw newSparkException("ReceiverTracker already started")

6.             }

7.          

8.             if (!receiverInputStreams.isEmpty) {

9.               endpoint = ssc.env.rpcEnv.setupEndpoint(

10.              "ReceiverTracker", newReceiverTrackerEndpoint(ssc.env.rpcEnv))

11.            if (!skipReceiverLaunch)launchReceivers()

12.            logInfo("ReceiverTrackerstarted")

13.            trackerState = Started

14.          }

15.        }

16.      ……

17.      private classReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extendsThreadSafeRpcEndpoint {

18.      ......

19.         case UpdateReceiverRateLimit(streamUID,newRate) =>

20.              for (info <-receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {

21.                eP.send(UpdateRateLimit(newRate))

22.              }

 

ReceiverTracker.scala在HashMap数据结构中根据ID获取ReceiverTrackingInfo信息。receiverTrackingInfos跟踪所有接收者的信息,关键是接收者ID,其值是接收者的信息,只能被ReceiverTrackerEndpoint访问。

ReceiverTracker.scala

1.           privateval receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]

 

ReceiverTrackingInfo的成员变量中有一个endpoint,是Option[RpcEndpointRef]类型。

ReceiverTrackingInfo.scala源代码:

2.          private[streaming] case classReceiverTrackingInfo(

3.             receiverId: Int,

4.             state: ReceiverState,

5.             scheduledLocations:Option[Seq[TaskLocation]],

6.             runningExecutor: Option[ExecutorCacheTaskLocation],

7.             name: Option[String] = None,

8.             endpoint: Option[RpcEndpointRef] = None,

9.             errorInfo: Option[ReceiverErrorInfo] =None) {

 

在ReceiverSupervisorImpl.scala中查找一下内部的RPC,这里有endpoint,endpoint是注册上去的,RpcEndpointRef从Driver的ReceiverTracker 接收消息,这里包括了UpdateRateLimit消息,接收到UpdateRateLimit消息以后,在registeredBlockGenerators调用updateRate方法。

ReceiverSupervisorImpl.scala

1.         private[streaming] class ReceiverSupervisorImpl(

2.             receiver: Receiver[_],

3.             env: SparkEnv,

4.             hadoopConf: Configuration,

5.             checkpointDirOption: Option[String]

6.           ) extends ReceiverSupervisor(receiver,env.conf) with Logging {

7.         …….

8.          private val endpoint =env.rpcEnv.setupEndpoint(

9.             "Receiver-" + streamId +"-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {

10.            override val rpcEnv: RpcEnv = env.rpcEnv

11.       

12.            override def receive:PartialFunction[Any, Unit] = {

13.              case StopReceiver =>

14.                logInfo("Received stopsignal")

15.                ReceiverSupervisorImpl.this.stop("Stoppedby driver", None)

16.              case CleanupOldBlocks(threshTime) =>

17.                logDebug("Received delete oldbatch signal")

18.                cleanupOldBlocks(threshTime)

19.              case UpdateRateLimit(eps) =>

20.                logInfo(s"Received a new ratelimit: $eps.")

21.                registeredBlockGenerators.asScala.foreach{ bg =>

22.                  bg.updateRate(eps)

23.                }

24.            }

25.          })

 

updateRate方法中会设置新的接收速率,但不会超过最大速率spark.streaming.receiver.maxRate。启动Spark Streaming的Backpressure机制,要设置最大速率spark.streaming.receiver.maxRate ,这是安全保障机制,因为有时集群处理确实有限,Backpressure反压机制可以让接收速度获得极大的提升,但是要量力而行;另外一方面,Spark Streaming运行在Yarn上,资源可能有限定,不是指物理资源的限定,而是指Spark Streaming可能只能用50%的资源,所以这里设置maxRate,获得新的速率newRate以后,在rateLimiter进行设置。如果newRate大于0,maxRateLimit大于0,取newRate及maxRateLimit的最小值作为新的速率。

RateLimiter.scala的源代码:

1.            private[receiver] def updateRate(newRate:Long): Unit =

2.             if (newRate > 0) {

3.               if (maxRateLimit > 0) {

4.                 rateLimiter.setRate(newRate.min(maxRateLimit))

5.               } else {

6.                 rateLimiter.setRate(newRate)

7.               }

8.             }

 

RateLimiter.java 的setRate方法中加了一个互斥信号锁,获取每秒钟能接收多少条记录及处理多少条记录,然后调用doSetRate方法根据已有的数据和最新的数据设置速率,doSetRate交给具体的子类实现。

com.google.common.util.concurrent.RateLimiter.java的源代码:

1.         public final voidsetRate(double permitsPerSecond) {

2.                 Preconditions.checkArgument(permitsPerSecond> 0.0

3.                         &&!Double.isNaN(permitsPerSecond), "rate must be positive");

4.                 synchronized (mutex) {

5.                     resync(readSafeMicros());

6.                     double stableIntervalMicros =TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;

7.                     this.stableIntervalMicros =stableIntervalMicros;

8.                     doSetRate(permitsPerSecond,stableIntervalMicros);

9.                 }

10.          }

11.       

12.          abstract void doSetRate(doublepermitsPerSecond, double stableIntervalMicros);

 

doSetRate方法由具体的子类实现,Bursty类是RateLimiter的子类,com.google.common.util.concurrent.RateLimiter.java是Google提供的,源码在guava-14.0.1-sources中。在存储数据的时候,必须根据速率rate存储数据,这就是本章节谈的Backpressure机制。整体思路较简单,每次计算BatchDuration Job执行完成的时候,就有JobCompleted的统计信息发回来,根据统计信息计算新的Rate,将新的rate进行远程通信交给Executor,Executor根据接收到的信息重新设置Rate,每次接收数据的时候肯定根据Rate决定每秒接收多少数据。这样就动态改变了速度,每次计算上一个作业的执行情况。

com.google.common.util.concurrent.RateLimiter.java的源代码:

1.         private static class Burstyextends RateLimiter {

2.                 Bursty(SleepingTicker ticker) {

3.                     super(ticker);

4.                 }

5.          

6.                 @Override

7.                 void doSetRate(double permitsPerSecond,double stableIntervalMicros) {

8.                     double oldMaxPermits =this.maxPermits;

9.                     /*

10.                   * We allow the equivalent work ofup to one second to be granted with zero waiting, if the

11.                   * rate limiter has been unused foras much. This is to avoid potentially producing tiny

12.                   * wait interval between subsequentrequests for sufficiently large rates, which would

13.                   * unnecessarily overconstrain thethread scheduler.

14.                   */

15.                  maxPermits = permitsPerSecond; //one second worth of permits

16.                  storedPermits = (oldMaxPermits ==0.0)

17.                          ? 0.0 // initial state

18.                          : storedPermits *maxPermits / oldMaxPermits;

19.              }

20.       

21.              @Override

22.              long storedPermitsToWaitTime(doublestoredPermits, double permitsToTake) {

23.                  return 0L;

24.              }

25.          }



 



标签:case,scala,RateController,Streaming,下应,Spark,源代码,方法,def
From: https://blog.51cto.com/u_10561036/6166423

相关文章

  • Hive 和 Spark 分区策略剖析
    作者:vivo互联网搜索团队-DengJie随着技术的不断的发展,大数据领域对于海量数据的存储和处理的技术框架越来越多。在离线数据处理生态系统最具代表性的分布式处理引擎当属Hive和Spark,它们在分区策略方面有着一些相似之处,但也存在一些不同之处。一、概述随着技术的不断的发展,大数据......
  • Hive 和 Spark 分区策略剖析
    作者:vivo互联网搜索团队-DengJie随着技术的不断的发展,大数据领域对于海量数据的存储和处理的技术框架越来越多。在离线数据处理生态系统最具代表性的分布式处理引擎当属Hive和Spark,它们在分区策略方面有着一些相似之处,但也存在一些不同之处。一、概述随着技术的不断的发展......
  • idea中spark安装
    Idea中spark的安装配置下载Scala插件后新建scala项目更改scala的SDK(下载的版本)和jdk并且更改工作空间导入spark相关的架包找到安装路径导入jars或者lib(找到Libraries添加ScalaSDK和java)设置maven全局的参数(导入maven的安装路径以及se......
  • Spark源码解析(二):Spark闭包检查
    一、理解Scala闭包:Closures1.1闭包的定义闭包就是一个函数和与其相关的引用环境组合的一个整体(实体)。进一步说,闭包是绑定了自由变量的函数实例。通常来讲,闭包的实现机制是定义一个特殊的数据结构,保存了函数地址指针与闭包创建时的函数的词法环境以及绑定自由变量。对于闭......
  • Spark源码解析(一):RDD之Transfrom算子
    一、延迟计算RDD代表的是分布式数据形态,因此,RDD到RDD之间的转换,本质上是数据形态上的转换(Transformations)在RDD的编程模型中,一共有两种算子,Transformations类算子和Actions类算子。开发者需要使用Transformations类算子,定义并描述数据形态的转换过程,然后调用Actions......
  • 手把手带你玩转Spark机器学习-深度学习在Spark上的应用
    文章目录系列文章目录前言一、ApacheSparkTimeline二、开发步骤1.在jupyter中启动SparkSession和SparkContext2.下载数据3.用Spark读取图片3.TransferLearning总结前言本文将介绍深度学习在Spark上的应用,我们将聚焦于深度学习Pipelines库,并讲解使用DLPipelines的方式。我们......
  • 常用spark优化参数
    常用spark优化参数强制使用sparkenginesettqs.query.engine.type=sparkCli;setspark.yarn.priority=4;双写HDFS开启:setspark.shuffle.hdfs.enable=true;s......
  • docker 搭建大数据集群(hive、hbase、ZooKeeper、Scala、Spark)
    1)本机系统设置电脑设置虚拟缓存(设置为自动管理)虚拟机设置内存和CPU内存设置为8G(或以上)CPU稍微设置高一点(三个虚拟化能开就开)虚拟机系统配置阿里源 wget-O/......
  • 大数据之—Spark环境搭建
    目录前言Local模式安装配置服务启动停止启动客户端测试Standalone模式配置启动测试1、python—spark—shell2、spark-submit3、SparkonYarn前言参考:https://blog.csdn.......
  • 概述Spark主要特点
    Spark是在MapReduce基础上产生的,它克服了MapReduce存在的性能低下、编程不够灵活等缺点。Spark作为一种DAG计算框架,其主要特点如下:1、性能高效其性能高效主要体现在以下几个......