[Draft] Support distributed cache storage #5459
base: develop
Conversation
Thanks for your contribution!
Pull request overview
This PR introduces distributed cache storage capabilities to FastDeploy, specifically adding support for MooncakeStore as a third-level (L3) cache backend. The implementation enables hierarchical KV cache management with GPU → CPU → distributed storage tiers, improving cache efficiency for large-scale deployments.
Key Changes:
- Added MooncakeStore integration as a distributed storage backend for KV cache with async read/write operations
- Implemented new cache transfer operations with storage-aware swap mechanisms (GPU↔Storage)
- Extended prefix cache manager with storage block matching and prefetch capabilities
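The tiered lookup the overview describes can be sketched as follows. This is a hypothetical illustration of the GPU → CPU → distributed-storage (L3) match order only; the class and method names are assumptions, not the actual FastDeploy API.

```python
# Hypothetical sketch of hierarchical KV cache matching across three tiers.
class TieredKVCache:
    def __init__(self, gpu_blocks, cpu_blocks, storage_backend=None):
        self.gpu = gpu_blocks            # hash_key -> GPU block id
        self.cpu = cpu_blocks            # hash_key -> CPU block id
        self.storage = storage_backend   # MooncakeStore-like backend, may be None

    def match(self, hash_key):
        """Return (tier, block_id) for the highest tier holding the block."""
        if hash_key in self.gpu:
            return "gpu", self.gpu[hash_key]
        if hash_key in self.cpu:
            return "cpu", self.cpu[hash_key]
        if self.storage is not None and self.storage.exists(hash_key):
            # A storage hit must still be prefetched into a free GPU block.
            return "storage", hash_key
        return None, None
```

The point of the L3 tier is the last branch: a prefix that has been evicted from both GPU and CPU can still be recovered from distributed storage instead of being recomputed.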
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 29 comments.
| File | Description |
|---|---|
| `fastdeploy/cache_manager/transfer_factory/mooncake_store/mooncake_store.py` | Core MooncakeStore implementation with batch get/set operations and zero-copy transfers |
| `fastdeploy/cache_manager/transfer_factory/kvcache_storage.py` | Abstract base class defining the KVCache storage interface |
| `fastdeploy/cache_manager/cache_transfer_manager.py` | Extended transfer manager with storage backend support and async operations |
| `fastdeploy/cache_manager/prefix_cache_manager.py` | Added storage block matching, prefetch, and write-back capabilities |
| `fastdeploy/engine/args_utils.py` | Added CLI arguments for storage backend and write-policy configuration |
| `fastdeploy/config.py` | Updated configuration to support hierarchical KVCache settings |
| `fastdeploy/inter_communicator/engine_cache_queue.py` | Added barriers for storage-GPU synchronization |
| `custom_ops/gpu_ops/swap_cache_layout.cu` | New CUDA kernel for cache layout swapping between GPU and CPU |
| `tests/operators/test_swap_layout.py` | Unit tests for cache layout swap operations |
| `fastdeploy/cache_manager/transfer_factory/mooncake_store/README.md` | Documentation for MooncakeStore setup and usage |
```python
# logger.info(f"GPU2STORAGE {swap_node_ids} {gpu_block_id} {transfer_task_id}")
self.write_to_storage_thread_pool.submit(
    self.write_back_storage_task,
    swap_node_ids,
```
Copilot AI commented on Dec 9, 2025:
Commented-out debug log should be removed before committing. This appears to be leftover debugging code.
Suggested change (delete the commented-out log line):

```python
self.write_to_storage_thread_pool.submit(
    self.write_back_storage_task,
    swap_node_ids,
```
```python
同步ssd任务
当issue_ssd_task中设置is_sync为False时需主动调用该函数同步结果
```
Copilot AI commented on Dec 9, 2025:
Docstring should be in English for consistency with the rest of the codebase. The Chinese docstring should be translated to English.
Suggested change:

```python
Synchronize SSD task.
When is_sync is set to False in issue_ssd_task, this function must be called manually to synchronize the result.
```
```python
return

for i in range(len(keys)):
    if keys[i] is None or target_location[i] is None or target_sizes[i] is None:
        return

self._put_batch_zero_copy_impl(keys, target_location, target_sizes)
```
Copilot AI commented on Dec 9, 2025:
The function returns early without returning a value when the conditions are met, but the function signature indicates it should return bool. The early returns at lines 181 and 185 don't return anything, which will return None instead of a boolean. Either return False or True as appropriate, or change the return type annotation.
Suggested change:

```python
return False
for i in range(len(keys)):
    if keys[i] is None or target_location[i] is None or target_sizes[i] is None:
        return False
self._put_batch_zero_copy_impl(keys, target_location, target_sizes)
return True
```
```python
return

for i in range(len(keys)):
    if keys[i] is None or target_location[i] is None or target_sizes[i] is None:
        return
```
Copilot AI commented on Dec 9, 2025:
The function returns early without returning a value when the conditions are met. The early returns at lines 213 and 217 don't return anything, which will return None. This is inconsistent with the return type annotation paddle.Tensor | None. Consider explicitly returning None for clarity.
Suggested change:

```python
return None
for i in range(len(keys)):
    if keys[i] is None or target_location[i] is None or target_sizes[i] is None:
        return None
```
```json
{
    "local_hostname": "localhost",
    "metadata_server": "http://127.0.0.1:7882/metadata",
    "protocol": "rdma",
    "device_name": "mlx5_2,mlx5_3",
    "master_server_address": "127.0.0.1:7721"
}
```
Copilot AI commented on Dec 9, 2025:
[nitpick] The configuration file contains localhost addresses which are likely test/development values. Consider adding a comment in the file or adjacent README indicating this is a sample configuration that should be customized for production use.
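Along the lines of that suggestion, a startup sanity check could make misconfiguration obvious early. The sketch below is illustrative only and not part of FastDeploy; the field names come from the sample file above, but the helper itself is a hypothetical example.

```python
# Illustrative validator for a MooncakeStore-style config file; the required
# field names mirror the sample above, but this helper is an assumption, not
# an actual FastDeploy API.
import json

REQUIRED_FIELDS = ("local_hostname", "metadata_server", "protocol", "master_server_address")

def load_mooncake_config(text):
    cfg = json.loads(text)
    missing = [f for f in REQUIRED_FIELDS if f not in cfg]
    if missing:
        raise ValueError(f"missing config fields: {missing}")
    # The sample uses RDMA, which needs explicit NIC device names.
    if cfg["protocol"] == "rdma" and not cfg.get("device_name"):
        raise ValueError("rdma protocol requires device_name, e.g. 'mlx5_2,mlx5_3'")
    return cfg
```

A check like this would also be a natural place to warn when `local_hostname` is still `localhost` in a production deployment.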
```python
    True,
)

def request_match_storage_blocks(self, request, extra_gpu_block_ids):
```
Copilot AI commented on Dec 9, 2025:
The function request_match_storage_blocks lacks a docstring explaining its purpose, parameters, and return value. Please add comprehensive documentation for this new function.
Suggested change:

```python
def request_match_storage_blocks(self, request, extra_gpu_block_ids):
    """
    Requests and matches storage blocks for a given request, potentially prefetching
    key-value cache blocks from the storage backend if available.

    Args:
        request: An object containing the request information. It must have
            the attributes `request_id` (unique identifier for the request)
            and `prompt_token_ids` (list of input token IDs).
        extra_gpu_block_ids: A list of additional GPU block IDs to be used
            during the prefetch operation.

    Returns:
        list: A list of storage block IDs that have been matched or prefetched
            for the given request. If no storage backend is available, returns
            an empty list.
    """
```
```python
async def _run_async_write(self, uncached_keys_k, uncached_keys_v, uncached_block_ids):
    try:
        # logger.info(f"[rank {self.rank}/{self.n_ranks}] write cache to storage {uncached_keys_k} {uncached_block_ids}")
```
Copilot AI commented on Dec 9, 2025:
Commented-out debug log should be removed before committing. This appears to be leftover debugging code.
Suggested change (delete the line):

```python
# logger.info(f"[rank {self.rank}/{self.n_ranks}] write cache to storage {uncached_keys_k} {uncached_block_ids}")
```
```python
):
```
Copilot AI commented on Dec 9, 2025:
The function write_back_storage lacks a docstring explaining its purpose, parameters, and return value. Please add comprehensive documentation for this new function.
Suggested change:

```python
):
    """
    Initiates a write-back operation from GPU or CPU cache to storage for the specified task.

    Args:
        task_id (Any): Unique identifier for the cache write-back task.
        hash_keys (Any): Keys identifying the cache blocks to be written back.
        gpu_block_ids (list or None): List of GPU block IDs to write back, or None if not applicable.
        cpu_block_ids (list or None): List of CPU block IDs to write back, or None if not applicable.
        is_sync (bool, optional): If True, the method waits for the write-back operation to
            complete before returning. If False, the operation is performed asynchronously.
            Defaults to True.
        timeout (float, optional): Timeout in seconds for the write-back operation. Defaults to 0.1.

    Returns:
        None

    Side Effects:
        - Schedules a cache data transfer task to storage.
        - If is_sync is True, blocks until the write-back is complete.
    """
```
```python
self.task_write_back_event[task_id].wait()
del self.task_write_back_event[task_id]

def prefetch_kv_cache(self, task_id, hash_keys, gpu_block_ids, is_sync=True, timeout=0.1):
```
Copilot AI commented on Dec 9, 2025:
The function prefetch_kv_cache lacks a docstring explaining its purpose, parameters, and return value. Please add comprehensive documentation for this new function.
Suggested change:

```python
def prefetch_kv_cache(self, task_id, hash_keys, gpu_block_ids, is_sync=True, timeout=0.1):
    """
    Prefetch key-value cache blocks from storage to GPU memory.

    This function initiates a data transfer task to move cache blocks identified by
    `hash_keys` from storage to GPU, assigning them to the specified `gpu_block_ids`.
    The operation can be performed synchronously or asynchronously.

    Args:
        task_id (str): Unique identifier for the prefetch task.
        hash_keys (list): List of hash keys identifying the cache blocks to prefetch.
        gpu_block_ids (list): List of GPU block IDs where the cache blocks will be loaded.
        is_sync (bool, optional): If True, waits for the prefetch operation to complete
            before returning. If False, returns immediately after initiating the transfer.
            Defaults to True.
        timeout (float, optional): Timeout in seconds for the transfer operation. Defaults to 0.1.

    Returns:
        list: List of storage block IDs that were prefetched if `is_sync` is True;
            otherwise, an empty list.
    """
```
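The `is_sync` / `timeout` semantics described in these docstrings (and the `task_write_back_event` usage quoted earlier) follow a common per-task event pattern: schedule the transfer on a worker, then optionally block on a per-task event. The sketch below illustrates that pattern only; class and attribute names are hypothetical, not the PR's actual code.

```python
# Illustrative per-task Event pattern: a transfer runs on a worker thread,
# and is_sync decides whether the caller blocks until its event is set.
import threading
from concurrent.futures import ThreadPoolExecutor

class TransferScheduler:
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.task_events = {}  # task_id -> threading.Event

    def _do_transfer(self, task_id):
        # ... move KV cache blocks between tiers here ...
        self.task_events[task_id].set()  # signal completion

    def issue(self, task_id, is_sync=True, timeout=1.0):
        self.task_events[task_id] = threading.Event()
        self.pool.submit(self._do_transfer, task_id)
        if is_sync:
            finished = self.task_events[task_id].wait(timeout)
            del self.task_events[task_id]
            return finished  # False means the wait timed out
        return True  # async: caller synchronizes via the event later
```

One design note: with a short default timeout like 0.1 s, a synchronous caller can return before the transfer finishes, so the caller needs a clear contract for the timed-out case.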
```python
)
for i in range(self.layer_num):
    assert gpu_key_register_buffer[i].numpy()[0, 0, 0, 0] == i
print("swap cache layout(device to host):", time.time() - ss)
```
Copilot AI commented on Dec 9, 2025:
Inconsistent use of parentheses. Line 37 uses standard ASCII parentheses "()", but this line uses Chinese full-width parentheses "()". Use consistent ASCII parentheses throughout.
Suggested change (replace the full-width parentheses with ASCII ones):

```python
print("swap cache layout(device to host):", time.time() - ss)
```
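For context on what the test above exercises, a "cache layout swap" regroups per-block layer data into per-layer buffers. The CPU-only sketch below is illustrative; the real `swap_cache_layout.cu` kernel operates on GPU memory, and the shapes and axis order here are assumptions, not the kernel's actual layout.

```python
# Illustrative sketch of a layout swap: device-side data indexed
# [block][layer] is regrouped into host-side buffers indexed [layer][block].
def swap_layout_to_host(device_cache):
    """device_cache[block][layer] -> host_cache[layer][block]."""
    num_blocks, layer_num = len(device_cache), len(device_cache[0])
    return [[device_cache[b][l] for b in range(num_blocks)] for l in range(layer_num)]

device_cache = [["b0l0", "b0l1", "b0l2"], ["b1l0", "b1l1", "b1l2"]]
host_cache = swap_layout_to_host(device_cache)
# host_cache[layer][block], e.g. host_cache[1] == ["b0l1", "b1l1"]
```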
Codecov Report

Additional details and impacted files:

```
@@           Coverage Diff            @@
##           develop    #5459   +/-  ##
==========================================
  Coverage         ?   59.58%
==========================================
  Files            ?      334
  Lines            ?    41446
  Branches         ?     6252
==========================================
  Hits             ?    24696
  Misses           ?    14878
  Partials         ?     1872
==========================================
```

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
Motivation
Support distributed cache storage
Modifications
xxx
Usage or Command
xxx
Accuracy Tests
xxx
Checklist
- Add at least one PR tag from: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
- Run `pre-commit` before commit.
- For the `release` branch, make sure the PR has been submitted to the `develop` branch first, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.