
Alluxio Read and Write Flows


1. Background

The article series at https://blog.51cto.com/u_15327484 introduced the Alluxio architecture. Building on that, this post walks through the Alluxio file read and write flows. They are almost identical to HDFS's, except that the Worker has the extra option of reading from and writing to the UFS. Parts of the flow are omitted here; only the key points are covered.

2. Alluxio Write Flow

When a client writes data to Alluxio, it can specify whether the data in Alluxio should also be written to the UFS, and, when writing to the UFS, whether that write is synchronous or asynchronous. Alluxio has four write modes (a selection sketch follows the list):

  1. MUST_CACHE: the data is written only to the Alluxio Worker.
  2. CACHE_THROUGH: the data is written to the Alluxio Worker and synchronously persisted to the UFS.
  3. ASYNC_THROUGH: the data is written to the Alluxio Worker and asynchronously persisted to the UFS.
  4. THROUGH: the data is persisted only to the UFS.
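
For illustration, the write mode can be chosen per file through CreateFilePOptions. The following is a minimal sketch against the Alluxio 2.x Java client API (a sketch only; option and enum names may differ across versions):

import alluxio.AlluxioURI;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.WritePType;

public class WriteTypeExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.Factory.get();
    // CACHE_THROUGH: write to the Alluxio worker and synchronously persist to the UFS
    CreateFilePOptions options = CreateFilePOptions.newBuilder()
        .setWriteType(WritePType.CACHE_THROUGH)
        .build();
    try (FileOutStream out = fs.createFile(new AlluxioURI("/myFile"), options)) {
      out.write("hello alluxio".getBytes());
    }
  }
}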

2.1 The Alluxio client initiates the write

In application code, writing to Alluxio typically means calling createFile first and then writing the data with write:

FileSystem fs = FileSystem.Factory.get();
AlluxioURI path = new AlluxioURI("/myFile");
// Create a file and get its output stream
FileOutStream out = fs.createFile(path);
// Write data
out.write(...);
// Close and complete file
out.close();

The client-side write flow is as follows:

(Figure: client-side write flow)

A createFile request ultimately reaches RetryHandlingFileSystemMasterClient.createFile, which sends a gRPC request to the Leader Master. The Leader modifies the metadata in memory and persists the journal entry to every master's disk:

public URIStatus createFile(final AlluxioURI path, final CreateFilePOptions options)
      throws AlluxioStatusException {
    return retryRPC(
        () -> new URIStatus(GrpcUtils.fromProto(mClient.createFile(CreateFilePRequest.newBuilder()
            .setPath(getTransportPath(path)).setOptions(options).build()).getFileInfo())),
        RPC_LOG, "CreateFile", "path=%s,options=%s", path, options);
  }
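
retryRPC wraps the gRPC call in retry handling. A rough sketch of that pattern follows (retryRpc and maxAttempts are illustrative names; the real client uses configurable retry policies rather than this fixed linear backoff):

import java.util.concurrent.Callable;

public class RetrySketch {
  // Retry the RPC a bounded number of times, sleeping a bit longer
  // after each failed attempt before retrying.
  static <T> T retryRpc(Callable<T> rpc, int maxAttempts) throws Exception {
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return rpc.call();
      } catch (Exception e) {
        last = e;
        Thread.sleep(100L * attempt); // linear backoff between attempts
      }
    }
    throw last;
  }
}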

FileOutStream.write eventually calls AlluxioFileOutStream.writeInternal, which first checks whether a new block needs to be created and then writes the data to the Worker:

private void writeInternal(int b) throws IOException {
    if (mShouldCacheCurrentBlock) {
      try {
        if (mCurrentBlockOutStream == null || mCurrentBlockOutStream.remaining() == 0) {
          // request a new block on demand
          getNextBlock();
        }
        // write the data
        mCurrentBlockOutStream.write(b);
      } catch (IOException e) {
        handleCacheWriteException(e);
      }
    }

    if (mUnderStorageType.isSyncPersist()) {
      mUnderStorageOutputStream.write(b);
      Metrics.BYTES_WRITTEN_UFS.inc();
    }
    mBytesWritten++;
  }

Inside getNextBlock, AlluxioFileOutStream.getNewBlockIdForFile is eventually invoked to request a new block over gRPC:

public long getNewBlockIdForFile(final AlluxioURI path)
      throws AlluxioStatusException {
    return retryRPC(
        () -> mClient.getNewBlockIdForFile(
            GetNewBlockIdForFilePRequest.newBuilder().setPath(getTransportPath(path))
                .setOptions(GetNewBlockIdForFilePOptions.newBuilder().build()).build())
            .getId(),
        RPC_LOG, "GetNewBlockIdForFile", "path=%s", path);
  }

BlockOutStream.write calls writeInternal to write data chunk by chunk; each chunk is 1MB by default:

private void writeInternal(ByteBuf b, int off, int len) throws IOException {
    if (len == 0) {
      return;
    }

    while (len > 0) {
      updateCurrentChunk(false);
      int toWrite = Math.min(len, mCurrentChunk.writableBytes());
      mCurrentChunk.writeBytes(b, off, toWrite);
      off += toWrite;
      len -= toWrite;
    }
    updateCurrentChunk(false);
  }
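
For clarity, the loop above amounts to the following self-contained sketch of fixed-size chunking (ChunkSplitter and CHUNK_SIZE are illustrative names, not Alluxio code):

import java.util.ArrayList;
import java.util.List;

public class ChunkSplitter {
  static final int CHUNK_SIZE = 1024 * 1024; // 1MB, mirroring the default chunk size

  // Split a payload into CHUNK_SIZE pieces, the way writeInternal fills
  // mCurrentChunk until it has no writable bytes left and then flushes it.
  static List<byte[]> split(byte[] data) {
    List<byte[]> chunks = new ArrayList<>();
    int off = 0;
    while (off < data.length) {
      int toWrite = Math.min(CHUNK_SIZE, data.length - off);
      byte[] chunk = new byte[toWrite];
      System.arraycopy(data, off, chunk, 0, toWrite);
      chunks.add(chunk);
      off += toWrite;
    }
    return chunks;
  }
}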

Once a chunk is full, updateCurrentChunk pushes the data out to the worker:

private final List<DataWriter> mDataWriters;
if (mCurrentChunk.writableBytes() == 0 || lastChunk) {
      try {
        if (mCurrentChunk.readableBytes() > 0) {
          for (DataWriter dataWriter : mDataWriters) {
            mCurrentChunk.retain();
            dataWriter.writeChunk(mCurrentChunk.duplicate());
          }
        } else {
          Preconditions.checkState(lastChunk);
        }
      } finally {
        // If the packet has bytes to read, we increment its refcount explicitly for every packet
        // writer. So we need to release here. If the packet has no bytes to read, then it has
        // to be the last packet. It needs to be released as well.
        mCurrentChunk.release();
        mCurrentChunk = null;
      }
    }

Finally, GrpcDataWriter.writeChunk writes into mStream. As the snippet below shows, mStream is simply the output stream obtained when the client issues a writeBlock request to the worker:

private final GrpcBlockingStream<WriteRequest, WriteResponse> mStream;
// mStream is the output stream obtained by issuing a writeBlock request to the worker
mStream = new GrpcBlockingStream<>(mClient.get()::writeBlock, writerBufferSizeMessages,
          MoreObjects.toStringHelper(this)
              .add("request", mPartialRequest)
              .add("address", address)
              .toString());

public void writeChunk(final ByteBuf buf) throws IOException {
    mPosToQueue += buf.readableBytes();
    try {
      WriteRequest request = WriteRequest.newBuilder().setCommand(mPartialRequest).setChunk(
          Chunk.newBuilder()
              .setData(UnsafeByteOperations.unsafeWrap(buf.nioBuffer()))
              .build()).build();
      if (mStream instanceof GrpcDataMessageBlockingStream) {
        // the client sends the data to the worker through the output stream
        ((GrpcDataMessageBlockingStream<WriteRequest, WriteResponse>) mStream)
            .sendDataMessage(new DataMessage<>(request, new NettyDataBuffer(buf)), mDataTimeoutMs);
      } else {
        mStream.send(request, mDataTimeoutMs);
      }
    } finally {
      buf.release();
    }
  }
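
The writerBufferSizeMessages argument bounds how many messages may sit in the stream's buffer; when the buffer is full, send blocks, so back-pressure propagates to the writing client. A toy model of that behavior (all names here are made up for illustration):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingSendStream<T> {
  private final BlockingQueue<T> mInFlight;

  public BlockingSendStream(int bufferSizeMessages) {
    mInFlight = new ArrayBlockingQueue<>(bufferSizeMessages);
  }

  // Blocks until the transport drains the queue or the timeout expires,
  // which is how a full buffer pushes back on the caller.
  public void send(T request, long timeoutMs) throws InterruptedException {
    if (!mInFlight.offer(request, timeoutMs, TimeUnit.MILLISECONDS)) {
      throw new IllegalStateException("timed out waiting for stream buffer space");
    }
  }

  // Called by the transport side to drain buffered messages.
  public T poll() {
    return mInFlight.poll();
  }
}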

2.2 The Worker handles the write request

The figure below shows the flow when block data is written both to the Worker and to the UFS:

(Figure: write flow to both the Worker and the UFS)

The key step is DelegationWriteHandler.createWriterHandler, which, depending on the client request, dispatches to one of three write paths:

  1. ALLUXIO_BLOCK: the block data is written directly to the Worker.
  2. UFS_FILE: the block data is written only to the UFS.
  3. UFS_FALLBACK_BLOCK: the block data is written to both the Worker and the UFS.

DelegationWriteHandler.createWriterHandler looks like this:

private AbstractWriteHandler createWriterHandler(alluxio.grpc.WriteRequest request) {
    switch (request.getCommand().getType()) {
      case ALLUXIO_BLOCK:
        return new BlockWriteHandler(mBlockWorker, mResponseObserver,
            mUserInfo, mDomainSocketEnabled);
      case UFS_FILE:
        return new UfsFileWriteHandler(mUfsManager, mResponseObserver,
            mUserInfo);
      case UFS_FALLBACK_BLOCK:
        return new UfsFallbackBlockWriteHandler(
            mBlockWorker, mUfsManager, mResponseObserver, mUserInfo, mDomainSocketEnabled);
      default:
        throw new IllegalArgumentException(String.format("Invalid request type %s",
            request.getCommand().getType().name()));
    }
  }

In UfsFallbackBlockWriteHandler.writeBuf, the data is first written to the worker via BlockWriteHandler.writeBuf, and then written to the UFS:

protected void writeBuf(BlockWriteRequestContext context,
      StreamObserver<WriteResponse> responseObserver, DataBuffer buf, long pos) throws Exception {
    if (context.isWritingToLocal()) {
      long posBeforeWrite = pos - buf.readableBytes();
      try {
        // write to the worker
        mBlockWriteHandler.writeBuf(context, responseObserver, buf, pos);
        return;
      } // catch block omitted; on failure, fall through to the UFS path below
      // close the block writer first
      if (context.getBlockWriter() != null) {
        context.getBlockWriter().close();
      }
      // prepare the UFS block and transfer data from the temp block to UFS
      // prepare the UFS block; if the file does not exist in the UFS yet, create it
      createUfsBlock(context);
      if (posBeforeWrite > 0) {
        // transfer the data to the UFS
        transferToUfsBlock(context, posBeforeWrite);
      }
      // close the original block writer and remove the temp file
      mBlockWriteHandler.cancelRequest(context);
    }
    if (context.getOutputStream() == null) {
      createUfsBlock(context);
    }
    buf.readBytes(context.getOutputStream(), buf.readableBytes());
  }

The transferToUfsBlock method is what reads the temp block data back out of the worker and writes it into the UFS:

private void transferToUfsBlock(BlockWriteRequestContext context, long pos) throws IOException {
    OutputStream ufsOutputStream = context.getOutputStream();
    long blockId = context.getRequest().getId();
    Optional<TempBlockMeta> block = mWorker.getBlockStore().getTempBlockMeta(blockId);
    Preconditions.checkState(block.isPresent()
        && Files.copy(Paths.get(block.get().getPath()), ufsOutputStream) == pos);
  }

3. Alluxio Read Flow

3.1 The client initiates the read

Application code for reading from Alluxio looks like this:

FileSystem fs = FileSystem.Factory.get();
AlluxioURI path = new AlluxioURI("/myFile");
// Open the file for reading
FileInStream in = fs.openFile(path);
// Read data
in.read(...);
// Close file relinquishing the lock
in.close();

openFile mainly performs initialization, packing the configuration into an AlluxioFileInStream object. Reading then starts with AlluxioFileInStream.read.
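
Analogously to the write side, the read behavior can be tuned per file through OpenFilePOptions. A minimal sketch against the Alluxio 2.x client API (a sketch only; enum names may vary by version):

import alluxio.AlluxioURI;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileSystem;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;

public class ReadTypeExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.Factory.get();
    // NO_CACHE: read through the worker without caching the block in Alluxio
    OpenFilePOptions options = OpenFilePOptions.newBuilder()
        .setReadType(ReadPType.NO_CACHE)
        .build();
    byte[] buf = new byte[4096];
    try (FileInStream in = fs.openFile(new AlluxioURI("/myFile"), options)) {
      int n = in.read(buf);
      System.out.println("read " + n + " bytes");
    }
  }
}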

After a chain of calls, AlluxioFileInStream.read ends up in BlockInStream.readChunk, which reads one chunk at a time:

private void readChunk() throws IOException {
    if (mDataReader == null) {
      mDataReader = mDataReaderFactory.create(mPos, mLength - mPos);
    }

    if (mCurrentChunk != null && mCurrentChunk.readableBytes() == 0) {
      mCurrentChunk.release();
      mCurrentChunk = null;
    }
    if (mCurrentChunk == null) {
      mCurrentChunk = mDataReader.readChunk();
    }
  }

The read ultimately lands in GrpcDataReader.readChunkInternal. GrpcDataReader first issues a readBlock request to the worker to obtain an input stream, and then reads the data from that stream:


private final GrpcBlockingStream<ReadRequest, ReadResponse> mStream;
// call readBlock on the worker to build the input stream
mStream = new GrpcDataMessageBlockingStream<>(mClient.get()::readBlock,
            readerBufferSizeMessages,
            desc, null, mMarshaller);

private DataBuffer readChunkInternal() throws IOException {
    Preconditions.checkState(!mClient.get().isShutdown(),
        "Data reader is closed while reading data chunks.");
    DataBuffer buffer = null;
    ReadResponse response = null;
    // receive the data
      DataMessage<ReadResponse, DataBuffer> message =
          ((GrpcDataMessageBlockingStream<ReadRequest, ReadResponse>) mStream)
              .receiveDataMessage(mDataTimeoutMs);
    // ... (remaining handling omitted)
    return buffer;
  }

3.2 The worker handles the read request

After the Worker receives the read request, the flow is as follows:

(Figure: worker-side read flow)

When the Worker receives the readBlock request, BlockReadHandler.getDataBuffer obtains the block input stream and returns the data to the client:

protected DataBuffer getDataBuffer(BlockReadRequestContext context, long offset, int len)
        throws Exception {
      @Nullable
      BlockReader blockReader = null;
      try {
        // build the blockReader
        openBlock(context);
        openMs = System.currentTimeMillis() - startMs;
        blockReader = context.getBlockReader();
        Preconditions.checkState(blockReader != null);
        startTransferMs = System.currentTimeMillis();
        ByteBuf buf;
        switch (mBlockStoreType) {
          case PAGE:
            if (mIsReaderBufferPooled) {
              buf = PooledDirectNioByteBuf.allocate(len);
            } else {
              buf = Unpooled.directBuffer(len, len);
            }
            try {
              // read data through the blockReader
              while (buf.writableBytes() > 0 && blockReader.transferTo(buf) != -1) {
              }
              return new NettyDataBuffer(buf.retain());
            } finally {
              buf.release();
            }
         // ... (other cases omitted)
    }

openBlock is responsible for building the BlockReader and ultimately calls PagedBlockStore.createBlockReader, whose logic, shown in the code below, is:

  1. If the block metadata exists, use it to read the block already cached on the worker.
  2. If the block metadata does not exist, read the block data from the UFS, and check whether the block should be written back to the worker; if the worker does not need to cache it, the data is streamed from the UFS through the worker straight to the client.

public BlockReader createBlockReader(long sessionId, long blockId, long offset,
                                       boolean positionShort, Protocol.OpenUfsBlockOptions options)
      throws IOException {
    BlockLock blockLock = mLockManager.acquireBlockLock(sessionId, blockId, BlockLockType.READ);
    // under the read lock, check whether the worker already holds this block; if so, build a reader over the local block
    try (LockResource lock = new LockResource(mPageMetaStore.getLock().readLock())) {
      Optional<PagedBlockMeta> blockMeta = mPageMetaStore.getBlock(blockId);
      if (blockMeta.isPresent()) {
        final BlockPageEvictor evictor = blockMeta.get().getDir().getEvictor();
        evictor.addPinnedBlock(blockId);
        return new DelegatingBlockReader(getBlockReader(blockMeta.get(), offset, options), () -> {
          evictor.removePinnedBlock(blockId);
          unpinBlock(blockLock);
        });
      }
    }
    // this is a block that needs to be read from UFS
    // the block is not on the worker: take the write lock so it can be read from the UFS and cached on the worker
    try (LockResource lock = new LockResource(mPageMetaStore.getLock().writeLock())) {
      // in case someone else has added this block while we wait for the lock,
      // just use the block meta; otherwise create a new one and add to the metastore
      Optional<PagedBlockMeta> blockMeta = mPageMetaStore.getBlock(blockId);
      // if the block was cached while we waited for the write lock, still read the worker's copy
      if (blockMeta.isPresent()) {
        blockMeta.get().getDir().getEvictor().addPinnedBlock(blockId);
        return new DelegatingBlockReader(getBlockReader(blockMeta.get(), offset, options), () -> {
          blockMeta.get().getDir().getEvictor().removePinnedBlock(blockId);
          unpinBlock(blockLock);
        });
      }
      long blockSize = options.getBlockSize();
      PagedBlockStoreDir dir =
          (PagedBlockStoreDir) mPageMetaStore.allocate(BlockPageId.fileIdOf(blockId, blockSize),
              blockSize);
      PagedBlockMeta newBlockMeta = new PagedBlockMeta(blockId, blockSize, dir);
      // if the user chose not to cache the block on the worker, read the UFS directly
      if (options.getNoCache()) {
        // block does not need to be cached in Alluxio, no need to add and commit it
        unpinBlock(blockLock);
        final UfsBlockReadOptions readOptions;
        try {
          readOptions = UfsBlockReadOptions.fromProto(options);
        } catch (IllegalArgumentException e) {
          throw new AlluxioRuntimeException(Status.INTERNAL,
              String.format("Block %d may need to be read from UFS, but key UFS read options "
                  + "is missing in client request", blockId), e, ErrorType.Internal, false);
        }
        // build a UFS block reader
        return new PagedUfsBlockReader(mUfsManager, mUfsInStreamCache, newBlockMeta,
            offset, readOptions, mPageSize);
      }
      // register a new block
      mPageMetaStore.addBlock(newBlockMeta);
      dir.getEvictor().addPinnedBlock(blockId);
      // by default, the block data read from the UFS is cached on the worker
      return new DelegatingBlockReader(getBlockReader(newBlockMeta, offset, options), () -> {
        // first commit the block to the master
        commitBlockToMaster(newBlockMeta);
        newBlockMeta.getDir().getEvictor().removePinnedBlock(blockId);
        unpinBlock(blockLock);
      });
    }
  }
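
The read-lock check followed by a write-lock re-check above is a standard double-checked pattern: another thread may have cached the block while we waited for the write lock. A generic sketch of the same idea (DoubleCheckedCache is an illustrative name):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

public class DoubleCheckedCache<K, V> {
  private final Map<K, V> mCache = new HashMap<>();
  private final ReentrantReadWriteLock mLock = new ReentrantReadWriteLock();

  // Look up under the read lock first; on a miss, take the write lock and
  // re-check before creating, since another thread may have inserted the
  // value while we waited for the lock.
  public V getOrCreate(K key, Function<K, V> factory) {
    mLock.readLock().lock();
    try {
      V value = mCache.get(key);
      if (value != null) {
        return value;
      }
    } finally {
      mLock.readLock().unlock();
    }
    mLock.writeLock().lock();
    try {
      V value = mCache.get(key);
      if (value == null) { // re-check after acquiring the write lock
        value = factory.apply(key);
        mCache.put(key, value);
      }
      return value;
    } finally {
      mLock.writeLock().unlock();
    }
  }
}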

4. Accessing Alluxio through the HDFS client

Add the Alluxio file system configuration to HDFS's core-site.xml:

<property>
        <name>fs.alluxio.impl</name>
        <value>alluxio.hadoop.FileSystem</value>
  </property>
  <property>
        <name>fs.AbstractFileSystem.alluxio.impl</name>
        <value>alluxio.hadoop.AlluxioFileSystem</value>
        <description>The Alluxio AbstractFileSystem (Hadoop 2.x)</description>
  </property>

Then place the Alluxio client SDK jar, alluxio-client.jar, under hadoop/share/hadoop/common in the Hadoop installation.

After that, application code accesses paths with the alluxio:// scheme:

public void create() throws URISyntaxException, IOException, InterruptedException {
        // configuration
        Configuration conf = new Configuration();
        // obtain the file system as the given user
        FileSystem fs = FileSystem.get(new URI("alluxio://{alluxio leader}/path"), conf, "<user>");
        // create the file and write data
        FSDataOutputStream out = fs.create(new Path("/root/test3.txt"));
        out.write("Hello, HDFS".getBytes());
        out.flush();
        // close the stream
        fs.close();
    }

When FileSystem sees the alluxio scheme, it looks up the fs.alluxio.impl configuration, alluxio.hadoop.FileSystem, and creates the Alluxio file system client object through it:

private static FileSystem createFileSystem(URI uri, Configuration conf)
      throws IOException {
    Tracer tracer = FsTracer.get(conf);
    try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
      scope.addKVAnnotation("scheme", uri.getScheme());
      // look up fs.alluxio.impl in core-site.xml, which is alluxio.hadoop.FileSystem
      Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
      // create the FileSystem subclass alluxio.hadoop.FileSystem via reflection
      FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
      // initialize alluxio.hadoop.FileSystem
      fs.initialize(uri, conf);
      return fs;
    }
  }
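
getFileSystemClass resolves the scheme to an implementation class through the fs.<scheme>.impl naming convention. A stripped-down sketch of that lookup (SchemeResolver is an illustrative name, not Hadoop's exact code):

import java.util.Map;

public class SchemeResolver {
  // e.g. scheme "alluxio" -> key "fs.alluxio.impl" -> "alluxio.hadoop.FileSystem"
  public static Class<?> resolve(String scheme, Map<String, String> conf)
      throws ClassNotFoundException {
    String className = conf.get("fs." + scheme + ".impl");
    if (className == null) {
      throw new IllegalArgumentException("No FileSystem implementation for scheme: " + scheme);
    }
    return Class.forName(className);
  }
}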

From: https://blog.51cto.com/u_15327484/8305929
