cloud_topics: L1 prefetch service (replace l1_reader_cache)#30947
Open
Lazin wants to merge 4 commits into
Any OMB results?
Author: Not yet
eddc3a1 to 80cb1d9
80cb1d9 to 56d572c
Replace l1_reader_cache with a prefetch-based L1 read path. A per-shard
l1_fetch_service drives one fetch_stream per partition that walks the
metastore lookahead, downloads L1 objects via a chunk_downloader
(concurrent ranged GETs with ordered reassembly through a
chunk_reassembler), and decodes batches ahead of consumer demand. A
memory_first_reader serves those batches from each stream's own
batch_cache index, ghost-filling reconciliation gaps so a real gap is
distinguishable from an evicted offset.

Prefetch is paced by an adaptive prefetch_pacer (window/chunk sizing)
and bounded by a per-shard l1_memory_broker reservation accounting
against a configurable budget.

Caching is best-effort: prefetched batches are inserted into the shared
batch cache and may be evicted under pressure, in which case a
subsequent fetch re-produces them; when the batch cache is disabled the
L1 read path returns no data (surfaced as a rate-limited warning) rather
than falling back to direct object storage.

frontend::make_reader routes fetches at or below the last reconciled
offset through l1_fetch_service::get_reader; L0 continues to serve the
tail.

Adds the cloud_topics_l1_prefetch_* configuration knobs and prefetch
metrics.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
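To illustrate the "concurrent ranged GETs with ordered reassembly" idea, here is a minimal sketch in plain C++. It is not the chunk_reassembler from this PR: the class name ordered_reassembler, its methods, and the use of std::string as a byte buffer are all assumptions made for illustration. It assumes chunks do not overlap and that together they tile the object from offset 0.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <utility>

// Hypothetical sketch, not the PR's chunk_reassembler.
// Ranged GETs may complete in any order; bytes are released strictly in
// offset order so a downstream decoder always sees a contiguous prefix.
class ordered_reassembler {
public:
    // Record a completed chunk covering [offset, offset + data.size()).
    // Assumption: chunks never overlap.
    void add_chunk(std::uint64_t offset, std::string data) {
        _pending.emplace(offset, std::move(data));
    }

    // Return every byte that is now contiguous with what was already
    // released, and advance the release cursor past it.
    std::string drain_ready() {
        std::string out;
        auto it = _pending.begin();
        while (it != _pending.end() && it->first == _next) {
            _next += it->second.size();
            out += it->second;
            it = _pending.erase(it);
        }
        return out;
    }

    // First byte offset that has not been released yet.
    std::uint64_t next_offset() const { return _next; }

private:
    std::uint64_t _next{0};
    std::map<std::uint64_t, std::string> _pending; // keyed by start offset
};
```

A chunk that arrives early simply waits in the map; nothing is released until the chunk at the cursor shows up, at which point everything contiguous behind it drains in one pass.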
…ough
An exhausted fetch_stream has reached end-of-stream and no longer
re-queries the metastore, so it never produces beyond produced_through(),
even though position() sits one offset past it. find_reusable nevertheless
reused such a stream up to position(). At the L0->L1 boundary this hangs:
a stream exhausts at the current last reconciled offset (LRO),
reconciliation then advances LRO past it, and the boundary offset becomes
L1-resident in a new object. A fetch for that offset reused the dead
stream and parked the reader forever. Consumers hung at the boundary
("unable to fetch offsets" / consume timeouts) once LRO crossed the
offset they were reading.
Cap an exhausted stream's reusable range at produced_through() so the
boundary offset spins a fresh stream that re-queries the metastore and
downloads the newly reconciled object.
Test: exhausted_stream_not_reused_after_lro_advance reproduces the hang
(reader returns no data) and passes with the fix.
Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
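The reuse rule described in this commit can be sketched as below. This is a hedged illustration, not the actual find_reusable: the stream_state struct, its fields (including the start lower bound), and the is_reusable helper are hypothetical names. Only produced_through() and position() being one offset apart comes from the commit message.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the state a reuse check would inspect.
struct stream_state {
    std::int64_t start;            // first offset the stream covers (assumed)
    std::int64_t produced_through; // last offset actually produced
    bool exhausted;                // hit end-of-stream, no more metastore queries

    // One offset past the last produced one.
    std::int64_t position() const { return produced_through + 1; }
};

// A live stream may be reused up to position(): it will keep producing.
// An exhausted stream is capped at produced_through: it will never
// produce position(), so a reader parked there would wait forever.
inline bool is_reusable(const stream_state& s, std::int64_t offset) {
    const std::int64_t upper = s.exhausted ? s.produced_through : s.position();
    return offset >= s.start && offset <= upper;
}
```

With this cap, a fetch at the boundary offset misses the exhausted stream and a fresh stream is created, which re-queries the metastore.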
… rename

Rebasing onto current dev picked up two cloud_io changes that postdate
the original commits: the scheduler->admission_control rename (header +
bazel target) and the new skip_cache parameter on io::read_object /
read_object_as_iobuf. Point chunk_downloader and the prefetch BUILD
targets at cloud_io/admission_control_types, and update the test io
fakes to the 4-arg read_object signature.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
56d572c to de752fb
memory_first_reader blocked on wait_for_offset for offsets past the last
reconciled offset, waiting up to the fetch deadline (paced by the
reconciler tick) for reconciliation to catch up instead of ceding to L0.
On a large backlog this starves the consumer. The former
level_one_reader avoided this by ending the stream once the metastore
had no further object, so the next fetch fell through to L0.

Cap the L1 reader's max_offset at the last reconciled offset in
make_reader (cloud mode) so it ends the stream at the reconciled
boundary; the consumer's next fetch re-evaluates and the L0 reader
serves the un-reconciled tail.

Signed-off-by: Evgeny Lazin <4lazin@gmail.com>
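The capping rule can be sketched as a small pure function. This is an illustration under assumptions, not the code in make_reader: offset_range and l1_range are hypothetical names, and offsets are modelled as plain 64-bit integers.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <optional>

// Hypothetical inclusive offset range requested by a fetch.
struct offset_range {
    std::int64_t min;
    std::int64_t max;
};

// Returns the range the L1 reader should serve, or nullopt when the
// fetch starts past the last reconciled offset and belongs to L0.
// The upper bound is capped so the L1 reader ends its stream at the
// reconciled boundary instead of waiting for reconciliation.
inline std::optional<offset_range>
l1_range(offset_range requested, std::int64_t last_reconciled) {
    if (requested.min > last_reconciled) {
        return std::nullopt;
    }
    return offset_range{
      requested.min, std::min(requested.max, last_reconciled)};
}
```

A fetch for [10, 500] with a last reconciled offset of 100 is served by L1 as [10, 100]; the consumer's next fetch starts at 101 and falls through to L0.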
Retry command for Build#86594: please wait until all jobs are finished before running the slash command
Replaces the passive l1_reader_cache with an active, per-shard L1 prefetch service that drives L1 producers ahead of demand into the in-memory record batch cache, so Kafka fetches are served from memory instead of paying per-object metadata/footer/open latency on the read critical path.

Motivation
The previous L1 read path was structurally throughput-limited:
… get_extent_metadata_forwards → leader_router, cross-node), a footer object-storage GET, and an object open, paid serially at each object boundary (lookahead existed but defaulted to off).
… slice / latency.

What this PR does
A per-shard l1_fetch_service (registry + scheduler + memory broker) owns fetch_stream producers keyed by (tidp, read-position). Each stream runs a prefetch chain: metastore lookahead → footer → concurrent byte-range downloads → reassembly into whole batches → insertion into the shared batch cache. Kafka fetches receive a lightweight memory_first_reader that reads only from the cache; a miss is a demand signal that the scheduler prioritizes. get_reader only reuses a warm stream when the requested offset is still resident (residency-gated reuse), so an evicted offset self-heals on retry rather than stalling.

New components live under src/v/cloud_topics/level_one/prefetch/ (chunk_reassembler, chunk_downloader, l1_memory_broker, prefetch_pacer, fetch_stream, memory_first_reader, l1_fetch_service + prefetch_probe). l1_reader_cache is removed and its config knobs deprecated. New tunables: cloud_topics_l1_prefetch_*.

Limitation: the L1 read path requires the batch cache; if it is disabled (disable_batch_cache, or a topic with cache_enabled=false), L1 reads return no data (by design; a rate-limited warning is emitted). Network-bandwidth budgeting and per-stream fairness are deferred.

Backports Required
Release Notes
Improvements
cloud_topics_l1_prefetch_* tunables control the memory budget, download concurrency, and pacing.
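As a rough picture of what a memory budget means for prefetch, the sketch below shows reservation accounting against a fixed byte budget. It is not the l1_memory_broker from this PR: the memory_budget class and its methods are hypothetical, the budget is passed in directly rather than read from any real tunable, and it assumes single-threaded use (one instance per shard), so there is no locking.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical sketch of reservation accounting against a byte budget.
// Assumes one instance per shard, accessed from a single thread.
class memory_budget {
public:
    explicit memory_budget(std::size_t budget_bytes)
      : _budget(budget_bytes) {}

    // Reserve bytes before starting a download; refuse rather than
    // exceed the budget so the caller can back off or shrink its window.
    bool try_reserve(std::size_t bytes) {
        if (bytes > available()) {
            return false;
        }
        _reserved += bytes;
        return true;
    }

    // Give bytes back once the data has been handed off or dropped.
    // Clamped so a double release cannot underflow the counter.
    void release(std::size_t bytes) {
        _reserved -= std::min(bytes, _reserved);
    }

    std::size_t available() const { return _budget - _reserved; }

private:
    std::size_t _budget;
    std::size_t _reserved{0};
};
```

A producer that fails to reserve would typically wait or reduce its prefetch window; how the real broker reacts is not described here.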