首页 > 其他分享 >Kafka的文件顺序读写

Kafka的文件顺序读写

时间:2023-06-19 22:44:29浏览次数:48  
标签:文件 顺序 读写 list System Kafka file 磁盘 data

背景说明

Kafka使用起来很方便,而且磁盘写入性能非常好,那么它是如何实现的呢。

Kafka的文档说明中,有这样一段:

大致意思是磁盘的读写性能并不弱,现代操作系统都会对磁盘的操作进行预读/缓存,合理复用操作系统的磁盘IO特性,可以提高Kafka的日志磁盘写入性能。

数据写入

以Kafka生产者视角为例,数据经网络传输至Broker,再持久化数据到日志文件。

kafka利用Linux操作系统的两个特性来进行文件的高效持久化:

PageCache

在Linux系统中,所有文件IO操作都是通过PageCache机制实现的。

PageCache是磁盘文件的一种读写管理方式,对于Linux操作系统,磁盘上文件都是由一系列的数据块组成,数据块的大小一般是4KB。

用户应用写入数据后,也并不是直接写到磁盘的,而是先写到操作系统的Buffer中,再提交到PageCache中。最后由操作系统刷入磁盘,至于什么时候刷入磁盘,由操作系统决定。对于用户应用来说,数据提交到内核的PageCache缓冲区后,即可认为数据写入成功。

Kafka进行写操作时,是提前创建一个文件,后续内容追加到该文件对应磁盘空间中,达到顺序读写的目的。减少随机IO带来的损耗。

MMAP

mmap(Memory Mapped Files)是Linux操作系统提供的一种内存映射文件的方法,它可以将一个文件描述符映射进内存中,实现文件到物理内存的映射,完成MMAP映射后,用户对内存的所有操作会被自动的刷新到磁盘上,应用把数据写入到内存,就等价于数据写入到磁盘。这样就达到了通过直接操作内存即可直接操作磁盘上的文件的目的,结合PageCache机制,使得文件非常高效写入进磁盘中。

数据读取

以Kafka消费者视角为例,数据从Broker磁盘读取,再经网络传输至Consumer。

Linux操作系统基于PageCache的提供预读机制(PAGE_READAHEAD),场景举例:

  • 用户应用请求读取磁盘上文件 A 的 offset 为 0-3KB 范围内的数据,由于磁盘的基本读写文件块大小:4KB,于是操作系统一次至少会读 一个文件块(0-4KB)的内容。
  • 同时操作系统出于读取优化的目的,会选择将相邻磁盘块 offset [4-8KB)、[8-12KB) 以及 [12-16KB) 都加载到内存,于是额外在内存存放了 3 个 文件块的数据;

这就是PageCache的数据文件预读取,对于文件的第一个读操作,操作系统在读入所请文件块的同时,会读入紧随其后的少数几个文件块。从这里看出,用户应用仅仅只是读取一个文件块中3K的内容,从操作系统层面来讲,也会提前预读取更多的数据,以便在下次读取消息时, 就能提高PageCache的命中效率,从而提高IO读的效率。

在Linux的PageCache的预读取机制下,磁盘读性能会接近于内存。

测试代码

//jdk.18
import org.apache.commons.io.FileUtils;

import java.io.*;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class FileRandomRWTest {
    static int LINE_COUNT = 10000000;

    public static void main(String[] args) throws IOException, InterruptedException {
        //生成测试数据
        String source_file_path = "source_data.txt";
        List<String> data_list = generatorData(source_file_path);

        String seq_data = "seq_data.txt";
        String random_data = "random_data.txt";

        writeSequenceFileTest(seq_data, data_list);
        Thread.sleep(5000);
        writeRandomFileTest(random_data, data_list);
    }

    public static List<String> generatorData(String outputFilePath) throws IOException {
        System.out.println("generatorData start...");
        long start = System.currentTimeMillis();

        List<String> list = new ArrayList<>();
        for (int i = 0; i < LINE_COUNT; i++) {
            UUID uuid = UUID.randomUUID();
            list.add(uuid.toString());
        }
//        File outputFile = new File(outputFilePath);
//        //如果文件存在,则删除
//        if (outputFile.exists())
//            FileUtils.delete(outputFile);
//
//        FileUtils.writeLines(outputFile, list);

        long stop = System.currentTimeMillis();
        System.out.println("generatorData end.duration(s):" + (stop - start) / 1000);
        return list;
    }

    public static void writeRandomFileTest(String filePath, List<String> data_list) throws IOException {
        System.out.println("writeRandomFileTest start...");
        long start = System.currentTimeMillis();

        File file = new File(filePath);
        //如果文件存在,则删除
        if (file.exists())
            FileUtils.delete(new File(filePath));
        file.createNewFile();

        FileWriter writer = new FileWriter(file, true);

        try {
            for (int i = 0; i < data_list.size(); i++) {
                String content = data_list.get(i);
                fileRandomWrite(writer, content + "\n");
            }
        } finally {
            writer.close();
        }
        long stop = System.currentTimeMillis();
        System.out.println("writeRandomFileTest end.duration(s):" + (stop - start) / 1000);
    }

    public static void fileRandomWrite(FileWriter writer, String content) throws IOException {
        //创建FileWriter对象
        try {
            writer.write(content);//写入内容
            writer.flush();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void writeSequenceFileTest(String filePath, List<String> data_list) throws IOException {
        System.out.println("writeSequenceFileTest start...");
        long start = System.currentTimeMillis();

        File file = new File(filePath);
        //如果文件存在,则删除
        if (file.exists())
            FileUtils.delete(new File(filePath));
        file.createNewFile();

        RandomAccessFile randomAccessTargetFile;

        randomAccessTargetFile = new RandomAccessFile(file, "rw");
        FileChannel channel = randomAccessTargetFile.getChannel();

        MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, (long) 1024 * 1024 * 600);

        int position = 0;
        for (int i = 0; i < data_list.size(); i++) {
            String content = data_list.get(i);
            position = fileSequenceWrite(map, content + "\n", position, content.length());
        }

        channel.close();
        long stop = System.currentTimeMillis();
        System.out.println("writeSequenceFileTest end.duration(s):" + (stop - start) / 1000);
    }

    public static int fileSequenceWrite(MappedByteBuffer map, String content, int position, long size) {
        try {
            map.position(position);
            map.put(content.getBytes());

            position = map.position();
            return position;
        } catch (Exception e) {
            e.printStackTrace();

        }
        return 0;
    }

    public static void fileRead(String filePath) {
        File file = new File(filePath);
        if (file.exists()) {
            try {
                //创建FileReader对象,读取文件中的内容
                FileReader reader = new FileReader(file);
                char[] ch = new char[1];
                while (reader.read(ch) != -1) {
                    System.out.print(ch);
                }
                reader.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }

        }
    }
}


/*
输出结果:
generatorData start...
generatorData end.duration(s):9
writeSequenceFileTest start...
writeSequenceFileTest end.duration(s):3
writeRandomFileTest start...
writeRandomFileTest end.duration(s):18
*/

测试结论

经过多轮测试,使用顺序写入比随机性入在测试环境能提供6倍的速度提升,效果确实非常明显。

标签:文件,顺序,读写,list,System,Kafka,file,磁盘,data
From: https://www.cnblogs.com/panshan-lurenjia/p/17492431.html

相关文章

  • Kafka学习
    Kafka学习https://blog.csdn.net/Eternal_Blue/article/details/95598942https://www.cnblogs.com/swordfall/p/10014300.html‍常用命令启停开启zookeeper:./zkServer.shstart关闭zookeeper:./zkServer.shstop查看zookeeper的启动状态:./zkServer.shstatusKafka启......
  • Oracle优化器对谓词顺序处理的一个场景
    最近听了个讲座,其中介绍到了Oracle的谓词,原始版本的例子,如下所示,从数据上能看到,c1='3'的时候,c2的值是个字符串类型的数字,SQL>createtabletest(c1char(1),c2varchar2(1));Tablecreated.SQL>insertintotestvalues('1','A');1rowcreated.SQL>insertintotes......
  • centos7-docker安装与配置kafka+zookeeper+kafkamanager
    一、默认docker环境已经OK拉镜像dockerpullwurstmeister/zookeeperdockerpullwurstmeister/kafkadockerpullsheepkiller/kafka-manager删镜像dockerrmi+镜像id查看镜像[root@build-science-system-services-03~]#dockerimages二、运行相关容器启动zookeeper:......
  • MySQL中SQL语句的执行顺序(详细)
    一:SQL语句的执行顺序作为一个开发人员,在开发中基本上每时每刻都要和数据库打交到;虽然写过无数的SQL语句,但是写好一个SQL可不是这么简单的,它涉及到各式各样的优化和书写方式;但下面我以MySQL中的SQL执行顺序来作为讲解,对其进行剖析。1:SQL数据的准备为了可以为下文做铺垫,......
  • kafka常用命令
    转 https://www.cnblogs.com/xgss/articles/16580433.html 1启动zookeeperbin/zookeeper-server-start.shconfig/zookeeper.properties 2启动kafkabin/kafka-server-start.shconfig/server.properties多个kafka的话,在各个虚拟机上运行kafka启动命令多次即......
  • 函数重载及其匹配顺序
    1.函数重载概念:同一个作用域内,同名函数,参数列表不同(参数个数、类型或顺序不同)。与返回值类型没有关系。如果函数名和参数列表相同,无论返回值类型是否相同,会编译错误(函数重定义)const是有效的重载。仅默认参数值不同不是有效的重载。例如intfun(inta=0);//voidfun(i......
  • NOI / 1.9编程基础之顺序 09:直方图
    描述给定一个非负整数数组,统计里面每一个数的出现次数。我们只统计到数组里最大的数。假设Fmax(Fmax<10000)是数组里最大的数,那么我们只统计{0,1,2.....Fmax}里每个数出现的次数。输入第一行n是数组的大小。1<=n<=10000。紧接着一行是数组的n个元素。输出按顺序输......
  • mycat读写分离
          ......
  • SpringBoot整合Kafka
    第一步: 第二步: 第三步:  ......
  • kafka的启动--windows版
    首先下载并安装kafka然后进入到安装目录输入cmd然后先启动zookeerper输入下面的命令zookeeper-server-start.bat../../config/zookeeper.properties再启动kafka,输入下面命令kafka-server-start.bat../../config/server.properties已完成启动!! ......