背景说明
Kafka使用起来很方便,而且磁盘写入性能非常好,那么它是如何实现的呢。
在Kafka的文档说明中,有这样一段:
大致意思是磁盘的读写性能并不弱,现代操作系统都会对磁盘的操作进行预读/缓存,合理复用操作系统的磁盘IO特性,可以提高Kafka的日志磁盘写入性能。
数据写入
以Kafka生产者视角为例,数据经网络传输至Broker,再持久化数据到日志文件。
kafka利用Linux操作系统的两个特性来进行文件的高效持久化:
PageCache
在Linux系统中,所有文件IO操作都是通过PageCache机制实现的。
PageCache是磁盘文件的一种读写管理方式,对于Linux操作系统,磁盘上文件都是由一系列的数据块组成,数据块的大小一般是4KB。
用户应用写入数据后,也并不是直接写到磁盘的,而是先写到操作系统的Buffer中,再提交到PageCache中。最后由操作系统刷入磁盘,至于什么时候刷入磁盘,由操作系统决定。对于用户应用来说,数据提交到内核的PageCache缓冲区后,即可认为数据写入成功。
Kafka进行写操作时,是提前创建一个文件,后续内容追加到该文件对应磁盘空间中,达到顺序读写的目的。减少随机IO带来的损耗。
MMAP
mmap(Memory Mapped Files)是Linux操作系统提供的一种内存映射文件的方法,它可以将一个文件描述符映射进内存中,实现文件到物理内存的映射,完成MMAP映射后,用户对内存的所有操作会被自动的刷新到磁盘上,应用把数据写入到内存,就等价于数据写入到磁盘。这样就达到了通过直接操作内存即可直接操作磁盘上的文件的目的,结合PageCache机制,使得文件非常高效写入进磁盘中。
数据读取
以Kafka消费者视角为例,数据从Broker磁盘读取,再经网络传输至Consumer。
Linux操作系统基于PageCache的提供预读机制(PAGE_READAHEAD),场景举例:
- 用户应用请求读取磁盘上文件 A 的 offset 为 0-3KB 范围内的数据,由于磁盘的基本读写文件块大小:4KB,于是操作系统一次至少会读 一个文件块(0-4KB)的内容。
- 同时操作系统出于读取优化的目的,会选择将相邻磁盘块 offset [4-8KB)、[8-12KB) 以及 [12-16KB) 都加载到内存,于是额外在内存存放了 3 个 文件块的数据;
这就是PageCache的数据文件预读取,对于文件的第一个读操作,操作系统在读入所请文件块的同时,会读入紧随其后的少数几个文件块。从这里看出,用户应用仅仅只是读取一个文件块中3K的内容,从操作系统层面来讲,也会提前预读取更多的数据,以便在下次读取消息时, 就能提高PageCache的命中效率,从而提高IO读的效率。
在Linux的PageCache的预读取机制下,磁盘读性能会接近于内存。
测试代码
//jdk.18
import org.apache.commons.io.FileUtils;
import java.io.*;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
public class FileRandomRWTest {
static int LINE_COUNT = 10000000;
public static void main(String[] args) throws IOException, InterruptedException {
//生成测试数据
String source_file_path = "source_data.txt";
List<String> data_list = generatorData(source_file_path);
String seq_data = "seq_data.txt";
String random_data = "random_data.txt";
writeSequenceFileTest(seq_data, data_list);
Thread.sleep(5000);
writeRandomFileTest(random_data, data_list);
}
public static List<String> generatorData(String outputFilePath) throws IOException {
System.out.println("generatorData start...");
long start = System.currentTimeMillis();
List<String> list = new ArrayList<>();
for (int i = 0; i < LINE_COUNT; i++) {
UUID uuid = UUID.randomUUID();
list.add(uuid.toString());
}
// File outputFile = new File(outputFilePath);
// //如果文件存在,则删除
// if (outputFile.exists())
// FileUtils.delete(outputFile);
//
// FileUtils.writeLines(outputFile, list);
long stop = System.currentTimeMillis();
System.out.println("generatorData end.duration(s):" + (stop - start) / 1000);
return list;
}
public static void writeRandomFileTest(String filePath, List<String> data_list) throws IOException {
System.out.println("writeRandomFileTest start...");
long start = System.currentTimeMillis();
File file = new File(filePath);
//如果文件存在,则删除
if (file.exists())
FileUtils.delete(new File(filePath));
file.createNewFile();
FileWriter writer = new FileWriter(file, true);
try {
for (int i = 0; i < data_list.size(); i++) {
String content = data_list.get(i);
fileRandomWrite(writer, content + "\n");
}
} finally {
writer.close();
}
long stop = System.currentTimeMillis();
System.out.println("writeRandomFileTest end.duration(s):" + (stop - start) / 1000);
}
public static void fileRandomWrite(FileWriter writer, String content) throws IOException {
//创建FileWriter对象
try {
writer.write(content);//写入内容
writer.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void writeSequenceFileTest(String filePath, List<String> data_list) throws IOException {
System.out.println("writeSequenceFileTest start...");
long start = System.currentTimeMillis();
File file = new File(filePath);
//如果文件存在,则删除
if (file.exists())
FileUtils.delete(new File(filePath));
file.createNewFile();
RandomAccessFile randomAccessTargetFile;
randomAccessTargetFile = new RandomAccessFile(file, "rw");
FileChannel channel = randomAccessTargetFile.getChannel();
MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, (long) 1024 * 1024 * 600);
int position = 0;
for (int i = 0; i < data_list.size(); i++) {
String content = data_list.get(i);
position = fileSequenceWrite(map, content + "\n", position, content.length());
}
channel.close();
long stop = System.currentTimeMillis();
System.out.println("writeSequenceFileTest end.duration(s):" + (stop - start) / 1000);
}
public static int fileSequenceWrite(MappedByteBuffer map, String content, int position, long size) {
try {
map.position(position);
map.put(content.getBytes());
position = map.position();
return position;
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
public static void fileRead(String filePath) {
File file = new File(filePath);
if (file.exists()) {
try {
//创建FileReader对象,读取文件中的内容
FileReader reader = new FileReader(file);
char[] ch = new char[1];
while (reader.read(ch) != -1) {
System.out.print(ch);
}
reader.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}
/*
输出结果:
generatorData start...
generatorData end.duration(s):9
writeSequenceFileTest start...
writeSequenceFileTest end.duration(s):3
writeRandomFileTest start...
writeRandomFileTest end.duration(s):18
*/
测试结论
经过多轮测试,使用顺序写入比随机性入在测试环境能提供6倍的速度提升,效果确实非常明显。
标签:文件,顺序,读写,list,System,Kafka,file,磁盘,data From: https://www.cnblogs.com/panshan-lurenjia/p/17492431.html