首页 > 其他分享 >Spark远程集群实现

Spark远程集群实现

时间:2022-12-20 22:44:21浏览次数:40  
标签:session 集群 org spark import Spark config 远程

内容简介

本篇文章主要讲述了Springboot项目和Spark相结合的方法。

在Spark的学习过程中,我们大多数提交作业的方式都是使用Spark的命令来提交jar包并运行,但是本次相反,我们将使用Springboot来开发Spark的相关计算任务代码,开发完成后将项目打包部署到服务器上,项目启动以后会生成一个SparkSession的长服务,各种请求可通过http来调用项目中相关的接口,再来调用Spark来进行计算。

环境准备

安装好Spark环境,单节点、集群都可以,如果是集群,则要保证节点之间的端口通信正常。

Spark版本要和代码中的Spark版本相同。

如果使用集群,则需要一个Nginx,做反向代理,用来分发项目包到各个节点。

server {
    listen       30443 default_server;
    server_name  _;

    location /jar/ {
        alias /opt/jar/;     // 放项目包的目录
    }

    location / {
        return 502;
    }
}

代码实现

POM文件只放打包相关的,依赖的包各不相同。

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.7.RELEASE</version>
                <configuration>
                    <includeSystemScope>true</includeSystemScope>
                    <mainClass>com.xxx.xxx.TestApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

创建SparkSession长服务实现代码:

import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.UDFRegistration;
import org.apache.spark.sql.types.StructType;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Map;

@Service
@Slf4j
public class SparkService implements DisposableBean, InitializingBean {
    private SparkSession session = null;

    public SparkSession session() {
        return session;
    }

    @Override
    public void afterPropertiesSet() {
        try {
            log.info("正在创建Spark Session......");
            SparkConf config = new SparkConf();
            config.setAppName(getClass().getSimpleName());
            config.setMaster("spark://bigdata1:7077");
            config.setJars(new String[]{ "http://bigdata1:30543/jar/Test-1.0-SNAPSHOT.jar.original" });
            config.set("spark.executor.cores", "2");  // 每个executor占用的核心数
            config.set("spark.driver.memory", "2g");
            config.set("spark.driver.cores", "2");
            config.set("spark.executor.memory", "2g");
            config.set("spark.default.parallelism", "8");  // 任务并行度
            session = SparkSession.builder().config(config).getOrCreate();
        } catch (Exception e) {
            log.error("Spark Session创建失败!!!");
            e.printStackTrace();
        }
    }

    @Override
    public void destroy() {
        FileUtil.close(session);
    }

    // 注册自定义UDF
    public SparkSession udf(SparkSession session) {
        UDFRegistration udf = session.udf();
        udf.register(ToCharUDF.NAME, new ToCharUDF(), ToCharUDF.TYPE);
        udf.register(AgeUDF.NAME, new AgeUDF(), AgeUDF.TYPE);
        udf.register(NullIfUDF.NAME,new NullIfUDF(), NullIfUDF.TYPE);
        udf.register(DatePartEpochUDF.NAME,new DatePartEpochUDF(), DatePartEpochUDF.TYPE);
        udf.register(ToNumberUDF.NAME,new ToNumberUDF(), ToNumberUDF.TYPE);
        return session;
    }
}

当需要用到Spark时先将上面的类注入:

    @Autowired
    public SparkService sparkService;

再创建一个新的session:

SparkSession session = sparkService.session().newSession(); // 没有注册自定义UDF函数
SparkSession sparkSession = sparkService.udf(session);      // 注册没有自定义UDF函数

部署运行

再说项目打包相关事宜,一共会打两个项目包,一个是以“.jar”结尾,另一个是以“.jar.original”结尾。

还要导出所有项目相关依赖的jar包放到Spark /jars目录下,使用下面命令即可:

mvn dependency:copy-dependencies

但是需要注意,自己导出来的包可能会和Spark的jars目录下的包有冲突,此时要以项目里的依赖版本为准,如果删除Spark中的冲突依赖之后导致Spark服务无法启动,则需要调整项目中引用的依赖版本向Spark服务靠齐。

做完以上几步之后就可以启动spark服务,启动项目(命令:java -jar xxx)即可。

标签:session,集群,org,spark,import,Spark,config,远程
From: https://www.cnblogs.com/chuijingjing/p/16995280.html

相关文章

  • tomcat优化-有改protocol 和 缓存 集群方案
     tomcat优化在线上环境中我们是采用了tomcat作为Web服务器,它的处理性能直接关系到用户体验,在平时的工作和学习中,归纳出以下七种调优经验。1.服务器资源  服务器所能提......
  • 远程计算机已加入AAD凭据不工作
    远程计算机已加入AAD凭据不工作解决方法计算机需要加入AAD(AzureActiveDirectory)设置-账户-其他用户-连接工作或学校用户-连接在下方的【替代操作】中选......
  • 常见集群算法解析
    Gossip协议  流行病协议,流言协议分布式网络,无集中管理节点;节点间点对点传播信息。P2P,BITCOIN,REDISCLUSTER等等简单:扩展性:网络节点可任意......
  • 树莓派集群真的可以顶上一台高性能计算机吗
    参考:​​树莓派集群计算机之集群管理篇​​​​【转载】一起做一个树莓派集群阵列吧-生肉​​   测速工具:​​https://iperf.fr/iperf-download.php​​   ==========......
  • 第14问:在 MGR 集群里,一个节点异常退出后,会发生什么?
    本文关键字:MGR、监控、Wireshark问题在一个MGR集群里,一个节点异常退出后,MySQL会如何进行调度?异常的节点什么时候会被踢出集群?实验实验开始前,给大家分享一个小经验:选择合......
  • 远程连接报错
    一、问题 远程连接报错:出现身份验证错误,要求的函数不受支持   二、解决方法1、window+R键,打开运行窗口,输入“regedit”,打开注册表2、找到注册表路径,计算机(可......
  • (转载)配置SQLServer,允许远程连接
    需要别人远程你的数据库,首先需要的是在一个局域网内,或者连接的是同一个路由器,接下来就是具体步骤:(一)首先是要检查SQLServer数据库服务器中是否允许远程链接。其具体操作为:......
  • k8s集群原理
    目录一、看图说K8S二、K8S的概念和术语三、K8S集群组件1、Master组件2、Node组件3、核心附件四、K8S的网络模型五、Kubernetes的核心对象详解1、Pod资源......
  • Kubernets 集群证书过期解决方式
    前因: 今天打开本地测试k8s集群,执行kubectlgetnodes,直接提示证书过期了,如图:  通过提示确实是过期了,那么我们来使用集群证书命令确认以下:执行kubeadmcertscheck-......
  • 使用Tomcat基于redis的session共享机制集群部署
    常见的session集群方案:session复制和session共享Session复制:指session信息会在集群节点之间复制,每个节点服务上都会有相同的session信息;主要是实现后端多个节点的冗余功......