在大数据时代,处理超大规模数据是算法工程师需要面对的重要问题。本文将以在内存受限环境下,求一个大文件中词频最高的Top N词为例,探讨一种基于堆结构与外部排序的解决方案。
问题描述
给定一个1G大小的文件file.txt,里面每行是一个词,词的大小不超过16字节。内存限制为1M。要求返回文件中词频最高的100个词。
常规方法及不足
最简单的方法是将文件全部读入内存,统计每个词的频数,最后取频数最大的100个词。但文件大小远超内存限制,无法操作。
一种改进是分批读入文件,每次统计一批词频,最后合并结果。这种方法可以控制内存使用,但需要多轮遍历文件,当文件很大时IO成本非常高。且还需要频繁合并中间结果。
再一种方法是使用外部排序算法。将文件逐行读入,并排序,然后统计词频输出Top N结果。此方法依然需要多轮磁盘IO,效率较低。
基于堆结构的解法
基于上述分析,需要一种可以动态维护topk结果的数据结构。堆可以提供这种能力。
具体地,可以使用一个小根堆,堆的大小固定为N(此处为100)。每次从文件中读取一定大小的词,统计词频保存到一个哈希表中。然后遍历这个哈希表,把词频作为值,词语作为键,逐个插入小根堆。如果堆大小超过N,则移除堆顶最小的元素。重复这一过程,直到文件读取完毕,则堆中的N个元素就是全局topk结果。
堆结构保证了每次只需要维护规模为N的中间结果,而不是全量结果,因此可以控制内存占用。
算法实现
基于小根堆,可以设计一个内存受限的词频统计算法:
- 初始化大小为N的小根堆,用于保存topk结果import java.io.*; import java.util.*; public class TopKFrequentWords { private static final int N = 100; // 返回topk结果数 private static final int BATCH_SIZE = 100; // 每批读入行数 public static List<String> topKFrequent(String file, int k, int batchSize) throws IOException { PriorityQueue<WordFreq> pq = new PriorityQueue<>((a, b) -> Integer.compare(a.freq, b.freq)); BufferedReader reader = new BufferedReader(new FileReader(file)); String line; HashMap<String, Integer> freq = new HashMap<>(); while ((line = reader.readLine()) != null) { // 统计每批词频 freq.put(line, freq.getOrDefault(line, 0) + 1); if (freq.size() >= batchSize) { // 加载到堆中 for (Map.Entry<String, Integer> entry : freq.entrySet()) { pq.offer(new WordFreq(entry.getKey(), entry.getValue())); if (pq.size() > k) { pq.poll(); } } freq.clear(); // 清空当前批次结果 } } // 加载最后一个批次 for (Map.Entry<String, Integer> entry : freq.entrySet()) { pq.offer( new WordFreq(entry.getKey(), entry.getValue())); } // 构建结果列表 List<String> topK = new ArrayList<>(); while (!pq.isEmpty()) { topK.add(pq.poll().word); } Collections.reverse(topK); return topK; } public static class WordFreq { String word; int freq; public WordFreq(String word, int freq) { this.word = word; this.freq = freq; } } }这个示例定义了一个小根堆,每次从文件中读取一批数据进行统计,并维护堆中的topk词频结果。最后遍历堆构建结果列表。可以控制每批次处理数据量,保证内存不超限。总结本文针对内存受限环境下的大文件Top N词频问题,给出一种基于堆结构与外部排序的解决方案,主要有以下优点:import java.io.*; import java.util.*; public class TopKFrequentWords { private static final int N = 100; // 返回topk结果数 private static final int BATCH_SIZE = 100; // 每批读入行数 public static List<String> topKFrequent(String file, int k, int batchSize) throws IOException { PriorityQueue<WordFreq> pq = new PriorityQueue<>((a, b) -> Integer.compare(a.freq, b.freq)); BufferedReader reader = new BufferedReader(new FileReader(file)); String line; HashMap<String, Integer> freq = new HashMap<>(); while ((line = reader.readLine()) != null) { // 统计每批词频 freq.put(line, freq.getOrDefault(line, 0) + 1); if (freq.size() >= batchSize) { // 加载到堆中 for (Map.Entry<String, Integer> entry : freq.entrySet()) { pq.offer(new WordFreq(entry.getKey(), entry.getValue())); if (pq.size() > k) { pq.poll(); } } freq.clear(); // 清空当前批次结果 } } // 加载最后一个批次 for (Map.Entry<String, Integer> entry : freq.entrySet()) { pq.offer( new WordFreq(entry.getKey(), entry.getValue())); } // 构建结果列表 List<String> topK = new ArrayList<>(); while (!pq.isEmpty()) { topK.add(pq.poll().word); } Collections.reverse(topK); return topK; } public static class WordFreq { String word; int freq; public WordFreq(String word, int freq) { this.word = word; this.freq = freq; } } }这个示例定义了一个小根堆,每次从文件中读取一批数据进行统计,并维护堆中的topk词频结果。最后遍历堆构建结果列表。可以控制每批次处理数据量,保证内存不超限。总结本文针对内存受限环境下的大文件Top N词频问题,给出一种基于堆结构与外部排序的解决方案,主要有以下优点:
- 可以分批处理文件,控制内存占用;
- 堆结构可以动态维护流式数据的topk结果;
- 只需要一轮外部排序,效率较高。
- 可以分批处理文件,控制内存占用;
- 堆结构可以动态维护流式数据的topk结果;
- 只需要一轮外部排序,效率较高。 当然,如果数据量级更大,还可以考虑MapReduce等分布式计算框架。但本文的方法可以覆盖很多常见场景,并可以扩展解决更多类似问题。
- 逐批从文件中读取一定行数的词,统计到哈希表F中
- 遍历F,将词频作为值,词语作为键,插入小根堆
- 堆大小超过N,则移除堆顶最小元素
- 重复步骤2-4,直到文件读完
- 堆中的N个元素即为全局topk结果