-
Notifications
You must be signed in to change notification settings - Fork 108
Open
Labels
Type: bugSomething isn't workingSomething isn't working
Description
Describe the bug, including details regarding any error messages, version, and platform.
There appears to be a bug in the root allocator: it does not release memory when it is closed. Attached is sample code that prints the process's resident set size (RSS) before and after each scan.
I ran this code with the following JVM parameters:
--add-opens=java.base/java.nio=ALL-UNNAMED -Xms1g -Xmx1g -XX:+AlwaysPreTouch
Code
/**
 * Stand-alone reproduction that scans selected columns of a Parquet file through the Arrow
 * Dataset API and prints the process RSS (as reported by {@code /proc/self/status}) before and
 * after every scan, after a forced GC, and then once a minute while the process stays alive.
 *
 * <p>Usage: {@code ArrowOnlyParquetMemoryRepro --path <file.parquet> --scanCols <col1,col2,...>}
 */
public final class ArrowOnlyParquetMemoryRepro {

    /** Limit passed to each {@code RootAllocator}: 100 * 1024 * 1024 bytes. */
    private static final long ARROW_MEMORY_LIMIT_BYTES = 100L << 20;

    /** Batch size passed to {@code ScanOptions.Builder}: 1024 * 1024 rows. */
    private static final int ARROW_BATCH_SIZE_ROWS = 1 << 20;

    /** Not instantiable; everything here is static. */
    private ArrowOnlyParquetMemoryRepro() {
    }

    /**
     * Entry point: parses {@code --path} and {@code --scanCols}, scans each requested column in
     * turn while logging RSS around it, then loops forever logging RSS once a minute.
     *
     * @param args command-line arguments
     * @throws Exception if a scan fails or a sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        File input = null;
        List<String> columns = null;

        // A flag sitting in the last position has no value after it, so stop one short.
        for (int pos = 0; pos + 1 < args.length; pos++) {
            String flag = args[pos];
            String value = args[pos + 1];
            if ("--path".equals(flag)) {
                input = new File(value);
            } else if ("--scanCols".equals(flag)) {
                columns = parseCsv(value);
            }
        }

        boolean haveArgs = input != null && columns != null && !columns.isEmpty();
        if (!haveArgs) {
            exitWithError("Usage: ArrowOnlyParquetMemoryRepro --path <file.parquet> --scanCols <col1,col2,...>");
        }

        boolean regularFile = input.exists() && input.isFile();
        if (!regularFile) {
            exitWithError("File does not exist: " + input.getAbsolutePath());
        }

        log("config", "file=" + input.getAbsolutePath() + " cols=" + columns);
        logRss("start");

        // One full scan per requested column, bracketed by RSS readings.
        for (String column : columns) {
            logRss("before_scan:" + column);
            scanColumn(input, column);
            logRss("after_scan:" + column);
        }
        logRss("end");

        // Request a GC, give it a second, then take another reading.
        System.gc();
        Thread.sleep(1_000L);
        logRss("after_gc");

        // Stay alive so RSS can be inspected from outside the process.
        log("info", "Process will now sleep. Use 'ps -o rss,pid,command' or 'cat /proc/<pid>/status' to inspect RSS.");
        for (;;) {
            Thread.sleep(60_000L);
            System.gc();
            logRss("sleeping");
        }
    }

    /** Prints {@code message} to standard error and terminates the JVM with exit status 1. */
    private static void exitWithError(String message) {
        System.err.println(message);
        System.exit(1);
    }

    /**
     * Scans a single column of the Parquet file, touching every value, and logs the row total.
     *
     * <p>The allocator, dataset factory, dataset, scanner and reader are each opened in a
     * try-with-resources statement and are therefore closed before this method returns. If a
     * loaded batch does not contain the column, a warning is logged and the method returns
     * without logging the row total.
     *
     * @param parquetFile file to scan
     * @param col name of the column to project and read
     * @throws Exception if opening, scanning or closing any of the resources fails
     */
    private static void scanColumn(File parquetFile, String col) throws Exception {
        long total = 0;
        try (RootAllocator allocator = new RootAllocator(ARROW_MEMORY_LIMIT_BYTES);
             DatasetFactory factory = new FileSystemDatasetFactory(
                 allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, parquetFile.toURI().toString());
             Dataset dataset = factory.finish()) {

            String[] projection = {col};
            ScanOptions.Builder optionsBuilder =
                new ScanOptions.Builder(ARROW_BATCH_SIZE_ROWS).columns(Optional.of(projection));
            ScanOptions options = optionsBuilder.build();

            try (Scanner scanner = dataset.newScan(options);
                 ArrowReader reader = scanner.scanBatches()) {
                while (reader.loadNextBatch()) {
                    VectorSchemaRoot batch = reader.getVectorSchemaRoot();
                    boolean hasVectors = batch != null && !batch.getFieldVectors().isEmpty();
                    if (!hasVectors) {
                        continue;
                    }

                    FieldVector values = batch.getVector(col);
                    if (values == null) {
                        log("warn", "Column not found in file: " + col);
                        return;
                    }

                    int batchRows = batch.getRowCount();
                    // Read every value so the batch contents are actually accessed.
                    int row = 0;
                    while (row < batchRows) {
                        values.getObject(row);
                        row++;
                    }
                    total = total + batchRows;
                }
            }
        }
        log("scan", "col=" + col + " rowsScanned=" + total);
    }

    /**
     * Splits a comma-separated string into its trimmed, non-empty entries.
     *
     * @param csv comma-separated text
     * @return a new mutable list of the entries, in their original order
     */
    private static List<String> parseCsv(String csv) {
        String[] pieces = csv.split(",");
        List<String> entries = new ArrayList<>();
        for (int k = 0; k < pieces.length; k++) {
            String entry = pieces[k].trim();
            if (entry.isEmpty()) {
                continue;
            }
            entries.add(entry);
        }
        return entries;
    }

    /**
     * Logs the current RSS under the {@code rss} tag, labelled with {@code phase}.
     *
     * @param phase label identifying the point in the run at which the reading is taken
     */
    private static void logRss(String phase) {
        long kb = readRssKb();
        String reading = (kb < 0)
            ? " rssKb=N/A (not on Linux)"
            : " rssKb=" + kb + " rssMb=" + (kb / 1024);
        log("rss", "phase=" + phase + reading);
    }

    /**
     * Reads the number on the {@code VmRSS:} line of {@code /proc/self/status}.
     *
     * @return the parsed value, or {@code -1} when the file is absent, has no usable
     *     {@code VmRSS:} line, or cannot be read or parsed
     */
    private static long readRssKb() {
        File procStatus = new File("/proc/self/status");
        if (!procStatus.exists()) {
            return -1;
        }
        try (BufferedReader in = new BufferedReader(new FileReader(procStatus, StandardCharsets.UTF_8))) {
            for (String current = in.readLine(); current != null; current = in.readLine()) {
                if (!current.startsWith("VmRSS:")) {
                    continue;
                }
                String[] fields = current.trim().split("\\s+");
                if (fields.length < 2) {
                    continue;
                }
                return Long.parseLong(fields[1]);
            }
            return -1;
        } catch (Exception ignored) {
            // Best-effort diagnostic: any failure is reported as "no reading".
            return -1;
        }
    }

    /** Prints {@code msg} to standard output, prefixed with {@code tag} in square brackets. */
    private static void log(String tag, String msg) {
        String line = "[" + tag + "] " + msg;
        System.out.println(line);
    }
}
This is the output
[rss] phase=start rssKb=1121340 rssMb=1095
[rss] phase=before_scan:col_name rssKb=1121808 rssMb=1095
2026-01-08T10:58:01,040 INFO [main] org.apache.arrow.memory.BaseAllocator - Debug mode disabled. Enable with the VM option -Darrow.memory.debug.allocator=true.
2026-01-08T10:58:01,049 INFO [main] org.apache.arrow.memory.DefaultAllocationManagerOption - allocation manager type not specified, using netty as the default type
2026-01-08T10:58:01,076 INFO [main] org.apache.arrow.memory.CheckAllocator - Using DefaultAllocationManager at <class name>
[scan] col=col_name rowsScanned=2734751
[rss] phase=after_scan:col_name rssKb=1313364 rssMb=1282
[rss] phase=before_scan:col_name rssKb=1313364 rssMb=1282
[scan] col=col_name rowsScanned=2734751
[rss] phase=after_scan:col_name rssKb=1331144 rssMb=1299
[rss] phase=before_scan:col_name rssKb=1331144 rssMb=1299
[scan] col=col_name rowsScanned=2734751
[rss] phase=after_scan:col_name rssKb=1338228 rssMb=1306
[rss] phase=end rssKb=1338228 rssMb=1306
[rss] phase=after_gc rssKb=1338228 rssMb=1306
[info] Process will now sleep. Use 'ps -o rss,pid,command' or 'cat /proc/<pid>/status' to inspect RSS.
[rss] phase=sleeping rssKb=1338228 rssMb=1306
Metadata
Metadata
Assignees
Labels
Type: bugSomething isn't workingSomething isn't working