首页 > 其他分享 >大数据 | Spark基本使用示例

大数据 | Spark基本使用示例

时间:2023-09-27 10:35:37浏览次数:75  
标签:基本 Task 示例 -- Executor table Spark spark

欢迎参观我的博客,一个Vue 与 SpringBoot结合的产物:https://poetize.cn

原文链接:https://poetize.cn/article?id=47

基本概念

SparkSubmit(进程)

应用提交的客户端程序。

Driver(线程)

含有 SparkContext 实例的线程。它负责创建逻辑和物理计划,并与集群管理器协调调度任务。

Executor(进程)

Executor 是一个执行 Task 的容器,负责调用 Task 的 runTask 方法来执行 Task 的运算逻辑。

Task

一段计算逻辑的封装对象。

Shuffle

在 Spark 中,Shuffle 是指在不同阶段之间重新分配数据的过程。它通常发生在需要对数据进行聚合或分组操作的时候,例如 reduceByKey 或 groupByKey 等操作。

在 Shuffle 过程中,Spark 会将数据按照键值进行分区,并将属于同一分区的数据发送到同一个计算节点上。这样,每个计算节点就可以独立地处理属于它自己分区的数据。

Spark执行流程

  • 构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone或Yarn)注册并申请运行Executor资源。
  • 资源管理器为Executor分配资源并启动Executor进程,Executor运行情况将随着“心跳”发送到资源管理器上。
  • SparkContext构建DAG图,将DAG图分解成多个Stage,并把每个Stage的TaskSet(任务集)发送给Task Scheduler (任务调度器)。
  • Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor,同时,SparkContext将应用程序代码发放给Executor。
  • Task在Executor上运行,把执行结果反馈给Task Scheduler,然后再反馈给DAG Scheduler。
  • 当一个阶段完成后,Spark会根据数据依赖关系将结果传输给下一个阶段,并开始执行下一个阶段的任务。
  • 最后,当所有阶段都完成后,Spark会将最终结果返回给驱动程序,并完成作业的执行。

创建Spark客户端

SparkConf sparkConf = new SparkConf();
sparkConf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
sparkConf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
sparkConf.set("spark.sql.warehouse.dir", basePath);
sparkConf.set("hive.metastore.uris", metaUri);
System.setProperty("HADOOP_USER_NAME", "root");
SparkSession spark = SparkSession.builder()
        .appName(appName)
        .master(master)
        .config(sparkConf)
        .enableHiveSupport()
        .getOrCreate();

Spark下载文件到本地

Dataset<Row> result = spark.sql("select * from tableName");
result.write().mode(SaveMode.Overwrite).csv("file:///D:\\Spark\\data");

Spark广播变量

广播变量的好处:不是每个Task一份变量副本,而是变成每个节点的Executor才一份副本。这样的话,就可以让变量产生的副本大大减少。

List<String> data = new ArrayList<>();
Broadcast<List<String>> dataBc = javaSparkContext.broadcast(data);

Spark累加器

//生成累加器
LongAccumulator change = spark.sparkContext().longAccumulator();

//在算子中累加
change.add(1L);

//获取结果
long count = change.count()

Spark Dataset 算子

Dataset<JSONObject> jSONObjectDataset = sourceDataset.filter((FilterFunction<JSONObject>) row -> true)
        .map((MapFunction<JSONObject, JSONObject>) row -> new JSONObject(), Encoders.javaSerialization(JSONObject.class));

Dataset<String> result = jSONObjectDataset.map((MapFunction<JSONObject, String>) row -> JSON.toJSONString(row), Encoders.STRING());

创建StructType

List<String> fields = ...;
List<StructField> structFields = fields.stream().map(f -> DataTypes.createStructField(f, DataTypes.StringType, true)).collect(Collectors.toList());
StructType structType = DataTypes.createStructType(structFields);

使用StructType创建Row

StructType schema = new StructType(new StructField[]{
        new StructField("id", DataTypes.StringType, true, Metadata.empty()),
        new StructField("title", DataTypes.StringType, true, Metadata.empty())
});

JavaRDD<Row> rowJavaRDD = jsonJavaRDD.map(json -> RowFactory.create(
        json.getString("id"),
        json.getString("title")
        )
);

Dataset<Row> dataFrame = spark.createDataFrame(rowJavaRDD, schema);
dataFrame.write().partitionBy("partition_time_hive").mode(SaveMode.Overwrite).orc(tablePath);

使用Java对象创建Row

//JavaData需要实现Serializable
JavaRDD<JavaData> javaData = ...;
Dataset<Row> rowDataset = sparkSession.createDataFrame(javaData, JavaData.class);

从JVM内存中创建Dataset

List<JSONObject> cleanList = ...;
Dataset<JSONObject> dataset = spark.createDataset(cleanList, Encoders.javaSerialization(JSONObject.class));

Spark分区写入Hdfs数据

public class Partitioner extends Partitioner {

    private final int numPartitions;

    public PersonPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
    	//传入的分区数
        return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
    	//根据key获取分区
        int partition = ...;
        return partition;
    }
}

public static void writeToHdfs(Dataset<JSONObject> result, String path) {
    JavaPairRDD<String, String> pairData = result.toJavaRDD().mapToPair(map -> new Tuple2<>(map.getString("status"), JSON.toJSONString(map)));
    pairData.partitionBy(new Partitioner(2)).map(tuple -> tuple._2).saveAsTextFile(path);
}

Spark分区写入Mysql数据

public static final String TABLE_TEMP_SUFFIX = "_temp";

public static void writeToMysql(Dataset<Row> result, JdbcConfig jdbcConfig, String table, DataSource dataSource) {
    try {
        try (Connection connection = dataSource.getConnection()) {
            JdbcUtils.executeSql(connection, "create table " + table + EtlConstant.TABLE_TEMP_SUFFIX + " like " + table);
        }

        result.write().format("jdbc")
                .mode(SaveMode.Overwrite)
                .option("driver", jdbcConfig.getDriverClassName())
                .option("url", jdbcConfig.getUrl())
                .option("dbtable", table + EtlConstant.TABLE_TEMP_SUFFIX)
                .option("user", jdbcConfig.getUsername())
                .option("password", jdbcConfig.getPassword())
                //JDBC批大小,默认 1000,灵活调整该值可以提高写入性能
                .option("batchsize", 1000)
                //事务级别,默认为 READ_UNCOMMITTED,无事务要求可以填 NONE 以提高性能
                .option("isolationLevel", "NONE")
                //需要注意该配置项,Overwrite 模式下,不设置为 true 会删表重建
                .option("truncate", "true")
                .save();

        try (Connection connection = dataSource.getConnection()) {
            JdbcUtils.executeSql(connection, "drop table if exists " + table);
            JdbcUtils.executeSql(connection, "rename table " + table + EtlConstant.TABLE_TEMP_SUFFIX + " to " + table);
        }
    } finally {
        try (Connection connection = dataSource.getConnection()) {
            JdbcUtils.executeSql(connection, "drop table if exists " + table + EtlConstant.TABLE_TEMP_SUFFIX);
        }
    }
}

Spark任务提交的Yarn

/opt/spark-3.0.0/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--conf spark.yarn.am.waitTime=6000 \
--conf spark.sql.broadcastTimeout=6000 \
--conf spark.network.timeout=600 \
--num-executors 1 \
--driver-memory 3G \
--executor-memory 3G \
--jars /home/developer/base-spark.jar \
--class com.task.dwd.Task \
/home/developer/spark.jar dev yarn

标签:基本,Task,示例,--,Executor,table,Spark,spark
From: https://www.cnblogs.com/loveer/p/17732073.html

相关文章

  • 大数据 | Hive使用示例
    欢迎参观我的博客,一个Vue与SpringBoot结合的产物:https://poetize.cn博客:https://gitee.com/littledokey/poetize-vue2.git聊天室:https://gitee.com/littledokey/poetize-im-vue3.git后端:https://gitee.com/littledokey/poetize.git七牛云登录/注册地址(文件服务器,CDN):https:/......
  • SpringBoot | 加密算法使用示例
    欢迎参观我的博客,一个Vue与SpringBoot结合的产物:https://poetize.cn博客:https://gitee.com/littledokey/poetize-vue2.git聊天室:https://gitee.com/littledokey/poetize-im-vue3.git后端:https://gitee.com/littledokey/poetize.git七牛云登录/注册地址(文件服务器,CDN):https:/......
  • SpringBoot | Dubbo之Filter使用示例
    欢迎参观我的博客,一个Vue与SpringBoot结合的产物:https://poetize.cn博客:https://gitee.com/littledokey/poetize-vue2.git聊天室:https://gitee.com/littledokey/poetize-im-vue3.git后端:https://gitee.com/littledokey/poetize.git七牛云登录/注册地址(文件服务器,CDN):https:/......
  • macos M1安装MongoDB及基本crud操作
    安装:官网教程InstallMongoDBCommunityEditiononmacOS—MongoDBManual我看的是这个教程:Mac安装MongoDb保姆级教程以及踩坑笔记(图文详解(xjx100.cn) 里面有详细步骤及报错/解决.庆幸我在安装过程没遇到什么错误MongoDB教程MongoDB教程|菜鸟教程(runoob.com)Mo......
  • 【C语言初阶篇】for语句的基本语法和使用规则!
    <br><br/><center><fontcolor="#006666">......
  • Golang method | Interfaces 示例
    方法与接口(methodInterface) packageinterFacesimport("fmt""log")//managerAppstoreonlineaccounttypeAccountstruct{surNamestringgivenNamestring}//方法的调用如果需要对调用对象做修改操作,则需要使用`*`引用其指针创建方法。func(a*Acc......
  • 本地测试Spark的svm算法
    上一篇介绍了逻辑回归算法,发现分类效果不好,通过这次的svm发现是因为训练数据不行,于是网上找了部分训练数据,发现实际上分类效果还可以。    训练数据,第一个值是标签,下面的数据是某种花的相关特征。1|5.1,3.5,1.4,0.21|4.9,3,1.4,0.21|4.7,3.2,1.3,0.21|4.6,3.1,1.5,0.21......
  • ERP基本流程
    ERP基本流程对于进销存管理系统来讲,我在进行分析和查阅资料后,得到的基本流程如下:1.需求分析和规划:1)确定企业业务的需求和目标2)总结目前企业运作存在的问题以及业务流程的瓶颈3)指定ERP系统的实施计划和时间表2.系统设计和选择1)定义业务流程和功能需求2)评估调研不同供应商所......
  • 【北亚企安数据恢复】Ceph分布式存储基本架构&概念&Ceph数据恢复流程
    Ceph存储基本架构:Ceph存储可分为块存储,对象存储和文件存储。Ceph基于对象存储,对外提供三种存储接口,故称为统一存储。Ceph的底层是RADOS(分布式对象存储系统),RADOS由两部分组成:OSD和MON。MON负责监控整个集群,维护集群的健康状态,维护展示集群状态的各种图表,如OSDMap、MonitorMap、......
  • git基本用法
    应用场景:从master分支copy一个本地分支作为开发分支1、查看当前分支(当前分支可以直接查看或者命令查看)gitbranch或者gitstatus2、切换分支到master(你在哪个分支创建本地分支,就切换到那个分支,比如master/uat等)gitcheckoutmaster3、将代码更新到最新版本gitpul......