首页 > 其他分享 >Spark-ShuffleWriter-BypassMergeSortShuffleWriter

Spark-ShuffleWriter-BypassMergeSortShuffleWriter

时间:2024-09-14 13:25:06浏览次数:11  
标签:BypassMergeSortShuffleWriter 分区 writer 写入 ShuffleWriter file Spark 拷贝 DiskBlockO

一、上下文

Spark-ShuffleWriter》中对ShuffleWriter的获取、分类和写入做了简单的分析,下面我们对其中的BypassMergeSortShuffleWriter做更详细的学习

二、创建ShuffleMapOutputWriter

ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
     .createMapOutputWriter(shuffleId, mapId, numPartitions);

public ShuffleMapOutputWriter createMapOutputWriter(...){
  return new LocalDiskShuffleMapOutputWriter(
      shuffleId, mapTaskId, numPartitions, blockResolver, sparkConf);
}

最终得到的是LocalDiskShuffleMapOutputWriter

其中:

blockId =  "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" 也是文件名称

缓存大小为 32K

三、创建SerializerInstance

final SerializerInstance serInstance = serializer.newInstance();

一次由一个线程使用,在同一个线程中它可以创建多个序列化/反序列化流

四、创建并初始化DiskBlockObjectWriter数组

1、创建

partitionWriters = new DiskBlockObjectWriter[numPartitions];

DiskBlockObjectWriter可以将JVM对象直接写入磁盘文件

数组长度就是下游分区数量,也就是一个分区一个DiskBlockObjectWriter,且每个DiskBlockObjectWriter都保持打开状态,直到该Task的数据全部写入磁盘再进行close

2、初始化

1、使用BlockManager为每个分区创建一个临时块(name="temp_shuffle_" + UUID)和临时文件

2、使用BlockManager为每个分区创建DiskBlockObjectWriter

3、为每个分区的DiskBlockObjectWriter(块写入器)设置校验和

4、将DiskBlockObjectWriter放到对应的块写入器数组中

五、将结果写入临时文件

根据Key计算出分区,根据分区从块写入器数组获取DiskBlockObjectWriter,用DiskBlockObjectWriter将结果写入临时文件

while (records.hasNext()) {
   final Product2<K, V> record = records.next();
   final K key = record._1();
   partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}

六、将临时文件封装成FileSegment

我们先看下游标在文件中的表示:

 reportedPosition:上次更新写入指标时的位置

committedPosition:最后一次提交写入后的偏移量

for (int i = 0; i < numPartitions; i++) {
  try (DiskBlockObjectWriter writer = partitionWriters[i]) {
     partitionWriterSegments[i] = writer.commitAndGet();
  }
}


//------------------------------------
 def commitAndGet(): FileSegment = {
    //如果文件还是打开的状态,一般缓冲区里是还有数据的
    if (streamOpen) {
      //将缓冲区的数据刷新磁盘
      objOut.flush()
      bs.flush()
      //关闭输入流
      objOut.close()
      streamOpen = false

      if (syncWrites) {
        // 强制对磁盘进行未完成的写入,并跟踪所需的时间
        val start = System.nanoTime()
        fos.getFD.sync()
        writeMetrics.incWriteTime(System.nanoTime() - start)
      }

      val pos = channel.position()
      //再最后对多个分区的文件进行合并时可以有效利用在内存的部分数据来提升效率
      val fileSegment = new FileSegment(file, committedPosition, pos - committedPosition)
      committedPosition = pos
      //在某些压缩编解码器中,流关闭后会写入更多字节 
      writeMetrics.incBytesWritten(committedPosition - reportedPosition)
      reportedPosition = committedPosition
      numRecordsWritten = 0
      fileSegment
    } else {
      new FileSegment(file, committedPosition, 0)
    }
  }

七、合并文件

将所有每个分区的文件连接到一个组合文件中,并返回包含每个分区在这个文件中的长度的数组,用于reduce端拉取属于自己分区的数据

partitionLengths = writePartitionedData(mapOutputWriter);


//-------------------------------------------

  private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
    // 分区的跟踪位置从输出文件开始
    if (partitionWriters != null) {
      //获取每个分区的 FileSegment
      final long writeStartTime = System.nanoTime();
      try {
        for (int i = 0; i < numPartitions; i++) {
          final File file = partitionWriterSegments[i].file();
          //创建一个可以打开输出流的写入程序,用于持久化给定分区的数据
          ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
          if (file.exists()) {
            // spark.file.transferTo 默认 true
            // 是否启用零拷贝 默认开启
            if (transferToEnabled) {
              // 使用WritableByteChannelWrapper使此实现和UnsafeShuffleWriter之间的资源关闭保持一致。
              //打开并返回一个WritableByteChannelWrapper,用于将字节从输入字节通道传输到底层shuffle数据存储。
              //在ShuffleMapTask中,此方法只会在分区写入器上调用一次,
              //所有分区共用一个通道进行写入,当所有分区写完后,关闭通道,

              //此方法主要用于高级优化,其中可以将字节从输入溢出文件复制到输出通道,而无需将数据复制到内存中。如果不支持此类优化,则实现应返回{@link Optional#empty()}。默认情况下,实现返回{@link Optional#empty()}。
              Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
              if (maybeOutputChannel.isPresent()) {
                //走零拷贝
                writePartitionedDataWithChannel(file, maybeOutputChannel.get());
              } else {
                //走传统的文件拷贝
                writePartitionedDataWithStream(file, writer);
              }
            } else {
              writePartitionedDataWithStream(file, writer);
            }
            if (!file.delete()) {
              logger.error("Unable to delete file for partition {}", i);
            }
          }
        }
      } finally {
        writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
      }
      partitionWriters = null;
    }
    //提交此对象的{@link#getPartitionWriter(int)}的所有调用返回的所有分区写入器所做的写入,
    //并返回每个分区写入的字节数。
    //这应该确保此模块的分区写入程序进行的写入可用于下游的reduce任务。
    //校验随机数据损坏原因的随机扩展应该正确存储校验和。当发生损坏时,Spark会向shuffle扩展提供所获取分区的校验和,以帮助诊断损坏的原因。
    //返回的提交消息是一个包含两个组件的结构:
    //    1) 一个long数组,里面包含每个分区写入的字节数
    //    2) 可供ShuffleReader使用的可选元数据blob。
    return mapOutputWriter.commitAllPartitions(getChecksumValues(partitionChecksums))
      .getPartitionLengths();
  }

1、零拷贝合并

从上面我们可以看出,如果满足两个条件就走零拷贝(writePartitionedDataWithChannel)

        1、spark.file.transferTo = true  即开启零拷贝

        2、输出流通道是开启状态

  private void writePartitionedDataWithChannel(
      File file,
      WritableByteChannelWrapper outputChannel) throws IOException {
    boolean copyThrewException = true;
    try {
      FileInputStream in = new FileInputStream(file);
      try (FileChannel inputChannel = in.getChannel()) {
        Utils.copyFileStreamNIO(
            inputChannel, outputChannel.channel(), 0L, inputChannel.size());
        copyThrewException = false;
      } finally {
        Closeables.close(in, copyThrewException);
      }
    } finally {
      Closeables.close(outputChannel, copyThrewException);
    }
  }


//-----------------------------------------

  def copyFileStreamNIO(...){

    var count = 0L
    while (count < bytesToCopy) {
      //在Java中,零拷贝主要通过java.nio包中的FileChannel类来实现。这些方法可以直接将数据从一个文件传输到另一个文件,减少中间的数据拷贝过程
      //    1、FileChannel.transferTo():将数据从文件通道传输到目标通道  利用send file系统调用
      //    2、FileChannel.transferFrom():从源通道读取数据并写入到文件通道 利用mmap和堆外内存
      count += input.transferTo(count + startPosition, bytesToCopy - count, output)
    }

  }

2、普通合并

如果不满足零拷贝条件,再走普通合并(writePartitionedDataWithStream)

  private void writePartitionedDataWithStream(File file, ShufflePartitionWriter writer)
      throws IOException {
    boolean copyThrewException = true;
    FileInputStream in = new FileInputStream(file);
    OutputStream outputStream;
    try {
      outputStream = writer.openStream();
      try {
        Utils.copyStream(in, outputStream, false, false);
        copyThrewException = false;
      } finally {
        Closeables.close(outputStream, copyThrewException);
      }
    } finally {
      Closeables.close(in, copyThrewException);
    }
  }


//----------------------------------------------
  //将所有数据从InputStream复制到OutputStream。
  def copyStream(
      in: InputStream,
      out: OutputStream,
      closeStreams: Boolean = false,
      transferToEnabled: Boolean = false): Long = {
    tryWithSafeFinally {
      //将transferToEnabled显式设置为true 才可以走零拷贝 看源码可知默认不走零拷贝
      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
        && transferToEnabled) {
        //当两个流都是文件流时,使用transferTo来提高复制性能。
        val inChannel = in.asInstanceOf[FileInputStream].getChannel()
        val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
        val size = inChannel.size()
        copyFileStreamNIO(inChannel, outChannel, 0, size)
        size
      } else {
        var count = 0L
        val buf = new Array[Byte](8192)
        var n = 0
        while (n != -1) {
          n = in.read(buf)
          if (n != -1) {
            out.write(buf, 0, n)
            count += n
          }
        }
        count
      }
    }
  }

八、向调度器报告摘要信息

ShuffleMapTask向调度器返回的结果。包括任务存储shuffle文件的块管理器地址,以及每个reducer的输出大小,以便传递给reduce任务。

当调度器发现这个ShuffleMapTask执行完成,就会执行下一个ShuffleMapTask或者ResultTask

标签:BypassMergeSortShuffleWriter,分区,writer,写入,ShuffleWriter,file,Spark,拷贝,DiskBlockO
From: https://blog.csdn.net/lu070828/article/details/141965258

相关文章

  • 介绍 Apache Spark 的基本概念和在大数据分析中的应用。
    ApacheSpark是一个快速、通用、可扩展的大数据处理框架,它最初由加州大学伯克利分校的AMPLab开发,并于2010年作为开源项目发布。Spark提供了强大的数据处理能力,旨在通过内存计算来加速数据处理过程,从而比传统的基于磁盘的批处理系统(如HadoopMapReduce)快上数倍至数百......
  • 计算机毕业设计PySpark+Django深度学习游戏推荐系统 游戏可视化 游戏数据分析 游戏爬
    在撰写《PySpark+Django深度学习游戏推荐系统》的开题报告时,建议包括以下内容:###1.研究背景与意义在数字娱乐行业中,游戏推荐系统成为提升用户体验的关键工具。现有的推荐系统大多基于用户行为数据进行推荐,但随着数据量的急剧增加和数据复杂性的提升,传统的推荐算法面临挑战......
  • SparkSQL练习:对学生选课成绩进行分析计算
    题目内容:对学生选课成绩进行分析计算题目要求:(1)该系总共有多少学生;(2)该系共开设来多少门课程;(3)每个学生的总成绩多少;(4)每门课程选修的同学人数;(5)每位同学选修的课程门数;(6)该系DataBase课程共有多少人选修;(7)每位同学平均成绩;数据预览:每行数据包括以下三部分内容:学生姓名,所学......
  • 分享一个基于python的电子书数据采集与可视化分析 hadoop电子书数据分析与推荐系统 sp
    ......
  • 【Spark+Hive】基于大数据招聘数据分析预测推荐系统(完整系统源码+数据库+开发笔记+详
    文章目录【Spark+Hive】基于大数据招聘数据分析预测推荐系统(完整系统源码+数据库+开发笔记+详细部署教程+虚拟机分布式启动教程)源码获取方式在文章末尾一、 项目概述二、研究意义三、背景四、国内外研究现状五、开发技术介绍六、算法介绍 七、数据库设计八、系统......
  • 如何在Spark键值对数据中,对指定的Key进行输出/筛选/模式匹配
    在用键值对RDD进行操作时,经常会遇到不知道如何筛选出想要数据的情况,这里提供了一些解决方法目录1、对固定的Key数据进行查询2、对不固定的Key数据进行模糊查询1、对固定的Key数据进行查询代码说明:SparkConf:配置Spark应用程序的一些基本信息。SparkContext:创建Spark......
  • spark为什么比mapreduce快?
    spark为什么比mapreduce快?首先澄清几个误区:1:两者都是基于内存计算的,任何计算框架都肯定是基于内存的,所以网上说的spark是基于内存计算所以快,显然是错误的2;DAG计算模型减少的是磁盘I/O次数(相比于mapreduce计算模型而言),而不是shuffle次数,因为shuffle是根据数据重组的次数而定,所以shu......
  • Windows系统下的Spark环境配置
    一:Spark的介绍ApacheSpark是一个开源的分布式大数据处理引擎,它提供了一整套开发API,包括流计算和机器学习。Spark支持批处理和流处理,其显著特点是能够在内存中进行迭代计算,从而加快数据处理速度。尽管Spark是用Scala开发的,但它也为Java、Scala、Python和R等高级编程......
  • Pyspark中catalog的作用与常用方法
    文章目录Pysparkcatalog用法catalog介绍cache缓存表uncache清除缓存表cleanCache清理所有缓存表createExternalTable创建外部表currentDatabase返回当前默认库tableExists检查数据表是否存在,包含临时视图databaseExists检查数据库是否存在dropGlobalTempView删......
  • Spark面试高频真题一--Spark基础
    〇、前言Spark是业界常用的大规模分布式数据处理引擎,也是数仓开发最常用的工具组件,通常一二三轮面试官都会或多或少的提问相关的基础问题。下面是总结的常见的面试问题和答案参考。一、Spark基础1.spark和hive的区别是?HiveQL是基于MapReduce框架和HDFS进行数据处理。Spark......