java 如何批量从文件中读取50w数据批量插入数据库,需要用多线程,并且防止jvm内存溢出
在 Java 中从文件中读取大量数据(例如 50 万条记录)并批量插入数据库时,使用多线程和防止 JVM 内存溢出是一个挑战。以下是一个可能的解决方案,涉及分块读取文件、使用多线程进行数据库插入,以及适当的内存管理。
步骤概述
- 分块读取文件:将文件内容按块读取到内存中,每块包含一定数量的记录(例如 1000 条记录)。
- 使用多线程:创建一个线程池,每个线程负责处理一个数据块并将其插入数据库。
- 批量插入:使用 JDBC 批量插入(batch insert)来减少数据库交互次数。
- 内存管理:确保每次处理完一个数据块后,及时释放相关内存。
代码示例
以下是一个简化的示例代码,展示如何实现上述步骤:
java复制代码
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class BatchInsertExample {
private static final int BATCH_SIZE = 1000; // 每次读取的记录数
private static final int THREAD_POOL_SIZE = 10; // 线程池大小
private static final String DB_URL = "jdbc:mysql://localhost:3306/yourdatabase";
private static final String DB_USER = "yourusername";
private static final String DB_PASSWORD = "yourpassword";
public static void main(String[] args) {
String filePath = "path/to/your/largefile.txt";
ExecutorService executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
String line;
List<String[]> batchData = new ArrayList<>(BATCH_SIZE);
while ((line = br.readLine()) != null) {
String[] record = parseLine(line); // 解析每行数据
batchData.add(record);
if (batchData.size() == BATCH_SIZE) {
executorService.submit(new InsertTask(batchData));
batchData.clear(); // 清除已处理的数据,释放内存
}
}
// 提交剩余的数据(如果不足BATCH_SIZE)
if (!batchData.isEmpty()) {
executorService.submit(new InsertTask(batchData));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
executorService.shutdown();
while (!executorService.isTerminated()) {
// 等待所有任务完成
}
}
}
private static String[] parseLine(String line) {
// 根据你的文件格式解析每行数据
// 示例:假设每行包含三个字段,用逗号分隔
return line.split(",");
}
static class InsertTask implements Runnable {
private final List<String[]> data;
public InsertTask(List<String[]> data) {
this.data = data;
}
@Override
public void run() {
String insertSQL = "INSERT INTO your_table (field1, field2, field3) VALUES (?, ?, ?)";
try (Connection conn = DriverManager.getConnection(DB_URL, DB_USER, DB_PASSWORD);
PreparedStatement pstmt = conn.prepareStatement(insertSQL)) {
conn.setAutoCommit(false); // 关闭自动提交,启用事务
for (String[] record : data) {
pstmt.setString(1, record[0]);
pstmt.setString(2, record[1]);
pstmt.setString(3, record[2]);
pstmt.addBatch();
}
int[] result = pstmt.executeBatch(); // 执行批量插入
conn.commit(); // 提交事务
// 可选:打印插入结果
for (int count : result) {
System.out.println("Inserted rows: " + count);
}
} catch (SQLException e) {
e.printStackTrace();
// 可以在这里处理回滚等逻辑
}
}
}
}
注意事项
- 数据库连接池:对于生产环境,建议使用数据库连接池(如 HikariCP)来管理数据库连接,以提高性能和资源利用率。
- 错误处理:示例代码中的错误处理较为简单,应根据实际需求增加重试机制、日志记录等。
- 数据解析:
parseLine
方法应根据你的文件格式进行适当修改。 - 内存监控:在处理大规模数据时,可以使用 JVM 监控工具(如 VisualVM)来监控内存使用情况,确保不会发生内存溢出。
通过上述方法,你可以有效地从文件中读取大量数据并批量插入数据库,同时避免 JVM 内存溢出问题。