diff --git a/doc/developer/foundationdb.md b/doc/developer/foundationdb.md index 7fbffe201b35e..77fd4686a5eb9 100644 --- a/doc/developer/foundationdb.md +++ b/doc/developer/foundationdb.md @@ -30,15 +30,12 @@ Consensus offers an interface where keys are mapped to values identified by sequ Sequence numbers are integers that increase monotonically, ensuring that updates to a key are ordered. We map the consensus structure to the following schema in FoundationDB: -* Looking up the latest sequence number for a key: - ``` - ./data/ -> - ``` * Looking up data by key and sequence number: ``` ./data// -> ``` The `data` directory contains entries for each key and sequence number pair, mapping to the corresponding value. + The latest sequence number for a key is determined by a reverse scan of the data entries, which ensures locality between the current sequence number lookup and the actual data. * The `keys` directory contains entries for each key to simplify listing all keys: ``` ./keys/ -> [] diff --git a/src/persist/src/foundationdb.rs b/src/persist/src/foundationdb.rs index 2d058a6553ec1..61b206008bbe6 100644 --- a/src/persist/src/foundationdb.rs +++ b/src/persist/src/foundationdb.rs @@ -12,8 +12,11 @@ //! We're storing the consensus data in a subspace. Each key maps to a subspace //! with the following structure: //! * `./keys/ -> ()` to track existing keys. -//! * `./data/ -> ` mapping to the latest seqno for the key. //! * `./data// -> ` mapping seqnos to data blobs. +//! +//! The current seqno for a key is determined by a reverse scan of the data +//! entries, rather than a separate head pointer. This ensures locality between +//! the latest data and any metadata lookups. 
use std::io::Write; @@ -27,8 +30,7 @@ use mz_foundationdb::directory::{ Directory, DirectoryError, DirectoryLayer, DirectoryOutput, DirectorySubspace, }; use mz_foundationdb::tuple::{ - PackError, PackResult, Subspace, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack, - unpack, + PackError, PackResult, Subspace, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, }; use mz_foundationdb::{ Database, FdbBindingError, FdbError, KeySelector, RangeOption, TransactError, TransactOption, @@ -201,16 +203,21 @@ impl FdbConsensus { } } + /// Returns the latest entry for a key by reverse scanning the data entries. + /// + /// If `snapshot` is true, uses snapshot reads which don't create conflict + /// ranges. Use snapshot=true for read-only `head()` calls, and snapshot=false + /// for `compare_and_set()` where we need conflict detection. async fn head_trx( &self, trx: &Transaction, data_key: &Subspace, + snapshot: bool, ) -> Result, FdbTransactError> { let mut range = RangeOption::from(data_key).rev(); range.limit = Some(1); range.mode = mz_foundationdb::options::StreamingMode::Exact; - // Allow snapshot reads as we don't need the latest data for head, just some recent data. - let values = trx.get_range(&range, 1, true).await?; + let values = trx.get_range(&range, 1, snapshot).await?; if let Some(kv) = values.first() { let seqno = data_key.unpack(kv.key())?; Ok(Some(VersionedData { @@ -229,18 +236,14 @@ impl FdbConsensus { new: &VersionedData, key: &str, ) -> Result { - let seqno = trx - .get(data_key.bytes(), false) - .await? - .map(|data| unpack(&data)) - .transpose()?; + // Use non-snapshot read to create conflict ranges for concurrent writes. 
+ let current = self.head_trx(trx, data_key, false).await?; + let current_seqno = current.map(|v| v.seqno); - if expected != &seqno { + if expected != &current_seqno { return Ok(CaSResult::ExpectationMismatch); } - trx.set(data_key.bytes(), &pack(&new.seqno)); - if expected.is_none() { // If expected is `None`, it's a new key which we need to register in the keys directory. let key = self.keys.pack(&key); @@ -294,10 +297,11 @@ impl FdbConsensus { data_key: &Subspace, until: &SeqNo, ) -> Result<(), FdbTransactError> { - let seqno = trx.get(data_key.bytes(), false).await?; - if let Some(seqno) = &seqno { - let current_seqno: SeqNo = unpack(seqno)?; - if current_seqno < *until { + // Snapshot read is fine here - truncate is idempotent and the validation + // only gets more permissive if a concurrent CaS increases the seqno. + let current = self.head_trx(trx, data_key, true).await?; + if let Some(current) = current { + if current.seqno < *until { return Err(ExternalError::Determinate( anyhow!("upper bound too high for truncate: {until}").into(), ) @@ -351,7 +355,8 @@ impl Consensus for FdbConsensus { .db .transact_boxed( &data_key, - |trx, data_key| self.head_trx(trx, data_key).boxed(), + // Use snapshot read - we don't need strict consistency for head(). + |trx, data_key| self.head_trx(trx, data_key, true).boxed(), TransactOption::default(), ) .await?;