Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
950 changes: 907 additions & 43 deletions Cargo.lock

Large diffs are not rendered by default.

70 changes: 70 additions & 0 deletions src/storage-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,69 @@ pub const STORAGE_ROCKSDB_USE_MERGE_OPERATOR: Config<bool> = Config::new(
"Use the native rocksdb merge operator where possible.",
);

/// Whether to use the object storage (SlateDB) backend for upsert instead of RocksDB.
///
/// This is the top-level feature flag; all other `STORAGE_UPSERT_OBJECT_STORAGE_*`
/// configs below are only consulted when this is true.
pub const STORAGE_UPSERT_USE_OBJECT_STORAGE: Config<bool> = Config::new(
    "storage_upsert_use_object_storage",
    false,
    "Use SlateDB (object storage) backend for upsert instead of RocksDB.",
);

/// Whether to use in-memory storage for the SlateDB upsert backend (for testing).
/// Only used when `storage_upsert_use_object_storage` is true.
/// When true, ignores S3 bucket/region/endpoint settings and uses in-memory storage.
///
/// NOTE(review): the default is `true`, so enabling
/// `storage_upsert_use_object_storage` without also flipping this flag silently
/// uses non-durable in-memory state — confirm this is the intended rollout default.
pub const STORAGE_UPSERT_OBJECT_STORAGE_IN_MEMORY: Config<bool> = Config::new(
    "storage_upsert_object_storage_in_memory",
    true,
    "Use in-memory storage for SlateDB upsert backend (for testing). When false, uses S3.",
);

/// The S3 bucket name for the SlateDB upsert backend.
/// Only used when `storage_upsert_use_object_storage` is true and
/// `storage_upsert_object_storage_in_memory` is false.
///
/// The default is the empty string; presumably backend construction fails or
/// misbehaves if S3 mode is enabled without setting this — TODO confirm the
/// validation path in the backend.
pub const STORAGE_UPSERT_OBJECT_STORAGE_S3_BUCKET: Config<&'static str> = Config::new(
    "storage_upsert_object_storage_s3_bucket",
    "",
    "S3 bucket name for SlateDB upsert backend.",
);

/// The AWS region for the SlateDB upsert backend S3 bucket.
/// Only used when `storage_upsert_use_object_storage` is true and
/// `storage_upsert_object_storage_in_memory` is false.
pub const STORAGE_UPSERT_OBJECT_STORAGE_S3_REGION: Config<&'static str> = Config::new(
    "storage_upsert_object_storage_s3_region",
    "us-east-1",
    "AWS region for SlateDB upsert backend S3 bucket.",
);

/// Optional custom endpoint for the SlateDB upsert backend (e.g., for MinIO or LocalStack).
/// Only used when `storage_upsert_use_object_storage` is true and
/// `storage_upsert_object_storage_in_memory` is false.
pub const STORAGE_UPSERT_OBJECT_STORAGE_S3_ENDPOINT: Config<&'static str> = Config::new(
    "storage_upsert_object_storage_s3_endpoint",
    "",
    "Custom S3 endpoint for SlateDB upsert backend. Empty string uses AWS default.",
);

/// AWS access key ID for the SlateDB upsert backend.
/// Only used when `storage_upsert_use_object_storage` is true and
/// `storage_upsert_object_storage_in_memory` is false.
/// Empty string uses environment variables or default AWS credential chain.
///
/// NOTE(review): credentials stored as plain dyncfg strings may surface in
/// introspection/logging of config values — confirm dyncfg values are not
/// exposed anywhere secrets must not appear.
pub const STORAGE_UPSERT_OBJECT_STORAGE_S3_ACCESS_KEY_ID: Config<&'static str> = Config::new(
    "storage_upsert_object_storage_s3_access_key_id",
    "",
    "AWS access key ID for SlateDB upsert backend. Empty uses default credential chain.",
);

/// AWS secret access key for the SlateDB upsert backend.
/// Only used when `storage_upsert_use_object_storage` is true and
/// `storage_upsert_object_storage_in_memory` is false.
/// Empty string uses environment variables or default AWS credential chain.
///
/// NOTE(review): same secret-exposure concern as the access key ID above —
/// prefer the ambient credential chain (empty default) outside of local testing.
pub const STORAGE_UPSERT_OBJECT_STORAGE_S3_SECRET_ACCESS_KEY: Config<&'static str> = Config::new(
    "storage_upsert_object_storage_s3_secret_access_key",
    "",
    "AWS secret access key for SlateDB upsert backend. Empty uses default credential chain.",
);

/// If `storage_upsert_prevent_snapshot_buffering` is true, this prevents the upsert
/// operator from buffering too many events from the upstream snapshot. In the absence
/// of hydration flow control, this could prevent certain workloads from causing egregiously
Expand Down Expand Up @@ -376,6 +439,13 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&STORAGE_RECLOCK_TO_LATEST)
.add(&STORAGE_ROCKSDB_CLEANUP_TRIES)
.add(&STORAGE_ROCKSDB_USE_MERGE_OPERATOR)
.add(&STORAGE_UPSERT_USE_OBJECT_STORAGE)
.add(&STORAGE_UPSERT_OBJECT_STORAGE_IN_MEMORY)
.add(&STORAGE_UPSERT_OBJECT_STORAGE_S3_BUCKET)
.add(&STORAGE_UPSERT_OBJECT_STORAGE_S3_REGION)
.add(&STORAGE_UPSERT_OBJECT_STORAGE_S3_ENDPOINT)
.add(&STORAGE_UPSERT_OBJECT_STORAGE_S3_ACCESS_KEY_ID)
.add(&STORAGE_UPSERT_OBJECT_STORAGE_S3_SECRET_ACCESS_KEY)
.add(&STORAGE_SERVER_MAINTENANCE_INTERVAL)
.add(&STORAGE_SUSPEND_AND_RESTART_DELAY)
.add(&STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING)
Expand Down
2 changes: 2 additions & 0 deletions src/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mz-interchange = { path = "../interchange" }
mz-kafka-util = { path = "../kafka-util" }
mz-mysql-util = { path = "../mysql-util" }
mz-ore = { path = "../ore", features = ["async", "tracing", "chrono", "metrics", "columnation"] }
object_store = { version = "0.12.5", features = ["aws"] }
mz-persist = { path = "../persist" }
mz-persist-client = { path = "../persist-client" }
mz-persist-types = { path = "../persist-types" }
Expand Down Expand Up @@ -97,6 +98,7 @@ thiserror = { version = "2.0.18" }
uuid = { version = "1.19.0", features = ["serde", "v4"] }
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
arrow-ipc = "57"
slatedb = "0.10.1"

[dev-dependencies]
async-trait = "0.1.89"
Expand Down
8 changes: 8 additions & 0 deletions src/storage/src/storage_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ pub struct StorageInstanceContext {
/// The memory limit of the materialize cluster replica. This will
/// be used to calculate and configure the maximum inflight bytes for backpressure
pub cluster_memory_limit: Option<usize>,
/// A unique identifier for this replica instance. Used to differentiate
/// multiple replicas when using shared object storage backends.
pub replica_id: uuid::Uuid,
}

impl StorageInstanceContext {
Expand All @@ -405,10 +408,15 @@ impl StorageInstanceContext {
rocksdb::Env::mem_env()?
};

// Generate a unique ID for this replica instance. This is used to
// differentiate replicas when using shared object storage backends.
let replica_id = uuid::Uuid::new_v4();

Ok(Self {
scratch_directory,
rocksdb_env,
cluster_memory_limit,
replica_id,
})
}
}
Expand Down
181 changes: 152 additions & 29 deletions src/storage/src/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,79 @@ use types::{

#[cfg(test)]
pub mod memory;
pub(crate) mod object_storage;
pub(crate) mod rocksdb;
// TODO(aljoscha): Move next to upsert module, rename to upsert_types.
pub(crate) mod types;

use serde::de::DeserializeOwned;
use types::{GetStats, MergeStats, MergeValue, PutStats, PutValue, UpsertValueAndSize};

pub type UpsertValue = Result<Row, Box<UpsertError>>;

/// An enum that wraps the different upsert state backends, allowing runtime
/// selection of the backend via dyncfg.
///
/// Every trait method below is pure delegation: each match arm forwards the
/// call unchanged to the wrapped backend, so this wrapper adds no behavior of
/// its own.
pub(crate) enum UpsertBackend<T, O> {
    /// The RocksDB-backed state backend.
    RocksDB(rocksdb::RocksDB<T, O>),
    /// The SlateDB (object storage) state backend.
    ObjectStorage(object_storage::ObjectStorageUpsertBackend<T, O>),
}

#[async_trait::async_trait(?Send)]
impl<T, O> UpsertStateBackend<T, O> for UpsertBackend<T, O>
where
    O: Send + Sync + Serialize + DeserializeOwned + 'static,
    T: Eq + Send + Sync + Serialize + DeserializeOwned + 'static,
{
    /// Reports whether the wrapped backend supports `multi_merge`.
    fn supports_merge(&self) -> bool {
        match self {
            UpsertBackend::RocksDB(inner) => inner.supports_merge(),
            UpsertBackend::ObjectStorage(inner) => inner.supports_merge(),
        }
    }

    /// Writes a batch of key/value puts to the wrapped backend.
    async fn multi_put<P>(&mut self, puts: P) -> Result<PutStats, anyhow::Error>
    where
        P: IntoIterator<Item = (UpsertKey, PutValue<StateValue<T, O>>)>,
    {
        match self {
            UpsertBackend::RocksDB(inner) => inner.multi_put(puts).await,
            UpsertBackend::ObjectStorage(inner) => inner.multi_put(puts).await,
        }
    }

    /// Looks up a batch of keys, writing each result into the corresponding
    /// slot of `results_out`.
    async fn multi_get<'r, G, R>(
        &mut self,
        gets: G,
        results_out: R,
    ) -> Result<GetStats, anyhow::Error>
    where
        G: IntoIterator<Item = UpsertKey>,
        R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
    {
        match self {
            UpsertBackend::RocksDB(inner) => inner.multi_get(gets, results_out).await,
            UpsertBackend::ObjectStorage(inner) => inner.multi_get(gets, results_out).await,
        }
    }

    /// Applies a batch of merges to the wrapped backend. Callers should only
    /// use this when `supports_merge` returns true.
    async fn multi_merge<P>(&mut self, merges: P) -> Result<MergeStats, anyhow::Error>
    where
        P: IntoIterator<Item = (UpsertKey, MergeValue<StateValue<T, O>>)>,
    {
        match self {
            UpsertBackend::RocksDB(inner) => inner.multi_merge(merges).await,
            UpsertBackend::ObjectStorage(inner) => inner.multi_merge(merges).await,
        }
    }

    /// Consumes the backend and releases its resources.
    async fn close(self) -> Result<(), anyhow::Error> {
        match self {
            UpsertBackend::RocksDB(inner) => inner.close().await,
            UpsertBackend::ObjectStorage(inner) => inner.close().await,
        }
    }
}

#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct UpsertKey([u8; 32]);

Expand Down Expand Up @@ -254,6 +321,34 @@ where
let rocksdb_use_native_merge_operator =
dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());

// Whether to use the object storage (SlateDB) backend instead of RocksDB.
let use_object_storage =
dyncfgs::STORAGE_UPSERT_USE_OBJECT_STORAGE.get(storage_configuration.config_set());

// Object storage configuration (only used if use_object_storage is true)
let object_storage_config = object_storage::ObjectStorageConfig {
in_memory: dyncfgs::STORAGE_UPSERT_OBJECT_STORAGE_IN_MEMORY
.get(storage_configuration.config_set()),
s3_bucket: dyncfgs::STORAGE_UPSERT_OBJECT_STORAGE_S3_BUCKET
.get(storage_configuration.config_set())
.to_string(),
s3_region: dyncfgs::STORAGE_UPSERT_OBJECT_STORAGE_S3_REGION
.get(storage_configuration.config_set())
.to_string(),
s3_endpoint: dyncfgs::STORAGE_UPSERT_OBJECT_STORAGE_S3_ENDPOINT
.get(storage_configuration.config_set())
.to_string(),
s3_access_key_id: dyncfgs::STORAGE_UPSERT_OBJECT_STORAGE_S3_ACCESS_KEY_ID
.get(storage_configuration.config_set())
.to_string(),
s3_secret_access_key: dyncfgs::STORAGE_UPSERT_OBJECT_STORAGE_S3_SECRET_ACCESS_KEY
.get(storage_configuration.config_set())
.to_string(),
replica_id: instance_context.replica_id,
// Use the same dyncfg as RocksDB for controlling merge operator usage
use_merge_operator: rocksdb_use_native_merge_operator,
};

let upsert_config = UpsertConfig {
shrink_upsert_unused_buffers_by_ratio: storage_configuration
.parameters
Expand Down Expand Up @@ -283,43 +378,61 @@ where
?rocksdb_dir,
?tuning,
?rocksdb_use_native_merge_operator,
?use_object_storage,
?object_storage_config,
"rendering upsert source"
);

let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);

let env = instance_context.rocksdb_env.clone();
let source_id = source_config.id;
let worker_id = source_config.worker_id;

// A closure that will initialize and return a configured RocksDB instance
let rocksdb_init_fn = move || async move {
let merge_operator = if rocksdb_use_native_merge_operator {
Some((
"upsert_state_snapshot_merge_v1".to_string(),
|a: &[u8], b: ValueIterator<BincodeOpts, StateValue<G::Timestamp, FromTime>>| {
consolidating_merge_function::<G::Timestamp, FromTime>(a.into(), b)
},
))
} else {
None
};
rocksdb::RocksDB::new(
mz_rocksdb::RocksDBInstance::new(
&rocksdb_dir,
mz_rocksdb::InstanceOptions::new(
env,
rocksdb_cleanup_tries,
merge_operator,
// For now, just use the same config as the one used for
// merging snapshots.
upsert_bincode_opts(),
),
tuning,
rocksdb_shared_metrics,
rocksdb_instance_metrics,
// A closure that will initialize and return a configured backend instance
let backend_init_fn = move || async move {
if use_object_storage {
// Use SlateDB (object storage) backend
let backend = object_storage::ObjectStorageUpsertBackend::from_config(
&object_storage_config,
source_id,
worker_id,
)
.unwrap(),
)
.await
.expect("failed to create object storage backend");
UpsertBackend::ObjectStorage(backend)
} else {
// Use RocksDB backend
let merge_operator = if rocksdb_use_native_merge_operator {
Some((
"upsert_state_snapshot_merge_v1".to_string(),
|a: &[u8], b: ValueIterator<BincodeOpts, StateValue<G::Timestamp, FromTime>>| {
consolidating_merge_function::<G::Timestamp, FromTime>(a.into(), b)
},
))
} else {
None
};
let rocksdb_backend = rocksdb::RocksDB::new(
mz_rocksdb::RocksDBInstance::new(
&rocksdb_dir,
mz_rocksdb::InstanceOptions::new(
env,
rocksdb_cleanup_tries,
merge_operator,
// For now, just use the same config as the one used for
// merging snapshots.
upsert_bincode_opts(),
),
tuning,
rocksdb_shared_metrics,
rocksdb_instance_metrics,
)
.unwrap(),
);
UpsertBackend::RocksDB(rocksdb_backend)
}
};

upsert_operator(
Expand All @@ -330,7 +443,7 @@ where
previous_token,
upsert_metrics,
source_config,
rocksdb_init_fn,
backend_init_fn,
upsert_config,
storage_configuration,
prevent_snapshot_buffering,
Expand Down Expand Up @@ -929,6 +1042,16 @@ where
output_handle.give_container(&output_cap, &mut output_updates);
}
}

// Clean up the backend resources before shutting down.
if let Err(e) = state.close().await {
tracing::warn!(
"timely-{} upsert source {} failed to close state backend: {}",
source_config.worker_id,
source_config.id,
e
);
}
});

(
Expand Down
5 changes: 5 additions & 0 deletions src/storage/src/upsert/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ where
}
Ok(stats)
}

    /// Consumes the backend and releases its resources.
    ///
    /// The in-memory backend holds no external resources (no files, sockets,
    /// or object-store handles), so this is a no-op that always succeeds.
    async fn close(self) -> Result<(), anyhow::Error> {
        // In-memory storage has nothing to clean up.
        Ok(())
    }
}
Loading