1. Akka 概述
Akka 是 Java 虚拟机 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时,你可以理解成 Akka 是编写并发程序的框架。Akka 用 Scala 语言写成,同时提供了Scala 和 Java 的开发接口。
Akka 基于 Actor 模型,它提供了一种轻量级的并发抽象,称为 Actor,以及处理并发和分布式通信的工具。在 Akka 中,Actor 是并发执行的基本单位,它可以接收消息、处理消息和发送消息给其他 Actor。每个 Actor 都有自己的状态和行为,并且可以通过消息来进行通信和协调。
Actor 模型用于解决什么问题?
处理并发问题关键是要保证共享数据的一致性和正确性,因为程序是多线程时,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。但是当我们对关键代码加入同步条件 synchronized 后,实际上大并发就会阻塞在这段代码,对程序效率有很大影响。若是用单线程处理,不会有数据一致性的问题,但是系统的性能又不能保证。Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。 你可以这里理解:Actor 模型是一种处理并发问题的解决方案。
2. Actor 模型
2.1 模型概述
Akka 处理并发的方法基于 Actor 模型。
在 Actor 模型中,actor 是一个并发原语,简单的说,一个 actor 就是一个工人,与进程或线程一样都能够工作或处理任务。其实这还有点不好理解,我们可以把它想象成面向对象编程语言中的一个对象实例。在 OOP 中一个对象可以访问或修改另一个对象的属性,也可以直接调用另一个对象的方法。例如下图,person1 给 person2 发送了一个消息,直接调用方法就行了。深入底层执行逻辑的话,结果就是 JVM 转到 sayHello 的代码区,一步步执行。
public class HelloWorld {
private String name = "";
public HelloWorld(String name){
this.name = name;
}
public String getName(){
return this.name;
}
public void sayHello(HelloWorld to, String msg){
System.out.println(to.getName()+" 收到 "+name+" 的消息:"+ msg);
}
}
public class OOPInvoke {
public static void main( String[] args ) {
HelloWorld person1 = new HelloWorld("Person1");
HelloWorld person2 = new HelloWorld("Person2");
person1.sayHello(person2,"Hello world");
}
}
在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。Actor 模型是作为一个并发模型设计和架构的。
Actor 和对象的不同之处在于,Actor 的状态不能直接读取、修改,Actor 的方法不能直接调用。Actor 只能通过「消息传递」的方式与外界通信。每个对象都有一个 this 指针,代表对象的地址,可以通过该地址调用方法或存取状态;与此类似,Actor 也有一个代表本身的地址,但只能向该地址发送消息。
Actor 是并发执行的最小单位,它封装了状态和行为,并且只能通过消息进行通信(Actor 之间只能通过消息通信)。Actor 之间的通信是异步的,不会阻塞或等待响应。
- Actor 通过消息传递的方式与外界通信。消息传递是异步的。每个 Actor 都有一个邮箱,该邮箱接收并缓存其他 Actor 发过来的消息,Actor 一次只能同步处理一个消息,处理消息过程中,除了可以接收消息,不能做任何其他操作。
- Actor 模型的另一个好处就是可以消除共享状态,因为它每次只能处理一条消息,所以 Actor 内部可以安全的处理状态,而不用考虑锁机制。
那么读者可能会问,每次只处理一个消息,这不是会严重的影响性能么?废话,一次处理一个消息当然影响性能了。不过,如果你恰当的运用 Akka 和 Actor 模型,完全可以不必关心性能的问题。下面是 Actor 模型的几个基本原则:
- 所有的计算都是在 Actor 中执行的
- Actor 之间只能通过消息进行通信交流
- 为了响应消息,Actor 可以进行下列操作:
- 更改状态或行为
- 发消息给其他 Actor
- 创建有限数量的子 Actor
看了上面几个基本原则,你是不是更加鉴定的认为 Actor 模型没啥用?嗯,这就对了,因为我当初也是这么认为的。
一次处理一个消息,没有并发,怎么提高性能?如果 Actor 只能更改状态或行为,发消息给其他 Actor,创建有限数量的子 Actor,那我的业务逻辑在哪里?Actor之间只能通过消息通信,我怎么知道另外一个 Actor 的地址?
其实 Actor 模型出现的很早,而 20 世纪 80 年代,爱立信在 Erlang 中实现了 Actor 模型,用于嵌入式电信应用程序。该实现中引入了监督机制提供的容错性概念。爱立信使用 Erlang 和 Actor 模型实现了一款日后经常被提及的应用:AXD301。这玩意儿能提供 99.9999999% 的可用性,看到没,7个9!!!绝对可以亮瞎人们的狗眼,这意味着在 100 年的时间中,AXD301 只有 3.1s 的时间会宕机。
Actor 模型的另一个重要的特性就是容错,它通过监督机制提供容错。这跟 Java 中的 throw Exception 有点类似,都是把处理响应错误的责任交给出错对象以外的实体。但在 Java 中如果一个程序或者线程抛出了一个异常,你敢放心的恢复对应的程序或线程吗?你确保恢复之后还能正常的运行吗?毕竟需要很多资源需要重新创建,但 Actor 模型可以!
如上图所示 Actor 之间是有层级关系的,子 Actor 如果出现了异常会抛给父 Actor,父 Actor 会根据情况重新构建子 Actor,子 Actor 从出现异常,到恢复之后正常运行,这段时间内的所有消息都不会丢失,等恢复之后又可以处理下一个消息。也就是说如果一个 Actor 抛出了异常,除了导致发生异常的消息外,任何消息都不会丢失。这容错性当然好了。当然了,为了实现这种特性,Akka 或 Erlang 需要做很多工作的。
Akka 中的 Actor 模型还有另外一个比较重要的两个特性:分布式与位置透明性。其实可以认为这是一个特性。Actor 模型中一个很重要的概念就是 Actor 地址,因为其他 Actor 需要通过这个地址与 Actor 进行通信。Akka 考虑到分布式的网络环境,对 Actor 地址进行了抽象,屏蔽了本地地址和远程地址的差异,对于开发者来说基本上是透明的。由于 Actor 地址是透明的,那么 Actor 又引入了集群。当然了,基于 Actor 模型和位置透明性,Akka 还有其他很多有用的组件,这里就不介绍了。
2.2 示例说明
(1)测试代码
class SayHelloActor extends Actor {
/**
* 收到消息之后处理消息的入口函数
* 1. 该方法会被该 Actor 的 MailBox(其实现了 Runnable)调用
* 2. 当该 Actor 的 MailBox 接收到消息,就会调用 receive
* 3. Receive 底层:
* -> trait Actor { type Receive = Actor.Receive }
* -> object Actor { type Receive = PartialFunction[Any, Unit] }
*
* @return
*/
override def receive: Receive = {
case "hello" => println("Hello Back!")
case "ok" => println("Ok Back!")
case "exit" =>
println("EXIT ActorSystem ...")
// 停止当前ActorRef
context.stop(self)
// 退出ActorSystem
context.system.terminate()
case _ => println("NOTHING MATCH!")
}
}
object MainApp {
// 1. 先创建一个 ActorSystem,专门用于创建 Actor
private val actorFactory = ActorSystem("firstActor")
// 2. 创建 Actor,会返回关联的 ActorRef
private val sayHelloActorRef = actorFactory.actorOf(Props[SayHelloActor], "sayHelloActor")
def main(args: Array[String]): Unit = {
// 给 sayHelloActor(的邮箱) 发消息
// sayHelloActorRef -["hello"]-> DispatcherMessage -["hello"]-> {SayHelloActor#MailBox:队列&线程} -> MailBox.Actor.receive()
sayHelloActorRef ! "hello"
sayHelloActorRef ! "ok"
sayHelloActorRef ! "2"
sayHelloActorRef ! "exit"
}
}
这是基础模式的最基本形式,给 Actor 发送消息,Actor 对消息进行响应,发送和响应是异步的,同一个 Actor 对所有的消息都是按照邮箱队列的顺序,串行调用的。
Hello Back!
Ok Back!
NOTHING MATCH!
EXIT ActorSystem ...
(2)结合示例再来说明 Actor 模型的核心概念
Actor 模型是一种并发计算模型,它将并发系统中的实体抽象为独立的 Actor,并通过消息传递进行通信。每个 Actor 都是独立的、可执行的单元,具有自己的状态和行为。Actor 之间相互解耦,通过异步方式发送和接收消息,从而实现并发和并行处理。
核心概念 | 简单说明 |
---|---|
Actor | Actor 是并发系统中的基本单位。每个 Actor 都有一个唯一的标识符,并且具有状态和行为。Actor 可以接收消息、处理消息和发送消息给其他 Actor。在 Akka 中,Actor 由 Actor 类表示,可以通过创建 Actor 类的实例来创建真实的 Actor。 |
消息传递 | 在 Actor 模型中,消息是 Actor 之间进行通信的基本单位。消息可以是任何对象,用于传递数据、指令或请求。消息是不可变的,且 Actor 之间只能通过消息进行通信。消息传递是异步的,这意味着发送消息的 Actor 不会阻塞等待响应(发送消息 Actor 也可以选择等待回复)。 |
邮箱(Mailbox) | 每个 Actor 都有一个与之关联的 Mailbox,用于存放接收到的消息。Mailbox 是线程安全的队列,保证消息按顺序被处理。当 Actor 接收到消息时,消息会被放入 Mailbox 中等待处理。 |
行为和状态 | Actor 的行为由其接收消息时执行的代码逻辑定义。每个 Actor 可以根据不同的消息采取不同的行为。Actor 还可以通过修改自己的状态来响应不同的消息。由于 Actor 之间是相互独立的,因此 Actor 的状态在不同的 Actor 之间是隔离的,不会相互影响。 |
ActorRef | ActorRef 是对 Actor 的引用,是向特定 Actor 发送消息的句柄。通过 ActorRef,可以发送消息给指定的 Actor,而无需知道 Actor 的具体标识符。ActorRef 提供了一种封装,使得 Actor 的行为和状态对外部是隐藏的(e.g. A-Actor 如果想给自己发消息,就通过 A-ActorRef;A-Actor 想给 B-Actor 发消息,就需要持有 B-ActorRef,通过 B-ActorRef 发)。 |
生命周期管理 | Akka 提供了对 Actor 生命周期的管理机制。Actor 在创建、启动、停止和重启等不同阶段都有相应的生命周期方法,可以在这些方法中执行初始化、清理、监控等操作。 |
监督机制 | Akka 中的 Actor 可以通过监督机制对其他 Actor 的异常进行监控和处理。每个 Actor 都有一个监督者(Supervisor),它负责管理和监控受管 Actor 的行为。当受管 Actor 出现异常时,监督者可以根据事先定义的策略来决定如何处理异常,例如重启 Actor、停止 Actor 或进行其他操作。 |
ActorSystem | ActorSystem 是 Akka 中的顶级抽象,代表着整个 Actor 系统。它是一个容器,负责创建和管理 Actor 的生命周期,以及提供配置、线程池等系统级的资源。是构建基于 Actor 模型的系统的入口点。每个应用程序通常只需要一个 ActorSystem 实例,但在某些情况下,可以创建多个 ActorSystem 实例以满足特定的需求。 |
通过使用 Actor 模型,Akka 实现了高度可扩展、高并发和容错的并发编程。每个 Actor 是独立的,它们之间通过消息传递进行通信,避免了传统并发编程中的锁竞争和共享状态的问题。同时,Akka 提供了监督机制和生命周期管理,帮助开发人员构建健壮的分布式系统。
2.3 补充说明
a. ActorSystem
- 创建 Actor:通过 ActorSystem,可以使用
system.actorOf
方法创建 Actor 的实例。ActorSystem 负责创建和管理所有的 Actor,并分配 Mailbox 和 Dispatcher。 - Actor 层级结构:ActorSystem 可以创建和管理多个 Actor,这些 Actor 可以根据业务需求形成层级结构。Actors 在层级结构中有不同的角色和职责,通过 ActorRef 可以在不同的 Actors 之间进行消息传递。
- 配置管理:ActorSystem 提供了配置机制,允许开发人员自定义和调整 ActorSystem 的行为。可以通过配置文件(比如 application.conf)或编程方式配置 ActorSystem 的各种参数,如线程池大小、超时时间等。
- 线程池管理:ActorSystem 实现了对线程池的管理和分配。它会根据系统负载和配置进行调度,决定将消息分发给哪个 Actor 并在哪个线程上执行。
- 监督机制和容错:ActorSystem 充当着顶级监督者的角色,负责监督整个 Actor 系统中的 Actor,并决定如何处理异常。当 Actor 异常终止时,ActorSystem 可以根据事先定义的策略(如重启、停止等)来管理和恢复 Actor 的状态。
- 生命周期管理:ActorSystem 提供了生命周期钩子方法,用于在启动和停止 ActorSystem 时执行一些初始化和清理操作。
b. Dispatcher
(1)Dispatcher
在 Akka 中,Dispatcher 负责将消息投递给 Actor 并决定使用哪个线程来执行 Actor 的逻辑。Dispatcher 是 ActorSystem 的一部分,可以根据配置和需求创建多个 Dispatcher。每个 Dispatcher 都有一个关联的线程池,用于管理任务的执行 —— 根据调度策略从 Mailbox 中获取消息,并将消息分配给合适的 Actor 执行。
(2)消息派发
当消息被发送给 Actor 时,Dispatcher 负责将消息投递给目标 Actor,并负责选择合适的线程执行 Actor 的处理逻辑。派发决策可以根据配置和调度算法进行,以满足不同的需求,例如公平派发、优先级派发等。
(3)派发器类型
Akka 提供了不同类型的 Dispatcher,每种类型适用于不同的场景和需求。常见的派发器类型包括:
- ForkJoinDispatcher:基于 Java Fork/Join 框架的 Dispatcher,适用于计算密集型任务。
- ThreadPoolDispatcher:基于线程池的 Dispatcher,适用于 I/O 密集型任务。
- DefaultDispatcher:默认的 Dispatcher,在配置中未指定时使用。
(4)配置 Dispatcher
可以通过配置文件或编程方式为每个 Actor 或 ActorSystem 分配特定的 Dispatcher。在配置文件中,可以为 Dispatcher 指定线程池大小、派发策略、优先级等参数。
(5)Dispatcher 和 Mailbox 之间的关系可以简单描述为以下几点:
- Dispatcher 知道所有的 Mailbox,因为它负责将消息投递给 Actor。
- Dispatcher 根据配置和调度算法从 Mailbox 中获取消息,然后将其分配给合适的线程执行 Actor 的逻辑。
- Mailbox 是 Actor 的一部分,负责存储接收到的消息。在获取到消息后,Mailbox 通过 Dispatcher 将消息分发给 Actor。
可以将 Dispatcher 比喻为调度中心,而 Mailbox 则是 Actor 的消息存储区。Dispatcher 从 Mailbox 获取消息,然后将其分派给合适的 Actor 执行。这种分工合作确保了 Actor 之间的消息按顺序和异步方式得到处理。
同时,Akka 还提供了不同类型的 Dispatcher 和 Mailbox 实现,以满足不同的需求和场景。可以根据具体的配置和应用需求选择适合的 Dispatcher 和 Mailbox 类型,以优化系统性能和资源利用。
总结起来,Dispatcher 负责将消息从 Mailbox 中获取,并将其分派给合适的线程执行 Actor 的逻辑。Mailbox 是 Actor 接收到消息时存放消息的缓冲区。它们共同协作,实现了消息的分发和处理。
消息派发和 Dispatcher 的工作机制可以实现 Actor 的并发处理和资源管理。Dispatcher 负责根据不同的调度策略将消息分发给 Actor,并利用线程池来管理执行 Actor 逻辑的线程。这样可以确保 Actor 之间的消息处理是异步和非阻塞的,提升了系统的并发性能和吞吐量。
3. Actor 间通讯案例
3.1 环境配置
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.12.18</scala.version>
<scala.compat.version>2.12</scala.compat.version>
<akka.version>2.5.21</akka.version>
</properties>
<dependencies>
<!-- 添加scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 添加akka的actor依赖 -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.compat.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- 多进程之间的Actor通信 -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_${scala.compat.version}</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
<!-- 指定插件-->
<build>
<!-- 指定源码包和测试包的位置 -->
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- 指定编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<!-- maven打包的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<!-- 指定main方法 -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>xxx</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
3.2 编写代码
a. AActor
class AActor(val bActorRef: ActorRef) extends Actor {
override def receive: Receive = {
case "start" =>
println("A-Actor start!")
self ! "r u ready?"
case "kick" =>
println("A-Actor: 发起攻击")
bActorRef ! "kick"
}
}
b. BActor
class BActor extends Actor {
override def receive: Receive = {
case "start" =>
println("B-Actor start!")
self ! "r u ready?"
case "kick" =>
println("B-Actor: 开始反击!")
sender() ! "kick"
}
}
c. MainApp
object MainApp {
def main(args: Array[String]): Unit = {
val actorFactory = ActorSystem()
val bActorRef = actorFactory.actorOf(Props[BActor], "bActor")
val aActorRef = actorFactory.actorOf(Props(new AActor(bActorRef)), "aActor")
aActorRef ! "start"
aActorRef ! "kick"
}
}
两个Actor通讯机制和Actor 自身发消息机制基本一样,只是要注意如下:
- 如果 A Actor 在需要给 B Actor 发消息,则需要持有 B Actor 的 ActorRef,可以通过创建时,传入 B Actor 的 ActorRef。
- 当 B Actor 在 receive 方法中接收到消息,需要回复时,可以通过
sender()
获取到发送 Actor 的代理对象。
如何理解 Actor 的 receive 方法被调用?
每个 Actor 对应一个 MailBox,MailBox 实现了 Runnable 接口,处于运行的状态。当有消息到达 MailBox,就会去调用 Actor 的 receive 方法。
4. Akka 网络编程
Akka 将复杂的 Actor 通信、Actor 注册、Actor 查找进行了封装。用户在写自己的 Actor 时,只需要实现 akka.actor.Actor
这个接口。
在 Akka 中,每一个 Actor 都有一个唯一的 URL,该 URL 的定义格式和万维网地址的定义格式非常相似。
每一个 Actor 通过 ActorSystem 和 Context 初始化的时候,都会得到自己唯一的路径,路径格式如下。并且可以通过 actorSelection(path)
方法查找对应路径的 Actor 对象,该方法返回该 Actor 的 ActorRef,得到 ActorRef 后就可以发送消息了。
akka.tcp://systemName@ip:port/user/topActorName/otherActorName
4.1 案例:小黄鸡客服
a. MsgProtocol
case class ClientMsg(msg: String)
case class ServerMsg(msg: String)
b. Server
class YellowChickenServer extends Actor {
override def receive: Receive = {
case "start" => println("小黄鸡开始工作啦~")
case ClientMsg(msg) => msg match {
case "twice" => sender() ! ServerMsg("元气兔兔拯救世界!")
case "once" => sender() ! ServerMsg("汪汪汪!有九个主人!")
case "slogan" => sender() ! ServerMsg("三四五代全熬死,还得看我兔瓦斯。")
case _ => sender() ! ServerMsg("[nayeon,jeongyeon,momo,sana,jihyo,mina,dahyun,chaeyoung,tzuyu]")
}
}
}
object ServerApp extends App {
private val (host, port) = ("127.0.0.1", 9999)
// 创建 config 对象:指定协议类型、监听的IP和端口
private val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$host
|akka.remote.netty.tcp.port=$port
""".stripMargin)
private val serverActorSystem = ActorSystem("Server", config)
private val yellowChickenServer: ActorRef = serverActorSystem.actorOf(Props[YellowChickenServer], "yellowChickenServer")
}
c. Client
class YellowChickenClient(val serverHost: String, val serverPort: Int) extends Actor {
private var serverActorRef: ActorSelection = _
// 该方法会在Actor运行前执行,在Akka开发中,通常将初始化工作放在preStart中。
override def preStart() = {
// ActorSystem("Server", config)
// "user" 是固定值
// serverActorSystem.actorOf(Props[YellowChickenServer], "yellowChickenServer")
println("init client...")
serverActorRef = context.actorSelection(s"akka.tcp://Server@$serverHost:$serverPort/user/yellowChickenServer")
}
override def receive: Receive = {
case "start" => println("用户来了")
case msg: String => serverActorRef ! ClientMsg(msg)
case ServerMsg(msg) => println(s"小黄鸡客服:$msg")
}
}
object ClientApp extends App {
private val (clientHost, clientPort, serverHost, serverPort) = ("127.0.0.1", 9990, "127.0.0.1", 9999)
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$clientHost
|akka.remote.netty.tcp.port=$clientPort
""".stripMargin)
private val clientActorSystem = ActorSystem("Client", config)
private val clientActorRef: ActorRef = clientActorSystem.actorOf(Props(new YellowChickenClient(serverHost, serverPort)), "yellowChickenClient-01")
clientActorRef ! "start"
private var msg = ""
while (true) {
msg = StdIn.readLine()
clientActorRef ! msg
println("Enter your question: ")
}
}
4.2 案例:Spark 活性检测
a. MsgProtocol
// Master 向 Worker 返回注册成功
case object RegisteredWorkerInfo
// Worker 向 Master 发送注册信息
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)
// Master 存储 Worker 信息的数据结构
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {
var lastHeartBeatTime: Long = _
}
// Worker 通知自己该发送心跳了
case object SendHeartbeat
// Worker 向 Master 发送的心跳信息
case class Heartbeat(id: String)
// Master 通知自己该做超时检查了
case object StartTimeoutWorker
// Master 通知自己删除超时Worker
case object RemoveTimeoutWorker
b. SparkMaster
class SparkMaster extends Actor {
private val workerRegister = mutable.Map[String, WorkerInfo]()
override def receive: Receive = {
case "start" =>
println("SparkMaster start...")
self ! StartTimeoutWorker
case RegisterWorkerInfo(id, cpu, ram) if !workerRegister.contains(id) =>
val workerInfo = new WorkerInfo(id, cpu, ram)
workerRegister += (id -> workerInfo)
println(s"workersCnt=${workerRegister.size}")
sender() ! RegisteredWorkerInfo
case Heartbeat(id) =>
val workerInfo = workerRegister(id)
workerInfo.lastHeartBeatTime = System.currentTimeMillis
println(s"Master update workerId=$id's heartbeat...")
case StartTimeoutWorker =>
import context.dispatcher
context.system.scheduler.schedule(0 millis, 10000 millis, self, RemoveTimeoutWorker)
case RemoveTimeoutWorker =>
println(s"start to remove timeout worker ... workersCnt=${workerRegister.size}")
val workerInfoList = workerRegister.values
val now = System.currentTimeMillis
workerInfoList.filter(worker => now - worker.lastHeartBeatTime > 6677).foreach(worker => workerRegister.remove(worker.id))
println(s"remove timeout worker over ... workersCnt=${workerRegister.size}")
}
}
object MasterApp extends App {
// private val (host, port, name) = ("127.0.0.1", 10008, "sparkMaster")
val host = args(0)
val port = args(1)
val name = args(2)
// 创建 config 对象:指定协议类型、监听的IP和端口
private val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$host
|akka.remote.netty.tcp.port=$port
""".stripMargin)
private val serverActorSystem = ActorSystem("sparkMasterServer", config)
private val sparkMasterActorRef: ActorRef = serverActorSystem.actorOf(Props[SparkMaster], name)
sparkMasterActorRef ! "start"
}
c. SparkWorker
class SparkWorker(val masterName: String, val masterHost: String, val masterPort: Int) extends Actor {
private var masterActorRef: ActorSelection = _
val id = UUID.randomUUID().toString
override def preStart() = {
masterActorRef = context.actorSelection(s"akka.tcp://sparkMasterServer@$masterHost:$masterPort/user/$masterName")
}
override def receive: Receive = {
case "start" =>
println("SparkWorker start...")
masterActorRef ! RegisterWorkerInfo(id, 16, 64 * 1024)
case RegisteredWorkerInfo =>
println(s"workerId=$id registered successfully!")
import context.dispatcher
// 1. 0 millis 不延时,立即执行定时器
// 2. 3000 millis 表示每隔3s执行一次
// 3. self 表示发给自己
// 4. SendHeartbeat 发送的内容
context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartbeat)
case SendHeartbeat =>
println(s"workerId=$id sends heartbeat to master...")
masterActorRef ! Heartbeat(id)
}
}
object WorkerApp extends App {
private val (workerName, workerHost, workerPort, masterName, masterHost, masterPort) = ("sparkWorker-01", "127.0.0.1", 10001, "sparkMaster", "127.0.0.1", 10008)
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$workerHost
|akka.remote.netty.tcp.port=$workerPort
""".stripMargin)
private val workerActorSystem = ActorSystem("sparkWorkerServer", config)
private val workerActorRef: ActorRef = workerActorSystem.actorOf(Props(new SparkWorker(masterName, masterHost, masterPort)), workerName)
workerActorRef ! "start"
}
标签:case,val,Scala,33,ActorSystem,Actor,消息,Akka
From: https://www.cnblogs.com/liujiaqi1101/p/17957889