[SPARK-57803][CORE] Delete the temp file when closeAndRead fails on the fetch-to-disk path#56920
Closed
LuciferYang wants to merge 1 commit into
Closed
[SPARK-57803][CORE] Delete the temp file when closeAndRead fails on the fetch-to-disk path#56920LuciferYang wants to merge 1 commit into
LuciferYang wants to merge 1 commit into
Conversation
…he fetch-to-disk path OneForOneBlockFetcher.DownloadCallback.onComplete leaked the temporary block file when channel.closeAndRead() threw an IOException. On that path onComplete rethrew without any cleanup, whereas onFailure closes the channel and deletes the temp file. Since the file was never registered for later cleanup (registerTempFileToClean runs only after a successful read), it was orphaned on disk. closeAndRead() typically throws from the underlying channel close, in which case the channel may already be closed but the partially written temp file remains. Guard closeAndRead() so a failure deletes the temp file and closes the channel (in case it was left open) before rethrowing, mirroring onFailure. The buffer is handed to the listener only after closeAndRead() succeeds, so ownership still transfers exactly once on the happy path.
dongjoon-hyun
approved these changes
Jul 1, 2026
LuciferYang
added a commit
that referenced
this pull request
Jul 2, 2026
…he fetch-to-disk path ### What changes were proposed in this pull request? `OneForOneBlockFetcher.DownloadCallback.onComplete` (the fetch-to-disk path) calls `channel.closeAndRead()` to close the download channel and obtain a `ManagedBuffer` over the temp file, then either registers the temp file for later cleanup or deletes it. On an `IOException` from `closeAndRead()` it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The sibling `onFailure` already closes the channel and deletes the temp file; the success path did not. This wraps `closeAndRead()` so a failure deletes the temp file and closes the channel before rethrowing, matching `onFailure`. The happy path is unchanged: the buffer is handed to the listener only after `closeAndRead()` succeeds. ### Why are the changes needed? The temp file is created on the fetching executor when a remote block exceeds `spark.maxRemoteBlockSizeFetchToMem` and is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New `OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails` stubs a `closeAndRead()` that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it. `build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite'` -> 14 tests pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code Closes #56920 from LuciferYang/worktree-spark-fetch-tmpfile-leak. Authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: yangjie01 <yangjie01@baidu.com> (cherry picked from commit 62fd9cb) Signed-off-by: yangjie01 <yangjie01@baidu.com>
LuciferYang
added a commit
that referenced
this pull request
Jul 2, 2026
…he fetch-to-disk path ### What changes were proposed in this pull request? `OneForOneBlockFetcher.DownloadCallback.onComplete` (the fetch-to-disk path) calls `channel.closeAndRead()` to close the download channel and obtain a `ManagedBuffer` over the temp file, then either registers the temp file for later cleanup or deletes it. On an `IOException` from `closeAndRead()` it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The sibling `onFailure` already closes the channel and deletes the temp file; the success path did not. This wraps `closeAndRead()` so a failure deletes the temp file and closes the channel before rethrowing, matching `onFailure`. The happy path is unchanged: the buffer is handed to the listener only after `closeAndRead()` succeeds. ### Why are the changes needed? The temp file is created on the fetching executor when a remote block exceeds `spark.maxRemoteBlockSizeFetchToMem` and is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New `OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails` stubs a `closeAndRead()` that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it. `build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite'` -> 14 tests pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code Closes #56920 from LuciferYang/worktree-spark-fetch-tmpfile-leak. Authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: yangjie01 <yangjie01@baidu.com> (cherry picked from commit 62fd9cb) Signed-off-by: yangjie01 <yangjie01@baidu.com>
LuciferYang
added a commit
that referenced
this pull request
Jul 2, 2026
…he fetch-to-disk path ### What changes were proposed in this pull request? `OneForOneBlockFetcher.DownloadCallback.onComplete` (the fetch-to-disk path) calls `channel.closeAndRead()` to close the download channel and obtain a `ManagedBuffer` over the temp file, then either registers the temp file for later cleanup or deletes it. On an `IOException` from `closeAndRead()` it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The sibling `onFailure` already closes the channel and deletes the temp file; the success path did not. This wraps `closeAndRead()` so a failure deletes the temp file and closes the channel before rethrowing, matching `onFailure`. The happy path is unchanged: the buffer is handed to the listener only after `closeAndRead()` succeeds. ### Why are the changes needed? The temp file is created on the fetching executor when a remote block exceeds `spark.maxRemoteBlockSizeFetchToMem` and is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New `OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails` stubs a `closeAndRead()` that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it. `build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite'` -> 14 tests pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code Closes #56920 from LuciferYang/worktree-spark-fetch-tmpfile-leak. Authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: yangjie01 <yangjie01@baidu.com> (cherry picked from commit 62fd9cb) Signed-off-by: yangjie01 <yangjie01@baidu.com>
LuciferYang
added a commit
that referenced
this pull request
Jul 2, 2026
…he fetch-to-disk path ### What changes were proposed in this pull request? `OneForOneBlockFetcher.DownloadCallback.onComplete` (the fetch-to-disk path) calls `channel.closeAndRead()` to close the download channel and obtain a `ManagedBuffer` over the temp file, then either registers the temp file for later cleanup or deletes it. On an `IOException` from `closeAndRead()` it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The sibling `onFailure` already closes the channel and deletes the temp file; the success path did not. This wraps `closeAndRead()` so a failure deletes the temp file and closes the channel before rethrowing, matching `onFailure`. The happy path is unchanged: the buffer is handed to the listener only after `closeAndRead()` succeeds. ### Why are the changes needed? The temp file is created on the fetching executor when a remote block exceeds `spark.maxRemoteBlockSizeFetchToMem` and is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New `OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFails` stubs a `closeAndRead()` that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it. `build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite'` -> 14 tests pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code Closes #56920 from LuciferYang/worktree-spark-fetch-tmpfile-leak. Authored-by: YangJie <yangjie01@baidu.com> Signed-off-by: yangjie01 <yangjie01@baidu.com> (cherry picked from commit 62fd9cb) Signed-off-by: yangjie01 <yangjie01@baidu.com>
Contributor
Author
Contributor
Author
|
Thank @dongjoon-hyun |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
OneForOneBlockFetcher.DownloadCallback.onComplete(the fetch-to-disk path) callschannel.closeAndRead()to close the download channel and obtain aManagedBufferover the temp file, then either registers the temp file for later cleanup or deletes it. On anIOExceptionfromcloseAndRead()it rethrew without any cleanup, leaking the temp file (which is registered for cleanup only after a successful read) and possibly an open channel. The siblingonFailurealready closes the channel and deletes the temp file; the success path did not.This wraps
closeAndRead()so a failure deletes the temp file and closes the channel before rethrowing, matchingonFailure. The happy path is unchanged: the buffer is handed to the listener only aftercloseAndRead()succeeds.Why are the changes needed?
The temp file is created on the fetching executor when a remote block exceeds
spark.maxRemoteBlockSizeFetchToMemand is streamed to disk. Executors are long-lived, so the orphaned temp files accumulate under the local directories until the executor exits.Does this PR introduce any user-facing change?
No.
How was this patch tested?
New
OneForOneBlockFetcherSuite.testFetchToDiskClosesAndDeletesTempFileWhenCloseAndReadFailsstubs acloseAndRead()that throws and verifies the channel is closed, the temp file is deleted, and it is never registered for cleanup. It fails without the fix and passes with it.build/sbt 'network-shuffle/testOnly *OneForOneBlockFetcherSuite'-> 14 tests pass.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code