
Conversation

@juncaipeng
Collaborator

Motivation

Support distributed cache storage

Modifications

xxx

Usage or Command

xxx

Accuracy Tests

xxx

Checklist

  • Add at least one tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code and run pre-commit before committing.
  • Add unit tests. If no unit tests are added, please explain why in this PR.
  • Provide accuracy results.
  • If the current PR targets a release branch, make sure it has first been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

Copilot AI review requested due to automatic review settings December 9, 2025 09:41
@paddle-bot

paddle-bot bot commented Dec 9, 2025

Thanks for your contribution!

Contributor

Copilot AI left a comment


Pull request overview

This PR introduces distributed cache storage capabilities to FastDeploy, specifically adding support for MooncakeStore as a third-level (L3) cache backend. The implementation enables hierarchical KV cache management with GPU → CPU → distributed storage tiers, improving cache efficiency for large-scale deployments.

Key Changes:

  • Added MooncakeStore integration as a distributed storage backend for KV cache with async read/write operations
  • Implemented new cache transfer operations with storage-aware swap mechanisms (GPU↔Storage)
  • Extended prefix cache manager with storage block matching and prefetch capabilities
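
The three-tier design described above can be pictured with a minimal storage-interface sketch. This is illustrative only: the real interface lives in fastdeploy/cache_manager/transfer_factory/kvcache_storage.py, and all names below (KVCacheStorage, put, get, batch_exists, InMemoryStorage) are assumptions made for the sketch, not the PR's actual API.

```python
# Illustrative sketch of an L3 KV cache storage interface; names are
# hypothetical and may differ from FastDeploy's kvcache_storage.py.
from abc import ABC, abstractmethod
from typing import List, Optional


class KVCacheStorage(ABC):
    """Hypothetical abstract base class for a distributed (L3) cache backend."""

    @abstractmethod
    def put(self, key: str, value: bytes) -> bool:
        """Write one cache block; return True on success."""

    @abstractmethod
    def get(self, key: str) -> Optional[bytes]:
        """Read one cache block, or return None on a miss."""

    def batch_exists(self, keys: List[str]) -> List[bool]:
        """Default hit-test built on get(); real backends would override this."""
        return [self.get(k) is not None for k in keys]


class InMemoryStorage(KVCacheStorage):
    """Toy dict-backed store standing in for a backend such as MooncakeStore."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value
        return True

    def get(self, key):
        return self._data.get(key)
```

A prefix cache manager could query `batch_exists` on a block's hash keys to decide whether a prefetch from storage is worthwhile.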

Reviewed changes

Copilot reviewed 20 out of 20 changed files in this pull request and generated 29 comments.

Show a summary per file:
  • fastdeploy/cache_manager/transfer_factory/mooncake_store/mooncake_store.py: Core MooncakeStore implementation with batch get/set operations and zero-copy transfers
  • fastdeploy/cache_manager/transfer_factory/kvcache_storage.py: Abstract base class defining the KVCache storage interface
  • fastdeploy/cache_manager/cache_transfer_manager.py: Extended transfer manager with storage backend support and async operations
  • fastdeploy/cache_manager/prefix_cache_manager.py: Added storage block matching, prefetch, and write-back capabilities
  • fastdeploy/engine/args_utils.py: Added CLI arguments for storage backend and write policy configuration
  • fastdeploy/config.py: Updated configuration to support hierarchical KVCache settings
  • fastdeploy/inter_communicator/engine_cache_queue.py: Added barriers for storage-GPU synchronization
  • custom_ops/gpu_ops/swap_cache_layout.cu: New CUDA kernel for cache layout swapping between GPU and CPU
  • tests/operators/test_swap_layout.py: Unit tests for cache layout swap operations
  • fastdeploy/cache_manager/transfer_factory/mooncake_store/README.md: Documentation for MooncakeStore setup and usage

Comment on lines +729 to +732
# logger.info(f"GPU2STORAGE {swap_node_ids} {gpu_block_id} {transfer_task_id}")
self.write_to_storage_thread_pool.submit(
self.write_back_storage_task,
swap_node_ids,

Copilot AI Dec 9, 2025


Commented-out debug log should be removed before committing. This appears to be leftover debugging code.

Suggested change
-    # logger.info(f"GPU2STORAGE {swap_node_ids} {gpu_block_id} {transfer_task_id}")
     self.write_to_storage_thread_pool.submit(
         self.write_back_storage_task,
         swap_node_ids,

Comment on lines +1021 to +1022
同步ssd任务
当issue_ssd_task中设置is_sync为False时需主动调用该函数同步结果

Copilot AI Dec 9, 2025


Docstring should be in English for consistency with the rest of the codebase. The Chinese docstring should be translated to English.

Suggested change
-    同步ssd任务
-    当issue_ssd_task中设置is_sync为False时需主动调用该函数同步结果
+    Synchronize the SSD task.
+    When is_sync is set to False in issue_ssd_task, this function must be called manually to synchronize the result.

Comment on lines +181 to +188
return

for i in range(len(keys)):
if keys[i] is None or target_location[i] is None or target_sizes[i] is None:
return

self._put_batch_zero_copy_impl(keys, target_location, target_sizes)


Copilot AI Dec 9, 2025


The function returns early without returning a value when the conditions are met, but the function signature indicates it should return bool. The early returns at lines 181 and 185 don't return anything, which will return None instead of a boolean. Either return False or True as appropriate, or change the return type annotation.

Suggested change
-            return
+            return False
         for i in range(len(keys)):
             if keys[i] is None or target_location[i] is None or target_sizes[i] is None:
-                return
+                return False
         self._put_batch_zero_copy_impl(keys, target_location, target_sizes)
+        return True

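The fix Copilot suggests is the standard guard-clause pattern with explicit boolean returns, so callers never see a silent None. A minimal, self-contained sketch of that pattern; the function and helper names (put_batch, _put_impl) are hypothetical, not FastDeploy's actual API:

```python
# Guard-clause sketch: every early exit returns an explicit bool instead of
# a bare `return` (which would yield None). Names here are illustrative.
from typing import List, Optional


def put_batch(
    keys: List[Optional[str]],
    locations: List[Optional[int]],
    sizes: List[Optional[int]],
) -> bool:
    # Reject empty or mismatched batches up front, with an explicit False.
    if not keys or len(keys) != len(locations) or len(keys) != len(sizes):
        return False
    # Reject batches containing any missing entry.
    for key, loc, size in zip(keys, locations, sizes):
        if key is None or loc is None or size is None:
            return False
    _put_impl(keys, locations, sizes)
    return True


def _put_impl(keys, locations, sizes):
    pass  # placeholder for the real zero-copy write
```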
Comment on lines +213 to +217
return

for i in range(len(keys)):
if keys[i] is None or target_location[i] is None or target_sizes[i] is None:
return

Copilot AI Dec 9, 2025


The function returns early without returning a value when the conditions are met. The early returns at lines 213 and 217 don't return anything, which will return None. This is inconsistent with the return type annotation paddle.Tensor | None. Consider explicitly returning None for clarity.

Suggested change
-            return
+            return None
         for i in range(len(keys)):
             if keys[i] is None or target_location[i] is None or target_sizes[i] is None:
-                return
+                return None

Comment on lines +1 to +7
{
"local_hostname":"localhost",
"metadata_server":"http://127.0.0.1:7882/metadata",
"protocol":"rdma",
"device_name": "mlx5_2,mlx5_3",
"master_server_address":"127.0.0.1:7721"
}

Copilot AI Dec 9, 2025


[nitpick] The configuration file contains localhost addresses which are likely test/development values. Consider adding a comment in the file or adjacent README indicating this is a sample configuration that should be customized for production use.

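In the same spirit as this nitpick, a deployment script might sanity-check the sample config before handing it to MooncakeStore. The required-key list below is inferred from the sample file shown above, not from MooncakeStore's actual validation logic, and the warning heuristic is purely illustrative:

```python
# Hedged sketch: validate a mooncake.json-style config. Required keys are
# assumed from the sample file, not taken from MooncakeStore itself.
import json

REQUIRED_KEYS = {
    "local_hostname",
    "metadata_server",
    "protocol",
    "device_name",
    "master_server_address",
}


def load_mooncake_config(text: str) -> dict:
    cfg = json.loads(text)
    missing = REQUIRED_KEYS - cfg.keys()
    if missing:
        raise ValueError(f"mooncake config missing keys: {sorted(missing)}")
    # Loopback addresses are fine for local testing but not for production.
    if "127.0.0.1" in cfg["master_server_address"]:
        print("warning: loopback master address; customize for production")
    return cfg


sample = """{
  "local_hostname": "localhost",
  "metadata_server": "http://127.0.0.1:7882/metadata",
  "protocol": "rdma",
  "device_name": "mlx5_2,mlx5_3",
  "master_server_address": "127.0.0.1:7721"
}"""
cfg = load_mooncake_config(sample)
```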
True,
)

def request_match_storage_blocks(self, request, extra_gpu_block_ids):

Copilot AI Dec 9, 2025


The function request_match_storage_blocks lacks a docstring explaining its purpose, parameters, and return value. Please add comprehensive documentation for this new function.

Suggested change
     def request_match_storage_blocks(self, request, extra_gpu_block_ids):
+        """
+        Request and match storage blocks for a given request, potentially prefetching
+        key-value cache blocks from the storage backend if available.
+
+        Args:
+            request: An object containing the request information. It must have
+                the attributes `request_id` (unique identifier for the request)
+                and `prompt_token_ids` (list of input token IDs).
+            extra_gpu_block_ids: A list of additional GPU block IDs to be used
+                during the prefetch operation.
+
+        Returns:
+            list: A list of storage block IDs that have been matched or prefetched
+                for the given request. If no storage backend is available, returns
+                an empty list.
+        """


async def _run_async_write(self, uncached_keys_k, uncached_keys_v, uncached_block_ids):
try:
# logger.info(f"[rank {self.rank}/{self.n_ranks}] write cache to storage {uncached_keys_k} {uncached_block_ids}")

Copilot AI Dec 9, 2025


Commented-out debug log should be removed before committing. This appears to be leftover debugging code.

Suggested change
-            # logger.info(f"[rank {self.rank}/{self.n_ranks}] write cache to storage {uncached_keys_k} {uncached_block_ids}")

Comment on lines +990 to +991
):


Copilot AI Dec 9, 2025


The function write_back_storage lacks a docstring explaining its purpose, parameters, and return value. Please add comprehensive documentation for this new function.

Suggested change
     ):
+        """
+        Initiate a write-back operation from GPU or CPU cache to storage for the specified task.
+
+        Args:
+            task_id (Any): Unique identifier for the cache write-back task.
+            hash_keys (Any): Keys identifying the cache blocks to be written back.
+            gpu_block_ids (list or None): List of GPU block IDs to write back, or None if not applicable.
+            cpu_block_ids (list or None): List of CPU block IDs to write back, or None if not applicable.
+            is_sync (bool, optional): If True, wait for the write-back operation to complete before
+                returning; if False, perform the operation asynchronously. Defaults to True.
+            timeout (float, optional): Timeout in seconds for the write-back operation. Defaults to 0.1.
+
+        Returns:
+            None
+
+        Side Effects:
+            - Schedules a cache data transfer task to storage.
+            - If is_sync is True, blocks until the write-back is complete.
+        """

self.task_write_back_event[task_id].wait()
del self.task_write_back_event[task_id]

def prefetch_kv_cache(self, task_id, hash_keys, gpu_block_ids, is_sync=True, timeout=0.1):

Copilot AI Dec 9, 2025


The function prefetch_kv_cache lacks a docstring explaining its purpose, parameters, and return value. Please add comprehensive documentation for this new function.

Suggested change
     def prefetch_kv_cache(self, task_id, hash_keys, gpu_block_ids, is_sync=True, timeout=0.1):
+        """
+        Prefetch key-value cache blocks from storage to GPU memory.
+
+        This function initiates a data transfer task to move cache blocks identified by
+        `hash_keys` from storage to GPU, assigning them to the specified `gpu_block_ids`.
+        The operation can be performed synchronously or asynchronously.
+
+        Args:
+            task_id (str): Unique identifier for the prefetch task.
+            hash_keys (list): List of hash keys identifying the cache blocks to prefetch.
+            gpu_block_ids (list): List of GPU block IDs where the cache blocks will be loaded.
+            is_sync (bool, optional): If True, wait for the prefetch operation to complete before
+                returning; if False, return immediately after initiating the transfer. Defaults to True.
+            timeout (float, optional): Timeout in seconds for the transfer operation. Defaults to 0.1.
+
+        Returns:
+            list: List of storage block IDs that were prefetched if `is_sync` is True; otherwise,
+                an empty list.
+        """

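Both write_back_storage and prefetch_kv_cache expose the same sync/async pattern visible in the quoted code: a per-task event is registered, a worker sets it on completion, and is_sync decides whether the caller blocks immediately or joins later. A hedged sketch of that pattern; the class and attribute names here are illustrative, not the cache manager's real ones:

```python
# Sketch of per-task completion events for sync/async transfers.
# TransferTracker and its attributes are hypothetical names.
import threading


class TransferTracker:
    def __init__(self):
        self.events = {}  # task_id -> threading.Event

    def submit(self, task_id, work, is_sync=True):
        done = threading.Event()
        self.events[task_id] = done

        def runner():
            work()      # stand-in for the actual cache transfer
            done.set()  # signal completion to any waiter

        threading.Thread(target=runner, daemon=True).start()
        if is_sync:
            done.wait()  # block until the transfer finishes
            del self.events[task_id]

    def wait(self, task_id):
        """For is_sync=False callers: join the task later."""
        self.events[task_id].wait()
        del self.events[task_id]
```

This mirrors the `self.task_write_back_event[task_id].wait()` / `del` sequence quoted above from the reviewed code.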
)
for i in range(self.layer_num):
assert gpu_key_register_buffer[i].numpy()[0, 0, 0, 0] == i
print("swap cache layout(device to host):", time.time() - ss)

Copilot AI Dec 9, 2025


Inconsistent use of parentheses. Line 37 uses standard ASCII parentheses "()", but this line uses Chinese full-width parentheses "()". Use consistent ASCII parentheses throughout.

Suggested change
-    print("swap cache layout(device to host):", time.time() - ss)
+    print("swap cache layout(device to host):", time.time() - ss)

@codecov-commenter

Codecov Report

❌ Patch coverage is 22.30216% with 432 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (develop@1f63000). Learn more about missing BASE report.

Files with missing lines Patch % Lines
fastdeploy/cache_manager/cache_transfer_manager.py 16.66% 103 Missing and 2 partials ⚠️
.../transfer_factory/mooncake_store/mooncake_store.py 28.67% 97 Missing ⚠️
fastdeploy/cache_manager/prefix_cache_manager.py 21.92% 86 Missing and 3 partials ⚠️
...r_factory/mooncake_store/test_mooncake_transfer.py 0.00% 55 Missing ⚠️
fastdeploy/engine/sched/resource_manager_v1.py 2.32% 42 Missing ⚠️
...nager/transfer_factory/mooncake_store/unit_test.py 0.00% 40 Missing ⚠️
fastdeploy/cache_manager/ops.py 0.00% 2 Missing ⚠️
...tdeploy/cache_manager/transfer_factory/__init__.py 71.42% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             develop    #5459   +/-   ##
==========================================
  Coverage           ?   59.58%           
==========================================
  Files              ?      334           
  Lines              ?    41446           
  Branches           ?     6252           
==========================================
  Hits               ?    24696           
  Misses             ?    14878           
  Partials           ?     1872           
Flag Coverage Δ
GPU 59.58% <22.30%> (?)

Flags with carried forward coverage won't be shown.

