Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions doc/developer/foundationdb.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,12 @@ Consensus offers an interface where keys are mapped to values identified by sequ
Sequence numbers are integers that increase monotonically, ensuring that updates to a key are ordered.
We map the consensus structure to the following schema in FoundationDB:

* Looking up the latest sequence number for a key:
```
./data/<key> -> <sequence_number>
```
* Looking up data by key and sequence number:
```
./data/<key>/<sequence_number> -> <value>
```
The `data` directory contains entries for each key and sequence number pair, mapping to the corresponding value.
The latest sequence number for a key is determined by a reverse scan of the data entries, which ensures locality between the current sequence number lookup and the actual data.
* The `keys` directory contains entries for each key to simplify listing all keys:
```
./keys/<key> -> []
Expand Down
41 changes: 23 additions & 18 deletions src/persist/src/foundationdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
//! We're storing the consensus data in a subspace. Each key maps to a subspace
//! with the following structure:
//! * `./keys/<key> -> ()` to track existing keys.
//! * `./data/<key> -> <seqno>` mapping to the latest seqno for the key.
//! * `./data/<key>/<seqno> -> <data>` mapping seqnos to data blobs.
//!
//! The current seqno for a key is determined by a reverse scan of the data
//! entries, rather than a separate head pointer. This ensures locality between
//! the latest data and any metadata lookups.

use std::io::Write;

Expand All @@ -27,8 +30,7 @@ use mz_foundationdb::directory::{
Directory, DirectoryError, DirectoryLayer, DirectoryOutput, DirectorySubspace,
};
use mz_foundationdb::tuple::{
PackError, PackResult, Subspace, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset, pack,
unpack,
PackError, PackResult, Subspace, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
};
use mz_foundationdb::{
Database, FdbBindingError, FdbError, KeySelector, RangeOption, TransactError, TransactOption,
Expand Down Expand Up @@ -201,16 +203,21 @@ impl FdbConsensus {
}
}

/// Returns the latest entry for a key by reverse scanning the data entries.
///
/// If `snapshot` is true, uses snapshot reads which don't create conflict
/// ranges. Use snapshot=true for read-only `head()` calls, and snapshot=false
/// for `compare_and_set()` where we need conflict detection.
async fn head_trx(
&self,
trx: &Transaction,
data_key: &Subspace,
snapshot: bool,
) -> Result<Option<VersionedData>, FdbTransactError> {
let mut range = RangeOption::from(data_key).rev();
range.limit = Some(1);
range.mode = mz_foundationdb::options::StreamingMode::Exact;
// Allow snapshot reads as we don't need the latest data for head, just some recent data.
let values = trx.get_range(&range, 1, true).await?;
let values = trx.get_range(&range, 1, snapshot).await?;
if let Some(kv) = values.first() {
let seqno = data_key.unpack(kv.key())?;
Ok(Some(VersionedData {
Expand All @@ -229,18 +236,14 @@ impl FdbConsensus {
new: &VersionedData,
key: &str,
) -> Result<CaSResult, FdbTransactError> {
let seqno = trx
.get(data_key.bytes(), false)
.await?
.map(|data| unpack(&data))
.transpose()?;
// Use non-snapshot read to create conflict ranges for concurrent writes.
let current = self.head_trx(trx, data_key, false).await?;
let current_seqno = current.map(|v| v.seqno);

if expected != &seqno {
if expected != &current_seqno {
return Ok(CaSResult::ExpectationMismatch);
}

trx.set(data_key.bytes(), &pack(&new.seqno));

if expected.is_none() {
// If expected is `None`, it's a new key which we need to register in the keys directory.
let key = self.keys.pack(&key);
Expand Down Expand Up @@ -294,10 +297,11 @@ impl FdbConsensus {
data_key: &Subspace,
until: &SeqNo,
) -> Result<(), FdbTransactError> {
let seqno = trx.get(data_key.bytes(), false).await?;
if let Some(seqno) = &seqno {
let current_seqno: SeqNo = unpack(seqno)?;
if current_seqno < *until {
// Snapshot read is fine here - truncate is idempotent and the validation
// only gets more permissive if a concurrent CaS increases the seqno.
let current = self.head_trx(trx, data_key, true).await?;
if let Some(current) = current {
if current.seqno < *until {
return Err(ExternalError::Determinate(
anyhow!("upper bound too high for truncate: {until}").into(),
)
Expand Down Expand Up @@ -351,7 +355,8 @@ impl Consensus for FdbConsensus {
.db
.transact_boxed(
&data_key,
|trx, data_key| self.head_trx(trx, data_key).boxed(),
// Use snapshot read - we don't need strict consistency for head().
|trx, data_key| self.head_trx(trx, data_key, true).boxed(),
TransactOption::default(),
)
.await?;
Expand Down
Loading