1. Background
An earlier article at https://blog.51cto.com/u_15327484 introduced the Alluxio architecture. Building on that, this article walks through the Alluxio file read and write flows. They are almost identical to the HDFS flows, except that the Worker has the additional option of reading from and writing to the UFS. Some steps are omitted here; only the key points are covered.
2. Alluxio Write Flow
When a client writes data to Alluxio, it can specify whether the data written to Alluxio should also be written to the UFS. When writing to the UFS, it can further choose between synchronous and asynchronous persistence. Alluxio has four write modes (a usage sketch follows the list):
- MUST_CACHE: the data is written only to the Alluxio Worker.
- CACHE_THROUGH: the data is written to the Alluxio Worker and synchronously persisted to the UFS.
- ASYNC_THROUGH: the data is written to the Alluxio Worker and asynchronously persisted to the UFS.
- THROUGH: the data is persisted only to the UFS.
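The write mode is selected per request through CreateFilePOptions. Below is a minimal sketch, assuming the Alluxio 2.x Java client API (alluxio.grpc.CreateFilePOptions and alluxio.grpc.WritePType); adjust the imports and enum value for your version:
import alluxio.AlluxioURI;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.WritePType;

public class WriteTypeExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.Factory.get();
    AlluxioURI path = new AlluxioURI("/myFile");
    // CACHE_THROUGH writes to the Worker and synchronously persists to the UFS;
    // the other values are MUST_CACHE, ASYNC_THROUGH and THROUGH.
    CreateFilePOptions options = CreateFilePOptions.newBuilder()
        .setWriteType(WritePType.CACHE_THROUGH)
        .build();
    try (FileOutStream out = fs.createFile(path, options)) {
      out.write("hello alluxio".getBytes());
    }
  }
}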
2.1 The Alluxio Client Initiates the Write
In application code, writing to Alluxio usually starts with createFile, followed by write calls to write the data:
FileSystem fs = FileSystem.Factory.get();
AlluxioURI path = new AlluxioURI("/myFile");
// Create a file and get its output stream
FileOutStream out = fs.createFile(path);
// Write data
out.write(...);
// Close and complete file
out.close();
The client-side write flow is as follows.
For the createFile request, RetryHandlingFileSystemMasterClient.createFile is ultimately invoked, which sends a gRPC request to the Leader Master. The Leader updates the metadata in memory and persists the journal entry to the disks of all masters:
public URIStatus createFile(final AlluxioURI path, final CreateFilePOptions options)
throws AlluxioStatusException {
return retryRPC(
() -> new URIStatus(GrpcUtils.fromProto(mClient.createFile(CreateFilePRequest.newBuilder()
.setPath(getTransportPath(path)).setOptions(options).build()).getFileInfo())),
RPC_LOG, "CreateFile", "path=%s,options=%s", path, options);
}
FileOutStream.write eventually calls AlluxioFileOutStream.writeInternal, which first checks whether a new block needs to be created and then writes the data to the Worker:
private void writeInternal(int b) throws IOException {
if (mShouldCacheCurrentBlock) {
try {
if (mCurrentBlockOutStream == null || mCurrentBlockOutStream.remaining() == 0) {
// request a new block on demand
getNextBlock();
}
// write the data
mCurrentBlockOutStream.write(b);
} catch (IOException e) {
handleCacheWriteException(e);
}
}
if (mUnderStorageType.isSyncPersist()) {
mUnderStorageOutputStream.write(b);
Metrics.BYTES_WRITTEN_UFS.inc();
}
mBytesWritten++;
}
In getNextBlock, RetryHandlingFileSystemMasterClient.getNewBlockIdForFile is ultimately called to request a new block from the master via gRPC:
public long getNewBlockIdForFile(final AlluxioURI path)
throws AlluxioStatusException {
return retryRPC(
() -> mClient.getNewBlockIdForFile(
GetNewBlockIdForFilePRequest.newBuilder().setPath(getTransportPath(path))
.setOptions(GetNewBlockIdForFilePOptions.newBuilder().build()).build())
.getId(),
RPC_LOG, "GetNewBlockIdForFile", "path=%s", path);
}
BlockOutStream.write calls writeInternal to buffer the data into chunks; each chunk is 1MB by default (see the configuration note after the code):
private void writeInternal(ByteBuf b, int off, int len) throws IOException {
if (len == 0) {
return;
}
while (len > 0) {
updateCurrentChunk(false);
int toWrite = Math.min(len, mCurrentChunk.writableBytes());
mCurrentChunk.writeBytes(b, off, toWrite);
off += toWrite;
len -= toWrite;
}
updateCurrentChunk(false);
}
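The 1MB default comes from a client-side property. As an assumption to verify against your Alluxio version, in 2.x the key is alluxio.user.streaming.writer.chunk.size.bytes (older 2.x releases used alluxio.user.network.writer.chunk.size.bytes); it can be overridden in alluxio-site.properties:
# assumed property key; verify it against your Alluxio version's PropertyKey list
alluxio.user.streaming.writer.chunk.size.bytes=4MB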
Once a chunk is full, updateCurrentChunk ships the data to the worker:
private final List<DataWriter> mDataWriters;
if (mCurrentChunk.writableBytes() == 0 || lastChunk) {
try {
if (mCurrentChunk.readableBytes() > 0) {
for (DataWriter dataWriter : mDataWriters) {
mCurrentChunk.retain();
dataWriter.writeChunk(mCurrentChunk.duplicate());
}
} else {
Preconditions.checkState(lastChunk);
}
} finally {
// If the packet has bytes to read, we increment its refcount explicitly for every packet
// writer. So we need to release here. If the packet has no bytes to read, then it has
// to be the last packet. It needs to be released as well.
mCurrentChunk.release();
mCurrentChunk = null;
}
}
Finally GrpcDataWriter.writeChunk writes the chunk into mStream. As shown below, mStream is the output stream obtained after the client issues a writeBlock request to the worker:
private final GrpcBlockingStream<WriteRequest, WriteResponse> mStream;
// mStream obtains the output stream by issuing a writeBlock request to the worker
mStream = new GrpcBlockingStream<>(mClient.get()::writeBlock, writerBufferSizeMessages,
MoreObjects.toStringHelper(this)
.add("request", mPartialRequest)
.add("address", address)
.toString());
public void writeChunk(final ByteBuf buf) throws IOException {
mPosToQueue += buf.readableBytes();
try {
WriteRequest request = WriteRequest.newBuilder().setCommand(mPartialRequest).setChunk(
Chunk.newBuilder()
.setData(UnsafeByteOperations.unsafeWrap(buf.nioBuffer()))
.build()).build();
if (mStream instanceof GrpcDataMessageBlockingStream) {
// the client sends the data to the worker through the output stream
((GrpcDataMessageBlockingStream<WriteRequest, WriteResponse>) mStream)
.sendDataMessage(new DataMessage<>(request, new NettyDataBuffer(buf)), mDataTimeoutMs);
} else {
mStream.send(request, mDataTimeoutMs);
}
} finally {
buf.release();
}
}
2.2 The Worker Handles the Write Request
When block data is written both to the Worker and to the UFS, the key step on the Worker side is DelegationWriteHandler.createWriterHandler, which dispatches to one of three write handlers according to the request type:
- ALLUXIO_BLOCK: the block data is written directly to the Worker.
- UFS_FILE: the block data is written only to the UFS.
- UFS_FALLBACK_BLOCK: the block data is written to both the Worker and the UFS.
DelegationWriteHandler.createWriterHandler is shown below:
private AbstractWriteHandler createWriterHandler(alluxio.grpc.WriteRequest request) {
switch (request.getCommand().getType()) {
case ALLUXIO_BLOCK:
return new BlockWriteHandler(mBlockWorker, mResponseObserver,
mUserInfo, mDomainSocketEnabled);
case UFS_FILE:
return new UfsFileWriteHandler(mUfsManager, mResponseObserver,
mUserInfo);
case UFS_FALLBACK_BLOCK:
return new UfsFallbackBlockWriteHandler(
mBlockWorker, mUfsManager, mResponseObserver, mUserInfo, mDomainSocketEnabled);
default:
throw new IllegalArgumentException(String.format("Invalid request type %s",
request.getCommand().getType().name()));
}
}
In UfsFallbackBlockWriteHandler.writeBuf, the data is first written to the worker via BlockWriteHandler.writeBuf and then written to the UFS:
protected void writeBuf(BlockWriteRequestContext context,
StreamObserver<WriteResponse> responseObserver, DataBuffer buf, long pos) throws Exception {
if (context.isWritingToLocal()) {
long posBeforeWrite = pos - buf.readableBytes();
try {
// write to the worker
mBlockWriteHandler.writeBuf(context, responseObserver, buf, pos);
return;
} // catch block omitted; on failure the code below falls back to the UFS
// close the block writer first
if (context.getBlockWriter() != null) {
context.getBlockWriter().close();
}
// prepare the UFS block and transfer data from the temp block to UFS
// create the UFS block; if the file does not exist in the UFS, create it there
createUfsBlock(context);
if (posBeforeWrite > 0) {
// transfer the data to the UFS
transferToUfsBlock(context, posBeforeWrite);
}
// close the original block writer and remove the temp file
mBlockWriteHandler.cancelRequest(context);
}
if (context.getOutputStream() == null) {
createUfsBlock(context);
}
buf.readBytes(context.getOutputStream(), buf.readableBytes());
}
transferToUfsBlock reads the data already written to the worker's temporary block and copies it into the UFS, verifying that the number of bytes copied matches the expected position:
private void transferToUfsBlock(BlockWriteRequestContext context, long pos) throws IOException {
OutputStream ufsOutputStream = context.getOutputStream();
long blockId = context.getRequest().getId();
Optional<TempBlockMeta> block = mWorker.getBlockStore().getTempBlockMeta(blockId);
Preconditions.checkState(block.isPresent()
&& Files.copy(Paths.get(block.get().getPath()), ufsOutputStream) == pos);
}
3. Alluxio Read Flow
3.1 The Client Initiates the Read
Application code for reading from Alluxio looks like this (a fuller read loop sketch follows the snippet):
FileSystem fs = FileSystem.Factory.get();
AlluxioURI path = new AlluxioURI("/myFile");
// Open the file for reading
FileInStream in = fs.openFile(path);
// Read data
in.read(...);
// Close file relinquishing the lock
in.close();
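The in.read(...) call above is elided; since FileInStream behaves like a standard java.io.InputStream, a minimal, generic read loop (a sketch, not Alluxio-specific code) looks like this:
byte[] buffer = new byte[4096];
int bytesRead;
long total = 0;
// read until EOF is reached
while ((bytesRead = in.read(buffer)) != -1) {
  total += bytesRead;
}
System.out.println("Read " + total + " bytes");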
openFile mainly performs initialization, wrapping the configuration into an AlluxioFileInStream object. AlluxioFileInStream.read then starts the actual reading.
AlluxioFileInStream.read, after a series of calls, executes BlockInStream.readChunk to read one chunk at a time:
private void readChunk() throws IOException {
if (mDataReader == null) {
mDataReader = mDataReaderFactory.create(mPos, mLength - mPos);
}
if (mCurrentChunk != null && mCurrentChunk.readableBytes() == 0) {
mCurrentChunk.release();
mCurrentChunk = null;
}
if (mCurrentChunk == null) {
mCurrentChunk = mDataReader.readChunk();
}
}
Finally GrpcDataReader.readChunkInternal is called to read the block. GrpcDataReader first issues a readBlock request to the worker to obtain an input stream, and then reads the data from that stream:
private final GrpcBlockingStream<ReadRequest, ReadResponse> mStream;
// issue a readBlock request to the worker to build the input stream
mStream = new GrpcDataMessageBlockingStream<>(mClient.get()::readBlock,
readerBufferSizeMessages,
desc, null, mMarshaller);
private DataBuffer readChunkInternal() throws IOException {
Preconditions.checkState(!mClient.get().isShutdown(),
"Data reader is closed while reading data chunks.");
DataBuffer buffer = null;
ReadResponse response = null;
// receive the data message
DataMessage<ReadResponse, DataBuffer> message =
((GrpcDataMessageBlockingStream<ReadRequest, ReadResponse>) mStream)
.receiveDataMessage(mDataTimeoutMs);
// omitted
return buffer;
}
3.2 The Worker Handles the Read Request
After the Worker receives the readBlock request, BlockReadHandler.getDataBuffer obtains a block reader, reads the requested range into a buffer, and returns it to the client:
protected DataBuffer getDataBuffer(BlockReadRequestContext context, long offset, int len)
throws Exception {
@Nullable
BlockReader blockReader = null;
try {
// build the blockReader
openBlock(context);
openMs = System.currentTimeMillis() - startMs;
blockReader = context.getBlockReader();
Preconditions.checkState(blockReader != null);
startTransferMs = System.currentTimeMillis();
ByteBuf buf;
switch (mBlockStoreType) {
case PAGE:
if (mIsReaderBufferPooled) {
buf = PooledDirectNioByteBuf.allocate(len);
} else {
buf = Unpooled.directBuffer(len, len);
}
try {
// read data through the blockReader
while (buf.writableBytes() > 0 && blockReader.transferTo(buf) != -1) {
}
return new NettyDataBuffer(buf.retain());
} finally {
buf.release();
}
// other cases omitted
}
The openBlock method builds the BlockReader and eventually calls PagedBlockStore.createBlockReader, whose logic is:
- If the block metadata exists, read the block already cached on the worker using that metadata.
- If the block metadata does not exist, read the block data from the UFS, and check whether the block should also be cached on the worker; if caching is not required, the data is streamed from the UFS through the worker directly to the client.
public BlockReader createBlockReader(long sessionId, long blockId, long offset,
boolean positionShort, Protocol.OpenUfsBlockOptions options)
throws IOException {
BlockLock blockLock = mLockManager.acquireBlockLock(sessionId, blockId, BlockLockType.READ);
// check whether the worker already has the block; if so, build a reader over the local block under the read lock
try (LockResource lock = new LockResource(mPageMetaStore.getLock().readLock())) {
Optional<PagedBlockMeta> blockMeta = mPageMetaStore.getBlock(blockId);
if (blockMeta.isPresent()) {
final BlockPageEvictor evictor = blockMeta.get().getDir().getEvictor();
evictor.addPinnedBlock(blockId);
return new DelegatingBlockReader(getBlockReader(blockMeta.get(), offset, options), () -> {
evictor.removePinnedBlock(blockId);
unpinBlock(blockLock);
});
}
}
// this is a block that needs to be read from UFS
// if the worker does not have the block, acquire the write lock so it can be read from the UFS and cached on the worker
try (LockResource lock = new LockResource(mPageMetaStore.getLock().writeLock())) {
// in case someone else has added this block while we wait for the lock,
// just use the block meta; otherwise create a new one and add to the metastore
Optional<PagedBlockMeta> blockMeta = mPageMetaStore.getBlock(blockId);
// if someone else cached the block while we waited for the write lock, still read the worker's local copy
if (blockMeta.isPresent()) {
blockMeta.get().getDir().getEvictor().addPinnedBlock(blockId);
return new DelegatingBlockReader(getBlockReader(blockMeta.get(), offset, options), () -> {
blockMeta.get().getDir().getEvictor().removePinnedBlock(blockId);
unpinBlock(blockLock);
});
}
long blockSize = options.getBlockSize();
PagedBlockStoreDir dir =
(PagedBlockStoreDir) mPageMetaStore.allocate(BlockPageId.fileIdOf(blockId, blockSize),
blockSize);
PagedBlockMeta newBlockMeta = new PagedBlockMeta(blockId, blockSize, dir);
// if the client requested not to cache the block on the worker, read directly from the UFS
if (options.getNoCache()) {
// block does not need to be cached in Alluxio, no need to add and commit it
unpinBlock(blockLock);
final UfsBlockReadOptions readOptions;
try {
readOptions = UfsBlockReadOptions.fromProto(options);
} catch (IllegalArgumentException e) {
throw new AlluxioRuntimeException(Status.INTERNAL,
String.format("Block %d may need to be read from UFS, but key UFS read options "
+ "is missing in client request", blockId), e, ErrorType.Internal, false);
}
// build a UFS block reader
return new PagedUfsBlockReader(mUfsManager, mUfsInStreamCache, newBlockMeta,
offset, readOptions, mPageSize);
}
// register the new block
mPageMetaStore.addBlock(newBlockMeta);
dir.getEvictor().addPinnedBlock(blockId);
// by default, cache the block data read from the UFS on the worker
return new DelegatingBlockReader(getBlockReader(newBlockMeta, offset, options), () -> {
// commit the cached block to the master when the reader is closed
commitBlockToMaster(newBlockMeta);
newBlockMeta.getDir().getEvictor().removePinnedBlock(blockId);
unpinBlock(blockLock);
});
}
}
4. Accessing Alluxio Through the HDFS Client
Add the Alluxio file system configuration to HDFS's core-site.xml:
<property>
<name>fs.alluxio.impl</name>
<value>alluxio.hadoop.FileSystem</value>
</property>
<property>
<name>fs.AbstractFileSystem.alluxio.impl</name>
<value>alluxio.hadoop.AlluxioFileSystem</value>
<description>The Alluxio AbstractFileSystem (Hadoop 2.x)</description>
</property>
Add the Alluxio client SDK jar, alluxio-client.jar, to the Hadoop installation directory hadoop/share/hadoop/common.
The application code then accesses paths with the alluxio:// scheme:
public void create() throws URISyntaxException, IOException, InterruptedException {
// configuration
Configuration conf = new Configuration();
// obtain the file system
FileSystem fs = FileSystem.get(new URI("alluxio://{alluxio leader}/path"), conf, "<user>"); // third argument: the accessing user
// create a file and write data
FSDataOutputStream out = fs.create(new Path("/root/test3.txt"));
out.write("Hello, HDFS".getBytes());
out.flush();
// close the output stream and the file system
out.close();
fs.close();
}
FileSystem resolves the alluxio scheme, looks up the fs.alluxio.impl configuration to get alluxio.hadoop.FileSystem, and creates the Alluxio file system client object through it:
private static FileSystem createFileSystem(URI uri, Configuration conf)
throws IOException {
Tracer tracer = FsTracer.get(conf);
try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
scope.addKVAnnotation("scheme", uri.getScheme());
// look up fs.alluxio.impl in core-site.xml, which resolves to alluxio.hadoop.FileSystem
Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
// create the FileSystem subclass alluxio.hadoop.FileSystem via reflection
FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
// initialize alluxio.hadoop.FileSystem
fs.initialize(uri, conf);
return fs;
}
}
From: https://blog.51cto.com/u_15327484/8305929