Skip to content

[SPARK-57803][CORE] Delete the temp file when closeAndRead fails on the fetch-to-disk path#56920

Closed
LuciferYang wants to merge 1 commit into
apache:masterfrom
LuciferYang:worktree-spark-fetch-tmpfile-leak
Closed

[SPARK-57803][CORE] Delete the temp file when closeAndRead fails on the fetch-to-disk path#56920
LuciferYang wants to merge 1 commit into
apache:masterfrom
LuciferYang:worktree-spark-fetch-tmpfile-leak

Conversation

@LuciferYang

@LuciferYang LuciferYang commented Jul 1, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

OneForOneBlockFetcher.DownloadCallback.onComplete (the fetch-to-disk path) calls channel.closeAndRead() to close the download channel and obtain a ManagedBuffer over the temp file, then either registers the temp file for later cleanup or deletes it. On an IOException from closeAndRead() it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The sibling onFailure already closes the channel and deletes the temp file; the success path did not.

This wraps closeAndRead() so a failure deletes the temp file and closes the channel before rethrowing, matching onFailure. The happy path is unchanged: the buffer is handed to the listener only after closeAndRead() succeeds.

Why are the changes needed?

The temp file is created on the fetching executor when a remote block exceeds spark.maxRemoteBlockSizeFetchToMem and is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails stubs a closeAndRead() that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it. build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite' -> 14 tests pass.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

…he fetch-to-disk path

OneForOneBlockFetcher.DownloadCallback.onComplete leaked the temporary
block file when channel.closeAndRead() threw an IOException. On that path
onComplete rethrew without any cleanup, whereas onFailure closes the
channel and deletes the temp file. Since the file was never registered
for later cleanup (registerTempFileToClean runs only after a successful
read), it was orphaned on disk. closeAndRead() typically throws from the
underlying channel close, in which case the channel may already be closed
but the partially written temp file remains.

Guard closeAndRead() so a failure deletes the temp file and closes the
channel (in case it was left open) before rethrowing, mirroring onFailure.
The buffer is handed to the listener only after closeAndRead() succeeds,
so ownership still transfers exactly once on the happy path.
LuciferYang added a commit that referenced this pull request Jul 2, 2026
…he fetch-to-disk path

### What changes were proposed in this pull request?

`OneForOneBlockFetcher.DownloadCallback.onComplete` (the fetch-to-disk path) calls `channel.closeAndRead()` to close the download channel and obtain a `ManagedBuffer` over the temp file, then either registers the temp file for later cleanup or deletes it. On an `IOException` from `closeAndRead()` it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The sibling `onFailure` already closes the channel and deletes the temp file; the success path did not.

This wraps `closeAndRead()` so a failure deletes the temp file and closes the channel before rethrowing, matching `onFailure`. The happy path is unchanged: the buffer is handed to the listener only after `closeAndRead()` succeeds.

### Why are the changes needed?

The temp file is created on the fetching executor when a remote block exceeds `spark.maxRemoteBlockSizeFetchToMem` and is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New `OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails` stubs a `closeAndRead()` that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it. `build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite'` -> 14 tests pass.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

Closes #56920 from LuciferYang/worktree-spark-fetch-tmpfile-leak.

Authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 62fd9cb)
Signed-off-by: yangjie01 <yangjie01@baidu.com>
LuciferYang added a commit that referenced this pull request Jul 2, 2026
…he fetch-to-disk path

### What changes were proposed in this pull request?

`OneForOneBlockFetcher.DownloadCallback.onComplete` (the fetch-to-disk path) calls `channel.closeAndRead()` to close the download channel and obtain a `ManagedBuffer` over the temp file, then either registers the temp file for later cleanup or deletes it. On an `IOException` from `closeAndRead()` it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The sibling `onFailure` already closes the channel and deletes the temp file; the success path did not.

This wraps `closeAndRead()` so a failure deletes the temp file and closes the channel before rethrowing, matching `onFailure`. The happy path is unchanged: the buffer is handed to the listener only after `closeAndRead()` succeeds.

### Why are the changes needed?

The temp file is created on the fetching executor when a remote block exceeds `spark.maxRemoteBlockSizeFetchToMem` and is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New `OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails` stubs a `closeAndRead()` that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it. `build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite'` -> 14 tests pass.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

Closes #56920 from LuciferYang/worktree-spark-fetch-tmpfile-leak.

Authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 62fd9cb)
Signed-off-by: yangjie01 <yangjie01@baidu.com>
LuciferYang added a commit that referenced this pull request Jul 2, 2026
…he fetch-to-disk path

### What changes were proposed in this pull request?

`OneForOneBlockFetcher.DownloadCallback.onComplete` (the fetch-to-disk path) calls `channel.closeAndRead()` to close the download channel and obtain a `ManagedBuffer` over the temp file, then either registers the temp file for later cleanup or deletes it. On an `IOException` from `closeAndRead()` it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The sibling `onFailure` already closes the channel and deletes the temp file; the success path did not.

This wraps `closeAndRead()` so a failure deletes the temp file and closes the channel before rethrowing, matching `onFailure`. The happy path is unchanged: the buffer is handed to the listener only after `closeAndRead()` succeeds.

### Why are the changes needed?

The temp file is created on the fetching executor when a remote block exceeds `spark.maxRemoteBlockSizeFetchToMem` and is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New `OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails` stubs a `closeAndRead()` that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it. `build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite'` -> 14 tests pass.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

Closes #56920 from LuciferYang/worktree-spark-fetch-tmpfile-leak.

Authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 62fd9cb)
Signed-off-by: yangjie01 <yangjie01@baidu.com>
LuciferYang added a commit that referenced this pull request Jul 2, 2026
…he fetch-to-disk path

### What changes were proposed in this pull request?

`OneForOneBlockFetcher.DownloadCallback.onComplete` (the fetch-to-disk path) calls `channel.closeAndRead()` to close the download channel and obtain a `ManagedBuffer` over the temp file, then either registers the temp file for later cleanup or deletes it. On an `IOException` from `closeAndRead()` it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The sibling `onFailure` already closes the channel and deletes the temp file; the success path did not.

This wraps `closeAndRead()` so a failure deletes the temp file and closes the channel before rethrowing, matching `onFailure`. The happy path is unchanged: the buffer is handed to the listener only after `closeAndRead()` succeeds.

### Why are the changes needed?

The temp file is created on the fetching executor when a remote block exceeds `spark.maxRemoteBlockSizeFetchToMem` and is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New `OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails` stubs a `closeAndRead()` that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it. `build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite'` -> 14 tests pass.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

Closes #56920 from LuciferYang/worktree-spark-fetch-tmpfile-leak.

Authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 62fd9cb)
Signed-off-by: yangjie01 <yangjie01@baidu.com>
@LuciferYang

Copy link
Copy Markdown
Contributor Author

Merge summary (posted by merge_spark_pr.py):

@LuciferYang

Copy link
Copy Markdown
Contributor Author

Thank @dongjoon-hyun

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants