首页 > 编程语言 >Spark 源码分析(二) SparkRpc中Rpc架构解读 (正在更新 MessageLoop部分~)

Spark 源码分析(二) SparkRpc中Rpc架构解读 (正在更新 MessageLoop部分~)

时间:2025-01-16 11:30:16浏览次数:3  
标签:endpoint MessageLoop name RpcEndpoint Rpc 源码 端点 new message

接上一篇SparkRpc框架阐述

目录

2、Dispatcher 调度器 具体实现

(1)、Dispatcher 构造体

(2)、 方法1  registerRpcEndpoint

简单说说 sharedLoop 和 IsolatedRpcEndpoint 的区别

1、IsolatedRpcEndpoint

2、sharedLoop

(3)方法2  getRpcEndpointRef

(4)方法3  removeRpcEndpointRef

(5)方法4  pstToAll

(6)方法5  postMessage

(7)方法6  postLocalMessage 和 PostOneWayMessage


2、Dispatcher 调度器 具体实现

由于方法太多,我调重点讲讲,希望大家自己能好好看看源码

(1)、Dispatcher 构造体
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int)

首先我们可以看到构造器中,有NettyRpcEnv和Cpu可用核数

然后是成员变量

 private val endpoints: ConcurrentMap[String, MessageLoop] =
    new ConcurrentHashMap[String, MessageLoop]
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
    new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

  private val shutdownLatch = new CountDownLatch(1)
  private lazy val sharedLoop = new SharedMessageLoop(nettyEnv.conf, this, numUsableCores)

  /**
   * True if the dispatcher has been stopped. Once stopped, all messages posted will be bounced
   * immediately.
   */
  @GuardedBy("this")
  private var stopped = false

(1) endpoints 维护了一个多线程安全的哈希map,其中存储的信息是已经注册的 endpoints 。

(2) endpointsRefs 维护了一个多线程安全的哈希map , 其中存储的信息是 RpcEndpoint 和 RpcEndpointRef,也就是endpoint endpoints 的引用

(3) shutdownLatch 是一个计数器,用于线程同步

(4) sharedLoop 是一个共享的消息循环机制,在 Spark 的 RPC(Remote Procedure Call,远程过程调用)或内部线程间消息通信中发挥作用

(5) stopped 这是一个信号,如果调度器dispatcher被停止了,也就是stopped = true 那么所有的发布的信息应该被迅速返回

(2)、 方法1  registerRpcEndpoint
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      if (endpoints.containsKey(name)) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }

      // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
      // active when registering, and endpointRef must be put into endpointRefs before onStart is
      // called.
      endpointRefs.put(endpoint, endpointRef)

      var messageLoop: MessageLoop = null
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
            new DedicatedMessageLoop(name, e, this)
          case _ =>
            sharedLoop.register(name, endpoint)
            sharedLoop
        }
        endpoints.put(name, messageLoop)
      } catch {
        case NonFatal(e) =>
          endpointRefs.remove(endpoint)
          throw e
      }
    }
    endpointRef
  }

 这个方法看起来比较长,但是很好理解,简单来说就是注册一个 endpoint ,看看具体实现

首先实例化一个 RpcEndpointAddress 也就是 endpoint 的地址,然后再实例化一个 NettyEndpointRef 也就是 endpoint 的引用 ,为了解耦合所以会有引用。

紧接着就是注册逻辑了,首先加一个进程锁,保证多线程安全,然后判断 Dispatcher 是否终止,判断 endpoint 是否已经被注册,如果都没有,那么就在 endpointRefs (线程安全的哈希map)中插入 endpoint endpointRef 。

最后需要进行 name messageLoop 的映射 ,用到了 match 表达式 ,确定 messageLoop 的类型

返回 endpointRef

简单说说 sharedLoop 和 IsolatedRpcEndpoint 的区别
1、IsolatedRpcEndpoint
  • 定义
    IsolatedRpcEndpointRpcEndpoint 的一种特殊实现,设计用于需要独立资源和单独消息处理的场景。

  • 特点

    1. 独立的消息循环
      • 每个 IsolatedRpcEndpoint 都绑定一个专属的消息循环 DedicatedMessageLoop
      • 消息不会与其他端点共享线程或资源。
    2. 隔离性强
      • 确保此端点的消息处理逻辑与其他端点完全分离。
      • 适合高优先级任务、需要精确控制的场景(例如,关键任务或对延迟敏感的服务)。
    3. 资源独占
      • 因为每个端点有单独的消息循环,所以需要更多系统资源(例如线程)。
  • 适用场景

    • 任务需要 严格隔离,避免与其他端点的资源竞争。
    • 消息处理逻辑复杂且对性能、时序要求高(如高优先级任务、独立服务等)。
2、sharedLoop
  • 定义
    sharedLoop 是一个共享的消息循环机制,用于处理多个普通 RpcEndpoint 的消息。

  • 特点

    1. 共享的消息循环
      • 多个端点共用一个 MessageLoop(通常是 SharedMessageLoop)。
      • 消息处理时会竞争共享线程和资源。
    2. 效率高,资源利用率高
      • 通过共享线程池和消息队列,减少了资源的开销。
      • 适合绝大多数普通端点,特别是负载较低或轻量级的任务。
    3. 隔离性较弱
      • 不同端点之间可能会因为资源竞争而受到干扰。
      • 若某个端点发生阻塞或异常,可能会影响其他共享端点。
  • 适用场景

    • 普通任务,无需独立线程和资源。
    • 轻量级的消息处理逻辑,对性能要求不高。
    • 系统中有大量端点时,使用共享机制可以节省资源。
(3)方法2  getRpcEndpointRef
def getRpcEndpointRef(endpoint: RpcEndpoint): RpcEndpointRef = endpointRefs.get(endpoint)

传入 RpcEndpoint 获取它的 RpcEndpointRef

(4)方法3  removeRpcEndpointRef
def removeRpcEndpointRef(endpoint: RpcEndpoint): Unit = endpointRefs.remove(endpoint)

传入 RpcEndpoint 删除它的 RpcEndpointRef

(5)方法4  pstToAll
 /**
   * Send a message to all registered [[RpcEndpoint]]s in this process.
   *
   * This can be used to make network events known to all end points (e.g. "a new node connected").
   */
  def postToAll(message: InboxMessage): Unit = {
    val iter = endpoints.keySet().iterator()
    while (iter.hasNext) {
      val name = iter.next
        postMessage(name, message, (e) => { e match {
          case e: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${e.getMessage}")
          case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")
        }}
      )}
  }

通过获取一个 endpoints 键迭代器,然后将消息分发到每一个 endpoint 

(6)方法5  postMessage
private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      val loop = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (loop == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        loop.post(endpointName, message)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }

postMessage 是 Spark 的内部方法,用于将消息(InboxMessage)发送到指定的 RpcEndpoint。它的核心功能是:

  1. 检查目标端点是否存在或是否已停止。
  2. 如果可以发送,将消息提交到端点的消息队列。
  3. 如果遇到错误(例如端点未找到或已停止),通过回调通知调用方。
(7)方法6  postLocalMessage 和 PostOneWayMessage
 /** Posts a message sent by a local endpoint. */
  def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
    val rpcCallContext =
      new LocalNettyRpcCallContext(message.senderAddress, p)
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    postMessage(message.receiver.name, rpcMessage, (e) => p.tryFailure(e))
  }

  /** Posts a one-way message. */
  def postOneWayMessage(message: RequestMessage): Unit = {
    postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),
      {
        // SPARK-31922: in local cluster mode, there's always a RpcEnvStoppedException when
        // stop is called due to some asynchronous message handling. We catch the exception
        // and log it at debug level to avoid verbose error message when user stop a local
        // cluster in spark shell.
        case re: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${re.getMessage}")
        case e if SparkEnv.get.isStopped =>
          logWarning(s"Message $message dropped due to sparkEnv is stopped. ${e.getMessage}")
        case e => throw e
      })
  }

一个方法是本地endpoint消息传输,一个方法是远程endpoint消息传输,都调用了postMessage方法,所以重点理解postMessage方法。

今天就更新这么多,接下来我们要准备着手,自己写一个SparkRpc的代码出来。

坚持才是胜利。

标签:endpoint,MessageLoop,name,RpcEndpoint,Rpc,源码,端点,new,message
From: https://blog.csdn.net/Peng909157372/article/details/145174335

相关文章

  • 【毕业设计】智慧医院管理系统程序源码
    一、环境信息开发语言:JAVAJDK版本:JDK8及以上数据库:MySql5.6及以上Maven版本:任意版本操作系统:Windows、macOS开发工具:Idea、Eclipse、MyEclipse开发框架:Springboot+HTML+jQuery+Mysql二、所有选题列表所有毕业设计选题列表,点击文章结尾下方卡片免费咨询三、功能介......
  • 【附源码】JAVA花店管理后台系统源码+SpringBoot+VUE+前后端分离
    学弟,学妹好,我是爱学习的学姐,今天带来一款优秀的项目:花店管理后台系统 。本文介绍了系统功能与部署安装步骤,如果您有任何问题,也请联系学姐,偶现在是经验丰富的程序员!一.系统演示系统测试截图     系统视频演示 https://githubs.xyz/show/341.mp4 二.系统概......
  • 基于大数据+协同过滤推荐算法+数据可视化大屏+SpringBoot的校园食堂订餐系统设计和实
    博主介绍:CSDN毕设辅导第一人、靠谱第一人、全网粉丝50W+,csdn特邀作者、博客专家、腾讯云社区合作讲师、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流✌技术......
  • [附源码]图书管理系统+SpringBoot+Vue前后端分离
    今天带来一款优秀的项目:图书借阅管理系统源码 。系统采用的流行的前后端分离结构,内含功能包括"系统权限角色",“登录,注册”,“图书管理”,“借阅管理”,“图书类别管理”,“系统账号管理”。如果您有任何问题,也请联系小编,小编是经验丰富的程序员!一.系统演示视频 https:......
  • springboot流浪动物领养系统源码毕设+论文
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景在当今社会,流浪动物问题日益凸显,成为城市管理和动物保护领域的一大挑战。随着城市化进程的加速,流浪动物的数量不断增加,它们面临着食物短缺、疾病困扰......
  • 【springboot农产品托管系统源码毕设+论文
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着现代农业的快速发展,农产品的生产、管理与销售环节日益复杂,传统的农业管理模式已难以满足当前高效、精准的农业管理需求。特别是在农产品从生产到......
  • springbootApex英雄游戏展示网站源码毕设+论文
    本系统(程序+源码)带文档lw万字以上文末可获取一份本项目的java源码和数据库参考。系统程序文件列表开题报告内容研究背景随着电子竞技与游戏文化的蓬勃发展,英雄类游戏在全球范围内赢得了广泛的关注与热爱。这类游戏以其丰富的角色设定、精彩的游戏剧情以及高度互动的游戏......
  • RPC 源码解析~Apache Dubbo
    解析RPC(远程过程调用)的源码可以帮助你深入理解其工作原理和实现细节。为了更好地进行源码解析,我们选择一个流行的RPC框架——ApacheDubbo作为示例。Dubbo是一个高性能、轻量级的开源JavaRPC框架,广泛应用于企业级应用中。Dubbo的优劣势优势高性能:Dubbo使用Nett......
  • 基于spring boot宠物领养系统的设计与实现 宠物领养系统(源码+文档)
    目录一.研究目的二.需求分析三.数据库设计 四.系统页面展示五.免费源码获取方式一.研究目的本课题是根据用户的需要以及网络的优势建立的一个宠物领养系统,来满足用宠物领养的需求。本宠物领养系统应用JSP技术,Java语言,MYSQL数据库存储数据,基于B/S结构开发。在网站的整......
  • JSP龙陵县第一中学教学资源库系统i8414(程序+源码+数据库+调试部署+开发环境)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表技术要求:开发语言:JSP前端使用:HTML5,CSS,JSP动态网页技术后端使用SpringBoot,Spring技术主数据库使用MySQL开题报告内容一、课题背景随着信息技术的不断发展......