目录
一、框架流程:
1、实例 Master
2、创建 RpcEnv
3、Master向 RpcEnv 注册
4、生成 RpcEndpointRef
5、RpcEndpointRef发送消息
二、Maven 搭建 Scala 导入POM依赖
首先是主模块
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.spark</groupId>
<artifactId>10</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
<module>11</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
然后创建一个子模块
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>10</artifactId>
<version>1.0-SNAPSHOT</version>
</parent>
<artifactId>11</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.arakhne.afc.slf4j</groupId>
<artifactId>slf4j-log4j</artifactId>
<version>17.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.13.15</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
<version>3.5.4</version>
</dependency>
</dependencies>
</project>
三、根据流程进行编写
在子模块下创建Scala类Master
1、实例 Master
class Master(val rpcEnv:RpcEnv) extends ThreadSafeRpcEndpoint {
override def receive: PartialFunction[Any, Unit] = {
case "hello" => println("hello,这是一个测试")
case _ => println("receive uncatchable information!")
}
}
我们首先需要伪装一下包名,否则无法调用 RpcEnv
然后继承线程安全的 ThreadSafeRpcEndpoint 成为一个Endpoint
重写receive方法,用到了偏函数。
2、创建 RpcEnv
val sparkConf:SparkConf = new SparkConf()
val securityManager:SecurityManager = new SecurityManager(sparkConf)
// 1、创建RpcEnv
val rpcEnv1 = RpcEnv.create("master","127.0.0.1",10000,sparkConf,securityManager)
3、创建 RpcEndpoint
// 2、创建RpcEndpoint
val master = new Master(rpcEnv1)
Master 继承了 ThreadSafeRpcEndpoint 所以可以直接这么写。
4、生成 RpcEndpointRef
// 3、向RpcEnv注册RpcEndpoint并返回RpcEndpointRef
val endpointEndpointRef = rpcEnv1.setupEndpoint("endpoint1", master)
代码实际上很简单,因为都集成好了
5、RpcEndpointRef发送消息
// 4、使用RpcEndpointRef发送消息
endpointEndpointRef.send("hello")
6、防止还没收到消息程序就结束运行
rpcEnv.awaitTermination()
7、验证一下,看看结果
成功!
再试试传输别的信息
endpointEndpointRef.send("123456")
没有问题。
四、完整代码
package org.apache.spark.psy
import org.apache.spark.SparkConf
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.SecurityManager
class Master(val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint{
override def receive: PartialFunction[Any, Unit] = {
case "hello" => println("hello,这是一个测试")
case _ => println("receive uncatchable information!")
}
}
object Master{
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
val securityManager = new SecurityManager(conf)
//1.创建RpcEnv
val rpcEnv = RpcEnv.create("SparkMaster","localhost",8888,conf,securityManager)
//2.创建RpcEndPoint
val master = new Master(rpcEnv)
//3.使用RpcEnv注册RpcEndPoint
val masterEndpointRef = rpcEnv.setupEndpoint("Master",master)
//4.通过返回的RpcEndPointRef发送消息
masterEndpointRef.send("123456")
rpcEnv.awaitTermination()
}
}
OK,代码不是很难,因为都已经集成好了,接下来我们要试着自己去集成代码,自己尝试完成一个 应用 Rpc 框架
加油!坚持就是胜利!
标签:val,maven,源码,RpcEnv,org,apache,Master,Spark,SparkRpc From: https://blog.csdn.net/Peng909157372/article/details/145203016