2、Dispatcher 调度器 具体实现

(1)、Dispatcher 构造体

(2)、 方法1  registerRpcEndpoint

简单说说 sharedLoop 和 IsolatedRpcEndpoint 的区别



(3)方法2  getRpcEndpointRef

(4)方法3  removeRpcEndpointRef

(5)方法4  pstToAll

(6)方法5  postMessage

(7)方法6  postLocalMessage 和 PostOneWayMessage

(1)、Dispatcher 构造体
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int)



 private val endpoints: ConcurrentMap[String, MessageLoop] =
    new ConcurrentHashMap[String, MessageLoop]
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
    new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

  private val shutdownLatch = new CountDownLatch(1)
  private lazy val sharedLoop = new SharedMessageLoop(nettyEnv.conf, this, numUsableCores)

   * True if the dispatcher has been stopped. Once stopped, all messages posted will be bounced
   * immediately.
  private var stopped = false

(1) endpoints 维护了一个多线程安全的哈希map,其中存储的信息是已经注册的 endpoints 。

(2) endpointsRefs 维护了一个多线程安全的哈希map , 其中存储的信息是 RpcEndpoint 和 RpcEndpointRef,也就是endpoint endpoints 的引用

(3) shutdownLatch 是一个计数器,用于线程同步

(4) sharedLoop 是一个共享的消息循环机制,在 Spark 的 RPC(Remote Procedure Call,远程过程调用)或内部线程间消息通信中发挥作用

(5) stopped 这是一个信号,如果调度器dispatcher被停止了,也就是stopped = true 那么所有的发布的信息应该被迅速返回

(2)、 方法1  registerRpcEndpoint
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      if (endpoints.containsKey(name)) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")

      // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
      // active when registering, and endpointRef must be put into endpointRefs before onStart is
      // called.
      endpointRefs.put(endpoint, endpointRef)

      var messageLoop: MessageLoop = null
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
            new DedicatedMessageLoop(name, e, this)
          case _ =>
            sharedLoop.register(name, endpoint)
        endpoints.put(name, messageLoop)
      } catch {
        case NonFatal(e) =>
          throw e

 这个方法看起来比较长,但是很好理解,简单来说就是注册一个 endpoint ,看看具体实现

首先实例化一个 RpcEndpointAddress 也就是 endpoint 的地址,然后再实例化一个 NettyEndpointRef 也就是 endpoint 的引用 ,为了解耦合所以会有引用。

紧接着就是注册逻辑了,首先加一个进程锁,保证多线程安全,然后判断 Dispatcher 是否终止,判断 endpoint 是否已经被注册,如果都没有,那么就在 endpointRefs (线程安全的哈希map)中插入 endpoint endpointRef 。

最后需要进行 name messageLoop 的映射 ,用到了 match 表达式 ,确定 messageLoop 的类型

返回 endpointRef

简单说说 sharedLoop 和 IsolatedRpcEndpoint 的区别
  • 定义
    IsolatedRpcEndpointRpcEndpoint 的一种特殊实现,设计用于需要独立资源和单独消息处理的场景。

  • 特点

    1. 独立的消息循环
      • 每个 IsolatedRpcEndpoint 都绑定一个专属的消息循环 DedicatedMessageLoop
      • 消息不会与其他端点共享线程或资源。
    2. 隔离性强
      • 确保此端点的消息处理逻辑与其他端点完全分离。
      • 适合高优先级任务、需要精确控制的场景(例如,关键任务或对延迟敏感的服务)。
    3. 资源独占
      • 因为每个端点有单独的消息循环,所以需要更多系统资源(例如线程)。
  • 适用场景

    • 任务需要 严格隔离,避免与其他端点的资源竞争。
    • 消息处理逻辑复杂且对性能、时序要求高(如高优先级任务、独立服务等)。
  • 定义
    sharedLoop 是一个共享的消息循环机制,用于处理多个普通 RpcEndpoint 的消息。

  • 特点

    1. 共享的消息循环
      • 多个端点共用一个 MessageLoop(通常是 SharedMessageLoop)。
      • 消息处理时会竞争共享线程和资源。
    2. 效率高,资源利用率高
      • 通过共享线程池和消息队列,减少了资源的开销。
      • 适合绝大多数普通端点,特别是负载较低或轻量级的任务。
    3. 隔离性较弱
      • 不同端点之间可能会因为资源竞争而受到干扰。
      • 若某个端点发生阻塞或异常,可能会影响其他共享端点。
  • 适用场景

    • 普通任务,无需独立线程和资源。
    • 轻量级的消息处理逻辑,对性能要求不高。
    • 系统中有大量端点时,使用共享机制可以节省资源。
(3)方法2  getRpcEndpointRef
def getRpcEndpointRef(endpoint: RpcEndpoint): RpcEndpointRef = endpointRefs.get(endpoint)

传入 RpcEndpoint 获取它的 RpcEndpointRef

(4)方法3  removeRpcEndpointRef
def removeRpcEndpointRef(endpoint: RpcEndpoint): Unit = endpointRefs.remove(endpoint)

传入 RpcEndpoint 删除它的 RpcEndpointRef

(5)方法4  pstToAll
   * Send a message to all registered [[RpcEndpoint]]s in this process.
   * This can be used to make network events known to all end points (e.g. "a new node connected").
  def postToAll(message: InboxMessage): Unit = {
    val iter = endpoints.keySet().iterator()
    while (iter.hasNext) {
      val name = iter.next
        postMessage(name, message, (e) => { e match {
          case e: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${e.getMessage}")
          case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")

通过获取一个 endpoints 键迭代器,然后将消息分发到每一个 endpoint 

(6)方法5  postMessage
private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      val loop = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (loop == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        loop.post(endpointName, message)
    // We don't need to call `onStop` in the `synchronized` block

postMessage 是 Spark 的内部方法,用于将消息(InboxMessage)发送到指定的 RpcEndpoint。它的核心功能是:

  1. 检查目标端点是否存在或是否已停止。
  2. 如果可以发送,将消息提交到端点的消息队列。
  3. 如果遇到错误(例如端点未找到或已停止),通过回调通知调用方。
(7)方法6  postLocalMessage 和 PostOneWayMessage
 /** Posts a message sent by a local endpoint. */
  def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
    val rpcCallContext =
      new LocalNettyRpcCallContext(message.senderAddress, p)
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    postMessage(message.receiver.name, rpcMessage, (e) => p.tryFailure(e))

  /** Posts a one-way message. */
  def postOneWayMessage(message: RequestMessage): Unit = {
    postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),
        // SPARK-31922: in local cluster mode, there's always a RpcEnvStoppedException when
        // stop is called due to some asynchronous message handling. We catch the exception
        // and log it at debug level to avoid verbose error message when user stop a local
        // cluster in spark shell.
        case re: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${re.getMessage}")
        case e if SparkEnv.get.isStopped =>
          logWarning(s"Message $message dropped due to sparkEnv is stopped. ${e.getMessage}")
        case e => throw e




