接上一篇SparkRpc框架阐述
目录
(7)方法6 postLocalMessage 和 PostOneWayMessage
2、Dispatcher 调度器 具体实现
由于方法太多,我调重点讲讲,希望大家自己能好好看看源码
(1)、Dispatcher 构造体
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int)
首先我们可以看到构造器中,有NettyRpcEnv和Cpu可用核数
然后是成员变量
private val endpoints: ConcurrentMap[String, MessageLoop] =
new ConcurrentHashMap[String, MessageLoop]
private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]
private val shutdownLatch = new CountDownLatch(1)
private lazy val sharedLoop = new SharedMessageLoop(nettyEnv.conf, this, numUsableCores)
/**
* True if the dispatcher has been stopped. Once stopped, all messages posted will be bounced
* immediately.
*/
@GuardedBy("this")
private var stopped = false
(1) endpoints 维护了一个多线程安全的哈希map,其中存储的信息是已经注册的 endpoints 。
(2) endpointsRefs 维护了一个多线程安全的哈希map , 其中存储的信息是 RpcEndpoint 和 RpcEndpointRef,也就是endpoint 和 endpoints 的引用
(3) shutdownLatch 是一个计数器,用于线程同步
(4) sharedLoop 是一个共享的消息循环机制,在 Spark 的 RPC(Remote Procedure Call,远程过程调用)或内部线程间消息通信中发挥作用
(5) stopped 这是一个信号,如果调度器dispatcher被停止了,也就是stopped = true 那么所有的发布的信息应该被迅速返回
(2)、 方法1 registerRpcEndpoint
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
if (endpoints.containsKey(name)) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
// This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
// active when registering, and endpointRef must be put into endpointRefs before onStart is
// called.
endpointRefs.put(endpoint, endpointRef)
var messageLoop: MessageLoop = null
try {
messageLoop = endpoint match {
case e: IsolatedRpcEndpoint =>
new DedicatedMessageLoop(name, e, this)
case _ =>
sharedLoop.register(name, endpoint)
sharedLoop
}
endpoints.put(name, messageLoop)
} catch {
case NonFatal(e) =>
endpointRefs.remove(endpoint)
throw e
}
}
endpointRef
}
这个方法看起来比较长,但是很好理解,简单来说就是注册一个 endpoint ,看看具体实现
首先实例化一个 RpcEndpointAddress 也就是 endpoint 的地址,然后再实例化一个 NettyEndpointRef 也就是 endpoint 的引用 ,为了解耦合所以会有引用。
紧接着就是注册逻辑了,首先加一个进程锁,保证多线程安全,然后判断 Dispatcher 是否终止,判断 endpoint 是否已经被注册,如果都没有,那么就在 endpointRefs (线程安全的哈希map)中插入 endpoint 和 endpointRef 。
最后需要进行 name 和 messageLoop 的映射 ,用到了 match 表达式 ,确定 messageLoop 的类型
返回 endpointRef
简单说说 sharedLoop 和 IsolatedRpcEndpoint 的区别
1、IsolatedRpcEndpoint
-
定义:
IsolatedRpcEndpoint
是RpcEndpoint
的一种特殊实现,设计用于需要独立资源和单独消息处理的场景。 -
特点:
- 独立的消息循环:
- 每个
IsolatedRpcEndpoint
都绑定一个专属的消息循环DedicatedMessageLoop
。 - 消息不会与其他端点共享线程或资源。
- 每个
- 隔离性强:
- 确保此端点的消息处理逻辑与其他端点完全分离。
- 适合高优先级任务、需要精确控制的场景(例如,关键任务或对延迟敏感的服务)。
- 资源独占:
- 因为每个端点有单独的消息循环,所以需要更多系统资源(例如线程)。
- 独立的消息循环:
-
适用场景:
- 任务需要 严格隔离,避免与其他端点的资源竞争。
- 消息处理逻辑复杂且对性能、时序要求高(如高优先级任务、独立服务等)。
2、sharedLoop
-
定义:
sharedLoop
是一个共享的消息循环机制,用于处理多个普通RpcEndpoint
的消息。 -
特点:
- 共享的消息循环:
- 多个端点共用一个
MessageLoop
(通常是SharedMessageLoop
)。 - 消息处理时会竞争共享线程和资源。
- 多个端点共用一个
- 效率高,资源利用率高:
- 通过共享线程池和消息队列,减少了资源的开销。
- 适合绝大多数普通端点,特别是负载较低或轻量级的任务。
- 隔离性较弱:
- 不同端点之间可能会因为资源竞争而受到干扰。
- 若某个端点发生阻塞或异常,可能会影响其他共享端点。
- 共享的消息循环:
-
适用场景:
- 普通任务,无需独立线程和资源。
- 轻量级的消息处理逻辑,对性能要求不高。
- 系统中有大量端点时,使用共享机制可以节省资源。
(3)方法2 getRpcEndpointRef
def getRpcEndpointRef(endpoint: RpcEndpoint): RpcEndpointRef = endpointRefs.get(endpoint)
传入 RpcEndpoint 获取它的 RpcEndpointRef
(4)方法3 removeRpcEndpointRef
def removeRpcEndpointRef(endpoint: RpcEndpoint): Unit = endpointRefs.remove(endpoint)
传入 RpcEndpoint 删除它的 RpcEndpointRef
(5)方法4 pstToAll
/**
* Send a message to all registered [[RpcEndpoint]]s in this process.
*
* This can be used to make network events known to all end points (e.g. "a new node connected").
*/
def postToAll(message: InboxMessage): Unit = {
val iter = endpoints.keySet().iterator()
while (iter.hasNext) {
val name = iter.next
postMessage(name, message, (e) => { e match {
case e: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${e.getMessage}")
case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")
}}
)}
}
通过获取一个 endpoints 键迭代器,然后将消息分发到每一个 endpoint 上
(6)方法5 postMessage
private def postMessage(
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
val error = synchronized {
val loop = endpoints.get(endpointName)
if (stopped) {
Some(new RpcEnvStoppedException())
} else if (loop == null) {
Some(new SparkException(s"Could not find $endpointName."))
} else {
loop.post(endpointName, message)
None
}
}
// We don't need to call `onStop` in the `synchronized` block
error.foreach(callbackIfStopped)
}
postMessage
是 Spark 的内部方法,用于将消息(InboxMessage
)发送到指定的 RpcEndpoint
。它的核心功能是:
- 检查目标端点是否存在或是否已停止。
- 如果可以发送,将消息提交到端点的消息队列。
- 如果遇到错误(例如端点未找到或已停止),通过回调通知调用方。
(7)方法6 postLocalMessage 和 PostOneWayMessage
/** Posts a message sent by a local endpoint. */
def postLocalMessage(message: RequestMessage, p: Promise[Any]): Unit = {
val rpcCallContext =
new LocalNettyRpcCallContext(message.senderAddress, p)
val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
postMessage(message.receiver.name, rpcMessage, (e) => p.tryFailure(e))
}
/** Posts a one-way message. */
def postOneWayMessage(message: RequestMessage): Unit = {
postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),
{
// SPARK-31922: in local cluster mode, there's always a RpcEnvStoppedException when
// stop is called due to some asynchronous message handling. We catch the exception
// and log it at debug level to avoid verbose error message when user stop a local
// cluster in spark shell.
case re: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${re.getMessage}")
case e if SparkEnv.get.isStopped =>
logWarning(s"Message $message dropped due to sparkEnv is stopped. ${e.getMessage}")
case e => throw e
})
}
一个方法是本地endpoint消息传输,一个方法是远程endpoint消息传输,都调用了postMessage方法,所以重点理解postMessage方法。
今天就更新这么多,接下来我们要准备着手,自己写一个SparkRpc的代码出来。
坚持才是胜利。
标签:endpoint,MessageLoop,name,RpcEndpoint,Rpc,源码,端点,new,message From: https://blog.csdn.net/Peng909157372/article/details/145174335