一、上下文
《Spark-ShuffleWriter》中对ShuffleWriter的获取、分类和写入做了简单的分析,下面我们对其中的BypassMergeSortShuffleWriter做更详细的学习
二、创建ShuffleMapOutputWriter
ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
.createMapOutputWriter(shuffleId, mapId, numPartitions);
public ShuffleMapOutputWriter createMapOutputWriter(...){
return new LocalDiskShuffleMapOutputWriter(
shuffleId, mapTaskId, numPartitions, blockResolver, sparkConf);
}
最终得到的是LocalDiskShuffleMapOutputWriter
其中:
blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" 也是文件名称
缓存大小为 32K
三、创建SerializerInstance
final SerializerInstance serInstance = serializer.newInstance();
一次由一个线程使用,在同一个线程中它可以创建多个序列化/反序列化流
四、创建并初始化DiskBlockObjectWriter数组
1、创建
partitionWriters = new DiskBlockObjectWriter[numPartitions];
DiskBlockObjectWriter可以将JVM对象直接写入磁盘文件
数组长度就是下游分区数量,也就是一个分区一个DiskBlockObjectWriter,且每个DiskBlockObjectWriter都保持打开状态,直到该Task的数据全部写入磁盘再进行close
2、初始化
1、使用BlockManager为每个分区创建一个临时块(name="temp_shuffle_" + UUID)和临时文件
2、使用BlockManager为每个分区创建DiskBlockObjectWriter
3、为每个分区的DiskBlockObjectWriter(块写入器)设置校验和
4、将DiskBlockObjectWriter放到对应的块写入器数组中
五、将结果写入临时文件
根据Key计算出分区,根据分区从块写入器数组获取DiskBlockObjectWriter,用DiskBlockObjectWriter将结果写入临时文件
while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
六、将临时文件封装成FileSegment
我们先看下游标在文件中的表示:
reportedPosition:上次更新写入指标时的位置
committedPosition:最后一次提交写入后的偏移量
for (int i = 0; i < numPartitions; i++) {
try (DiskBlockObjectWriter writer = partitionWriters[i]) {
partitionWriterSegments[i] = writer.commitAndGet();
}
}
//------------------------------------
def commitAndGet(): FileSegment = {
//如果文件还是打开的状态,一般缓冲区里是还有数据的
if (streamOpen) {
//将缓冲区的数据刷新磁盘
objOut.flush()
bs.flush()
//关闭输入流
objOut.close()
streamOpen = false
if (syncWrites) {
// 强制对磁盘进行未完成的写入,并跟踪所需的时间
val start = System.nanoTime()
fos.getFD.sync()
writeMetrics.incWriteTime(System.nanoTime() - start)
}
val pos = channel.position()
//再最后对多个分区的文件进行合并时可以有效利用在内存的部分数据来提升效率
val fileSegment = new FileSegment(file, committedPosition, pos - committedPosition)
committedPosition = pos
//在某些压缩编解码器中,流关闭后会写入更多字节
writeMetrics.incBytesWritten(committedPosition - reportedPosition)
reportedPosition = committedPosition
numRecordsWritten = 0
fileSegment
} else {
new FileSegment(file, committedPosition, 0)
}
}
七、合并文件
将所有每个分区的文件连接到一个组合文件中,并返回包含每个分区在这个文件中的长度的数组,用于reduce端拉取属于自己分区的数据
partitionLengths = writePartitionedData(mapOutputWriter);
//-------------------------------------------
private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
// 分区的跟踪位置从输出文件开始
if (partitionWriters != null) {
//获取每个分区的 FileSegment
final long writeStartTime = System.nanoTime();
try {
for (int i = 0; i < numPartitions; i++) {
final File file = partitionWriterSegments[i].file();
//创建一个可以打开输出流的写入程序,用于持久化给定分区的数据
ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
if (file.exists()) {
// spark.file.transferTo 默认 true
// 是否启用零拷贝 默认开启
if (transferToEnabled) {
// 使用WritableByteChannelWrapper使此实现和UnsafeShuffleWriter之间的资源关闭保持一致。
//打开并返回一个WritableByteChannelWrapper,用于将字节从输入字节通道传输到底层shuffle数据存储。
//在ShuffleMapTask中,此方法只会在分区写入器上调用一次,
//所有分区共用一个通道进行写入,当所有分区写完后,关闭通道,
//此方法主要用于高级优化,其中可以将字节从输入溢出文件复制到输出通道,而无需将数据复制到内存中。如果不支持此类优化,则实现应返回{@link Optional#empty()}。默认情况下,实现返回{@link Optional#empty()}。
Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
if (maybeOutputChannel.isPresent()) {
//走零拷贝
writePartitionedDataWithChannel(file, maybeOutputChannel.get());
} else {
//走传统的文件拷贝
writePartitionedDataWithStream(file, writer);
}
} else {
writePartitionedDataWithStream(file, writer);
}
if (!file.delete()) {
logger.error("Unable to delete file for partition {}", i);
}
}
}
} finally {
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
}
//提交此对象的{@link#getPartitionWriter(int)}的所有调用返回的所有分区写入器所做的写入,
//并返回每个分区写入的字节数。
//这应该确保此模块的分区写入程序进行的写入可用于下游的reduce任务。
//校验随机数据损坏原因的随机扩展应该正确存储校验和。当发生损坏时,Spark会向shuffle扩展提供所获取分区的校验和,以帮助诊断损坏的原因。
//返回的提交消息是一个包含两个组件的结构:
// 1) 一个long数组,里面包含每个分区写入的字节数
// 2) 可供ShuffleReader使用的可选元数据blob。
return mapOutputWriter.commitAllPartitions(getChecksumValues(partitionChecksums))
.getPartitionLengths();
}
1、零拷贝合并
从上面我们可以看出,如果满足两个条件就走零拷贝(writePartitionedDataWithChannel)
1、spark.file.transferTo = true 即开启零拷贝
2、输出流通道是开启状态
private void writePartitionedDataWithChannel(
File file,
WritableByteChannelWrapper outputChannel) throws IOException {
boolean copyThrewException = true;
try {
FileInputStream in = new FileInputStream(file);
try (FileChannel inputChannel = in.getChannel()) {
Utils.copyFileStreamNIO(
inputChannel, outputChannel.channel(), 0L, inputChannel.size());
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
}
} finally {
Closeables.close(outputChannel, copyThrewException);
}
}
//-----------------------------------------
def copyFileStreamNIO(...){
var count = 0L
while (count < bytesToCopy) {
//在Java中,零拷贝主要通过java.nio包中的FileChannel类来实现。这些方法可以直接将数据从一个文件传输到另一个文件,减少中间的数据拷贝过程
// 1、FileChannel.transferTo():将数据从文件通道传输到目标通道 利用send file系统调用
// 2、FileChannel.transferFrom():从源通道读取数据并写入到文件通道 利用mmap和堆外内存
count += input.transferTo(count + startPosition, bytesToCopy - count, output)
}
}
2、普通合并
如果不满足零拷贝条件,再走普通合并(writePartitionedDataWithStream)
private void writePartitionedDataWithStream(File file, ShufflePartitionWriter writer)
throws IOException {
boolean copyThrewException = true;
FileInputStream in = new FileInputStream(file);
OutputStream outputStream;
try {
outputStream = writer.openStream();
try {
Utils.copyStream(in, outputStream, false, false);
copyThrewException = false;
} finally {
Closeables.close(outputStream, copyThrewException);
}
} finally {
Closeables.close(in, copyThrewException);
}
}
//----------------------------------------------
//将所有数据从InputStream复制到OutputStream。
def copyStream(
in: InputStream,
out: OutputStream,
closeStreams: Boolean = false,
transferToEnabled: Boolean = false): Long = {
tryWithSafeFinally {
//将transferToEnabled显式设置为true 才可以走零拷贝 看源码可知默认不走零拷贝
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
&& transferToEnabled) {
//当两个流都是文件流时,使用transferTo来提高复制性能。
val inChannel = in.asInstanceOf[FileInputStream].getChannel()
val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
val size = inChannel.size()
copyFileStreamNIO(inChannel, outChannel, 0, size)
size
} else {
var count = 0L
val buf = new Array[Byte](8192)
var n = 0
while (n != -1) {
n = in.read(buf)
if (n != -1) {
out.write(buf, 0, n)
count += n
}
}
count
}
}
}
八、向调度器报告摘要信息
ShuffleMapTask向调度器返回的结果。包括任务存储shuffle文件的块管理器地址,以及每个reducer的输出大小,以便传递给reduce任务。
当调度器发现这个ShuffleMapTask执行完成,就会执行下一个ShuffleMapTask或者ResultTask
标签:BypassMergeSortShuffleWriter,分区,writer,写入,ShuffleWriter,file,Spark,拷贝,DiskBlockO From: https://blog.csdn.net/lu070828/article/details/141965258