首页 > 编程语言 >Spark 源码解析(二) 根据 SparkRpc 自己动手实践一个跨节点通信

Spark 源码解析(二) 根据 SparkRpc 自己动手实践一个跨节点通信

时间:2025-01-17 14:30:10浏览次数:3  
标签:val maven 源码 RpcEnv org apache Master Spark SparkRpc

 目录

一、框架流程:

二、Maven 搭建 Scala 导入POM依赖

三、根据流程进行编写

1、实例 Master

2、创建 RpcEnv

3、创建 RpcEndpoint

4、生成 RpcEndpointRef

5、RpcEndpointRef发送消息

 6、防止还没收到消息程序就结束运行

7、验证一下,看看结果

四、完整代码


一、框架流程:

1、实例 Master

2、创建 RpcEnv

3、MasterRpcEnv 注册

4、生成 RpcEndpointRef

5、RpcEndpointRef发送消息

二、Maven 搭建 Scala 导入POM依赖

首先是主模块

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.spark</groupId>
    <artifactId>10</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>
    <modules>
        <module>11</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

然后创建一个子模块

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.apache.spark</groupId>
        <artifactId>10</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>11</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.arakhne.afc.slf4j</groupId>
            <artifactId>slf4j-log4j</artifactId>
            <version>17.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.13.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.13</artifactId>
            <version>3.5.4</version>
        </dependency>
    </dependencies>

</project>

三、根据流程进行编写

在子模块下创建Scala类Master

1、实例 Master

class Master(val rpcEnv:RpcEnv) extends ThreadSafeRpcEndpoint {

  override def receive: PartialFunction[Any, Unit] = {
    case "hello" => println("hello,这是一个测试")
    case _ => println("receive uncatchable information!")
  }
}

我们首先需要伪装一下包名,否则无法调用 RpcEnv

然后继承线程安全的 ThreadSafeRpcEndpoint 成为一个Endpoint

重写receive方法,用到了偏函数。

2、创建 RpcEnv

    val sparkConf:SparkConf = new SparkConf()
    val securityManager:SecurityManager = new SecurityManager(sparkConf)

    // 1、创建RpcEnv

    val rpcEnv1 = RpcEnv.create("master","127.0.0.1",10000,sparkConf,securityManager)

3、创建 RpcEndpoint

    // 2、创建RpcEndpoint

    val master = new Master(rpcEnv1)

Master 继承了 ThreadSafeRpcEndpoint 所以可以直接这么写

4、生成 RpcEndpointRef

    // 3、向RpcEnv注册RpcEndpoint并返回RpcEndpointRef

    val endpointEndpointRef = rpcEnv1.setupEndpoint("endpoint1", master)

代码实际上很简单,因为都集成好了

5、RpcEndpointRef发送消息

    // 4、使用RpcEndpointRef发送消息

    endpointEndpointRef.send("hello")

 6、防止还没收到消息程序就结束运行

    rpcEnv.awaitTermination()

7、验证一下,看看结果

成功!

再试试传输别的信息

endpointEndpointRef.send("123456")

 没有问题。

四、完整代码

package org.apache.spark.psy

import org.apache.spark.SparkConf
import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.SecurityManager


class Master(val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint{
  override def receive: PartialFunction[Any, Unit] = {
    case "hello" => println("hello,这是一个测试")
    case _ => println("receive uncatchable information!")
  }
}

object Master{
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)

    //1.创建RpcEnv
    val rpcEnv = RpcEnv.create("SparkMaster","localhost",8888,conf,securityManager)
    //2.创建RpcEndPoint
    val master = new Master(rpcEnv)
    //3.使用RpcEnv注册RpcEndPoint
    val masterEndpointRef = rpcEnv.setupEndpoint("Master",master)

    //4.通过返回的RpcEndPointRef发送消息
    masterEndpointRef.send("123456")

    rpcEnv.awaitTermination()
  }
}

OK,代码不是很难,因为都已经集成好了,接下来我们要试着自己去集成代码,自己尝试完成一个 应用 Rpc 框架

加油!坚持就是胜利!

标签:val,maven,源码,RpcEnv,org,apache,Master,Spark,SparkRpc
From: https://blog.csdn.net/Peng909157372/article/details/145203016

相关文章

  • 基于php购物商城在线购物网站电商系统蛋糕网站php+mysql毕业设计课程设计毕设指导计算
    一、功能介绍php在线购物商城电商网站详细技术:HTML+CSS+JS+PHP+MYSQL系统分为用户和管理员两种身份用户功能如下:1.登陆注册2.查看商品详情、蛋糕资讯3.加入购物车、结算订单4.评价5.修改密码6.搜索蛋糕7.退出登录管理员功能如下:1.登录退出2.蛋糕管理(添加、修改和......
  • 2025毕设springboot 华南地区走失人员信息管理系统论文+源码
    系统程序文件列表开题报告内容研究背景华南地区,作为中国人口密集、流动性大的区域之一,近年来随着社会经济的快速发展,人员流动频繁,这也导致了走失人员事件的频发。无论是因认知障碍、精神健康问题还是意外迷路,每一例走失事件都牵动着无数家庭的心。然而,传统的寻找走失人员的......
  • 2025毕设springboot 火锅店管理系统论文+源码
    系统程序文件列表开题报告内容研究背景在当今快节奏的社会中,餐饮行业尤其是火锅店作为深受消费者喜爱的餐饮形式之一,面临着日益激烈的市场竞争。传统的火锅店管理方式往往依赖于人工记录与操作,不仅效率低下,还容易出错,难以满足现代餐饮业对高效、准确和个性化服务的需求。随......
  • 2025毕设springboot 婚纱租赁系统论文+源码
    系统程序文件列表开题报告内容研究背景随着婚庆行业的蓬勃发展,婚纱作为婚礼中不可或缺的重要元素,其市场需求日益旺盛。然而,传统购买婚纱的方式不仅成本高昂,而且在婚礼结束后往往面临闲置或处理困难的问题。因此,婚纱租赁作为一种经济、环保且灵活的消费模式逐渐受到新人的青......
  • 【一看就会】Autoware.universe的“规划”部分源码梳理【一】
    文章目录前言一、planning实现的几个功能1.规划起点到目标点的路线2.规划跟随路线的轨迹3.确保车辆不与障碍物发生碰撞4.确保车辆遵守交通规则5.规划车辆可行的轨迹二、全局路径生成模块——mission_planner1.mission_planner.cpp1.输入输出:2.代码流程:3.源码注释2.rout......
  • 2025毕设springboot 基于Android的“自律打卡”演示录像120239论文+源码
    系统程序文件列表开题报告内容研究背景在快速发展的数字化时代,智能手机已成为人们日常生活中不可或缺的一部分,特别是Android系统因其开源性和广泛的应用基础,深受用户喜爱。随着生活节奏的加快,越来越多的人开始重视时间管理和自我提升,自律打卡作为一种有效的自我激励方式,逐......
  • 2025毕设springboot 基于Android的“课堂管理助手”移动应用开发论文+源码
    系统程序文件列表开题报告内容研究背景随着信息技术的飞速发展,教育领域正经历着深刻的变革。传统课堂管理模式逐渐暴露出效率低下、信息孤岛等问题,无法满足现代教育对于高效、互动和个性化的需求。特别是在高等教育和职业教育中,师生间的信息交流、课程管理、作业提交与批改......
  • 【LeetCode】力扣刷题热题100道(31-35题)附源码 搜索二维矩阵 岛屿数量 腐烂的橙子 课程
    一、搜索二维矩阵编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性:每行的元素从左到右升序排列。每列的元素从上到下升序排列。可以使用从右上角开始搜索的方法来有效地找到目标值。选择起始位置:从矩阵的右上角开始。......
  • 2025毕设springboot 基于android的健身运动演示录像120239论文+源码
    系统程序文件列表开题报告内容研究背景随着现代生活节奏的加快,人们越来越注重健康与健身。然而,忙碌的工作和生活压力往往使得人们难以找到合适的时间和方式来进行系统的健身锻炼。在这一背景下,智能手机应用程序(App)凭借其便捷性和普及性,成为了人们追求健康生活的重要工具。......
  • 基于javaweb的SpringBoot景区旅游管理系统设计和实现(源码+文档+部署讲解)
    个人名片......