将 2GB 的大文件分割为 2048 个大小为 512KB 的小文件,采用流式读取方式处理,避免一次性加载整个文件导致内存溢出。
初始化一个长度为 2048 的哈希表数组,用于分别统计各个小文件中单词的出现频率。
利用多线程并行处理机制遍历所有 2048 个小文件,对每个文件中的单词执行哈希取模运算(
int hash = Math.abs(word.hashCode() % hashTableSize)
),并通过hashTables[hash].merge(word, 1, Integer::sum)
的方式将单词频率累加到对应的哈希表中。遍历这 2048 个哈希表,对每个哈希表中的单词按频率排序,将频率排名前 100 的单词存入一个小顶堆。
经过上述处理后,小顶堆中最终保留的 100 个单词即为整个文件中出现频率最高的前 100 个单词。
package com.example.bigfiletop100;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
public class Top100Words {
private static final int HASH_TABLE_SIZE = 2048;
private static final int TOP_WORDS_COUNT = 100;
public static void main(String[] args) throws IOException, InterruptedException {
String largeFilePath = "C:\\Users\\亚洲\\IdeaProjects\\springboot-easyexcel-mybatisplus\\src\\main\\java\\com\\example\\bigfiletop100\\input.txt"; // 替换为大文件的路径
String tempDirPath = "C:\\Users\\亚洲\\IdeaProjects\\springboot-easyexcel-mybatisplus\\src\\main\\java\\com\\example\\bigfiletop100\\temp_files"; // 替换为临时目录的路径
Path tempDir = Paths.get(tempDirPath);
if (!Files.exists(tempDir)) {
Files.createDirectories(tempDir);
}
// 1. 分割大文件为512KB的小文件(改进版)
splitFile(largeFilePath, tempDir.toString(), 512 * 1024);
// 2. 初始化哈希表数组
ConcurrentHashMap<Integer, Map<String, Integer>> hashTables = new ConcurrentHashMap<>();
for (int i = 0; i < HASH_TABLE_SIZE; i++) {
hashTables.put(i, new ConcurrentHashMap<>());
}
// 3. 并行遍历小文件,统计单词频率
File[] smallFiles = new File(tempDir.toString()).listFiles((dir, name) -> name.endsWith(".txt"));
if (smallFiles == null || smallFiles.length == 0) {
System.err.println("No small files found.");
return;
}
ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CountDownLatch latch = new CountDownLatch(smallFiles.length);
for (File smallFile : smallFiles) {
executor.submit(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(smallFile), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
String[] words = line.trim().split("\\s+");
for (String word : words) {
if (word.isEmpty()) continue;
int hash = Math.abs(word.hashCode() % HASH_TABLE_SIZE);
hashTables.get(hash).merge(word, 1, Integer::sum);
}
}
} catch (IOException e) {
System.err.println("Error reading file: " + smallFile.getName());
e.printStackTrace();
} finally {
latch.countDown();
}
});
}
executor.shutdown();
latch.await(); // 等待所有任务完成
// 4. 合并哈希表,并把频率前100的单词存入小顶堆
PriorityQueue<Map.Entry<String, Integer>> minHeap = new PriorityQueue<>(
TOP_WORDS_COUNT,
(o1, o2) -> o1.getValue().compareTo(o2.getValue())
);
hashTables.values().stream()
.filter(Objects::nonNull)
.flatMap(map -> map.entrySet().stream())
.forEach(entry -> {
if (minHeap.size() < TOP_WORDS_COUNT) {
minHeap.offer(entry);
} else if (entry.getValue() > minHeap.peek().getValue()) {
minHeap.poll();
minHeap.offer(entry);
}
});
// 5. 输出结果
System.out.println("Top " + TOP_WORDS_COUNT + " words:");
List<Map.Entry<String, Integer>> topWords = new ArrayList<>(minHeap);
topWords.sort((o1, o2) -> o2.getValue().compareTo(o1.getValue())); // 按降序排列
topWords.forEach(entry -> System.out.println(entry.getKey() + ": " + entry.getValue()));
// 清理临时文件和目录
Arrays.stream(smallFiles).forEach(File::delete);
Files.deleteIfExists(tempDir);
}
private static void splitFile(String largeFilePath, String tempDir, int size) throws IOException {
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(largeFilePath))) {
byte[] buffer = new byte[size];
int bytesRead;
int fileCount = 0;
while ((bytesRead = bis.read(buffer)) != -1) {
File smallFile = new File(tempDir, "part-" + fileCount++ + ".txt");
// 避免截断单词
int actualBytesRead = bytesRead;
if (bytesRead <= size && buffer[bytesRead - 1] != '\n') {
// 查找最后一个空格或换行符作为分割点
for (int i = bytesRead - 1; i >= 0; i--) {
if (buffer[i] == ' ' || buffer[i] == '\n') {
actualBytesRead = i + 1;
bis.reset();
bis.skip(actualBytesRead);
break;
}
}
}
try (BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(smallFile))) {
bos.write(buffer, 0, actualBytesRead);
}
}
}
}
}
关键说明
文件分割优化:
分割时避免截断单词(通过查找最后一个非字母位置),确保每个单词完整属于一个小文件,避免统计误差。内存控制:
单个小文件 512KB,2G 文件约 4096 个小文件(而非 2048,更保守),每个小文件单独处理,避免 OOM。多线程效率:
使用线程池(大小为 CPU 核心数)并行处理小文件,提升统计速度,哈希表数组避免线程竞争(每个哈希槽独立)。小顶堆筛选:
堆大小固定为 100,每次插入复杂度 O (log 100),整体效率远高于全量排序(O (n log n))。单词处理:
统一转为小写(忽略大小写差异),用正则提取纯字母单词(可根据需求调整匹配规则,如保留数字)。
此方案适用于大文件场景,通过分治思想将问题拆解为可并行处理的子任务,再通过堆排序高效筛选结果。