diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index e53bbe3295b3e..a2c61ae9fc728 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -176,7 +176,7 @@ pub enum Command { instance_id: ComputeInstanceId, tx: oneshot::Sender< Result< - mz_compute_client::controller::instance::Client, + mz_compute_client::controller::instance_client::InstanceClient, mz_compute_client::controller::error::InstanceMissing, >, >, diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 44bb4981d853f..2dfb34d12d012 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -105,7 +105,7 @@ use mz_catalog::memory::objects::{ use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent}; use mz_compute_client::as_of_selection; use mz_compute_client::controller::error::{ - CollectionLookupError, DataflowCreationError, InstanceMissing, + CollectionFrontiersError, CollectionMissing, DataflowCreationError, InstanceMissing, }; use mz_compute_types::ComputeInstanceId; use mz_compute_types::dataflows::DataflowDescription; @@ -3814,7 +3814,7 @@ impl Coordinator { objects: BTreeSet, t: Timestamp, state: WatchSetResponse, - ) -> Result<(), CollectionLookupError> { + ) -> Result<(), CollectionFrontiersError> { let ws_id = self.controller.install_compute_watch_set(objects, t)?; self.connection_watch_sets .entry(conn_id.clone()) @@ -3823,7 +3823,6 @@ impl Coordinator { self.installed_watch_sets.insert(ws_id, (conn_id, state)); Ok(()) } - /// Install a _watch set_ in the controller that is automatically associated with the given /// connection id. The watchset will be automatically cleared if the connection terminates /// before the watchset completes. @@ -3833,7 +3832,7 @@ impl Coordinator { objects: BTreeSet, t: Timestamp, state: WatchSetResponse, - ) -> Result<(), CollectionLookupError> { + ) -> Result<(), CollectionMissing> { let ws_id = self.controller.install_storage_watch_set(objects, t)?; self.connection_watch_sets .entry(conn_id.clone()) diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index aa3a63b7a821b..0248ee00fadc3 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -1927,9 +1927,7 @@ impl Coordinator { if let Some(ws) = watch_set { if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) { let _ = tx.send(Err( - AdapterError::concurrent_dependency_drop_from_collection_lookup_error( - e, cluster_id, - ), + AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e), )); return; } diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index 45b0b49a9c7d9..11ff2ca8e0a21 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -1222,10 +1222,7 @@ impl crate::coord::Coordinator { if let Some(ws) = watch_set { self.install_peek_watch_sets(conn_id.clone(), ws) .map_err(|e| { - AdapterError::concurrent_dependency_drop_from_collection_lookup_error( - e, - compute_instance, - ) + AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e) })?; } @@ -1289,10 +1286,7 @@ impl crate::coord::Coordinator { // for the error case (no ExecuteContextExtra is created here). 
if let Some(ws) = watch_set { if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) { - let err = AdapterError::concurrent_dependency_drop_from_collection_lookup_error( - e, - compute_instance, - ); + let err = AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e); send_err(tx, err); return; } diff --git a/src/adapter/src/coord/statement_logging.rs b/src/adapter/src/coord/statement_logging.rs index 4f8a9d165a324..c0dd87eda36a5 100644 --- a/src/adapter/src/coord/statement_logging.rs +++ b/src/adapter/src/coord/statement_logging.rs @@ -12,7 +12,8 @@ use std::sync::{Arc, Mutex}; use std::time::Duration; use mz_adapter_types::connection::ConnectionId; -use mz_compute_client::controller::error::CollectionLookupError; +use mz_compute_client::controller::error::{CollectionFrontiersError, CollectionMissing}; +use mz_compute_types::ComputeInstanceId; use mz_controller_types::ClusterId; use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime}; use mz_ore::task::spawn; @@ -25,6 +26,7 @@ use mz_storage_client::controller::IntrospectionType; use qcell::QCell; use rand::SeedableRng; use sha2::{Digest, Sha256}; +use thiserror::Error; use tokio::time::MissedTickBehavior; use uuid::Uuid; @@ -775,7 +777,7 @@ impl Coordinator { &mut self, conn_id: ConnectionId, watch_set: WatchSetCreation, - ) -> Result<(), CollectionLookupError> { + ) -> Result<(), WatchSetInstallError> { let WatchSetCreation { logging_id, timestamp, @@ -804,3 +806,29 @@ impl Coordinator { Ok(()) } } + +/// Errors arising during peek watch set installation. +#[derive(Error, Debug)] +pub enum WatchSetInstallError { + /// The specified compute instance does not exist. + #[error("instance does not exist: {0}")] + InstanceMissing(ComputeInstanceId), + /// The collection does not exist. + #[error("collection does not exist: {0}")] + CollectionMissing(GlobalId), +} + +impl From for WatchSetInstallError { + fn from(error: CollectionFrontiersError) -> Self { + match error { + CollectionFrontiersError::InstanceMissing(id) => Self::InstanceMissing(id), + CollectionFrontiersError::CollectionMissing(id) => Self::CollectionMissing(id), + } + } +} + +impl From for WatchSetInstallError { + fn from(error: CollectionMissing) -> Self { + Self::CollectionMissing(error.0) + } +} diff --git a/src/adapter/src/error.rs b/src/adapter/src/error.rs index 33b62b65bdf59..e1dd7aa04da57 100644 --- a/src/adapter/src/error.rs +++ b/src/adapter/src/error.rs @@ -16,7 +16,8 @@ use dec::TryFromDecimalError; use itertools::Itertools; use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER; use mz_compute_client::controller::error as compute_error; -use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing}; +use mz_compute_client::controller::error::InstanceMissing; + use mz_compute_types::ComputeInstanceId; use mz_expr::EvalError; use mz_ore::error::ErrorExt; @@ -38,7 +39,9 @@ use tokio::sync::oneshot; use tokio_postgres::error::SqlState; use crate::coord::NetworkPolicyError; +use crate::coord::statement_logging::WatchSetInstallError; use crate::optimize::OptimizerError; +use crate::peek_client::CollectionLookupError; /// Errors that can occur in the coordinator. #[derive(Debug)] @@ -695,12 +698,14 @@ impl AdapterError { // is appropriate, so we want to make the conversion target explicit at the call site. // For example, maybe we get an `InstanceMissing` if the user specifies a non-existing cluster, // in which case `ConcurrentDependencyDrop` would not be appropriate. 
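// A hedged, self-contained sketch (stand-in types, not the Materialize code) of what the new
// `WatchSetInstallError` above buys: the compute-side watch set install fails with
// `CollectionFrontiersError`, the storage-side install fails with `CollectionMissing`, and the
// two `From` impls let `install_peek_watch_sets` propagate both with `?`. Assumes the
// `thiserror` crate; everything else here is a simplified stand-in.
use thiserror::Error;

type ComputeInstanceId = u64; // stand-in
type GlobalId = u64; // stand-in

#[derive(Error, Debug)]
#[error("collection does not exist: {0}")]
struct CollectionMissing(GlobalId);

#[derive(Error, Debug)]
enum CollectionFrontiersError {
    #[error("instance does not exist: {0}")]
    InstanceMissing(ComputeInstanceId),
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
}

#[derive(Error, Debug)]
enum WatchSetInstallError {
    #[error("instance does not exist: {0}")]
    InstanceMissing(ComputeInstanceId),
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
}

impl From<CollectionFrontiersError> for WatchSetInstallError {
    fn from(e: CollectionFrontiersError) -> Self {
        match e {
            CollectionFrontiersError::InstanceMissing(id) => Self::InstanceMissing(id),
            CollectionFrontiersError::CollectionMissing(id) => Self::CollectionMissing(id),
        }
    }
}

impl From<CollectionMissing> for WatchSetInstallError {
    fn from(e: CollectionMissing) -> Self {
        Self::CollectionMissing(e.0)
    }
}

// Stand-ins for the two controller-side installs and their differing error types.
fn install_compute_watch_set() -> Result<(), CollectionFrontiersError> {
    Ok(())
}
fn install_storage_watch_set() -> Result<(), CollectionMissing> {
    Ok(())
}

// The shape of `install_peek_watch_sets`: both failure modes funnel into one error type.
fn install_peek_watch_sets() -> Result<(), WatchSetInstallError> {
    install_compute_watch_set()?; // CollectionFrontiersError -> WatchSetInstallError
    install_storage_watch_set()?; // CollectionMissing -> WatchSetInstallError
    Ok(())
}

fn main() {
    assert!(install_peek_watch_sets().is_ok());
}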
+ pub fn concurrent_dependency_drop_from_instance_missing(e: InstanceMissing) -> Self { AdapterError::ConcurrentDependencyDrop { dependency_kind: "cluster", dependency_id: e.0.to_string(), } } + pub fn concurrent_dependency_drop_from_collection_missing(e: CollectionMissing) -> Self { AdapterError::ConcurrentDependencyDrop { dependency_kind: "collection", @@ -730,11 +735,26 @@ impl AdapterError { } } + pub fn concurrent_dependency_drop_from_watch_set_install_error( + e: WatchSetInstallError, + ) -> Self { + match e { + WatchSetInstallError::InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop { + dependency_kind: "cluster", + dependency_id: id.to_string(), + }, + WatchSetInstallError::CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop { + dependency_kind: "collection", + dependency_id: id.to_string(), + }, + } + } + pub fn concurrent_dependency_drop_from_instance_peek_error( - e: mz_compute_client::controller::instance::PeekError, + e: mz_compute_client::controller::instance_client::PeekError, compute_instance: ComputeInstanceId, ) -> AdapterError { - use mz_compute_client::controller::instance::PeekError::*; + use mz_compute_client::controller::instance_client::PeekError::*; match e { ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop { dependency_kind: "replica", diff --git a/src/adapter/src/peek_client.rs b/src/adapter/src/peek_client.rs index 5a999db237f32..165a125ecb29a 100644 --- a/src/adapter/src/peek_client.rs +++ b/src/adapter/src/peek_client.rs @@ -11,12 +11,15 @@ use std::collections::BTreeMap; use std::sync::Arc; use differential_dataflow::consolidation::consolidate; -use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing}; +use mz_compute_client::controller::error::{CollectionMissing, InstanceMissing}; +use mz_compute_client::controller::instance_client::InstanceClient; +use mz_compute_client::controller::instance_client::{AcquireReadHoldsError, InstanceShutDown}; use mz_compute_client::protocol::command::PeekTarget; use mz_compute_types::ComputeInstanceId; use mz_expr::row::RowCollection; use mz_ore::cast::CastFrom; use mz_persist_client::PersistClient; +use mz_repr::GlobalId; use mz_repr::Timestamp; use mz_repr::global_id::TransientIdGen; use mz_repr::{RelationDesc, Row}; @@ -24,6 +27,7 @@ use mz_sql::optimizer_metrics::OptimizerMetrics; use mz_storage_types::sources::Timeline; use mz_timestamp_oracle::TimestampOracle; use prometheus::Histogram; +use thiserror::Error; use timely::progress::Antichain; use tokio::sync::oneshot; use uuid::Uuid; @@ -54,8 +58,7 @@ pub struct PeekClient { /// Note that these are never cleaned up. In theory, this could lead to a very slow memory leak /// if a long-running user session keeps peeking on clusters that are being created and dropped /// in a hot loop. Hopefully this won't occur any time soon. - compute_instances: - BTreeMap>, + compute_instances: BTreeMap>, /// Handle to storage collections for reading frontiers and policies. pub storage_collections: StorageCollectionsHandle, /// A generator for transient `GlobalId`s, shared with Coordinator. 
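// A hedged sketch (simplified stand-ins, not the actual PeekClient code) of the
// lookup-or-request pattern the `compute_instances` cache above supports: on a miss, send a
// `GetComputeInstanceClient`-style command to the coordinator together with a oneshot reply
// channel, then cache the returned clone-able client handle for later peeks on that cluster.
// Assumes tokio's mpsc/oneshot channels; names and bodies are illustrative only.
use std::collections::BTreeMap;
use tokio::sync::{mpsc, oneshot};

type ComputeInstanceId = u64; // stand-in

#[derive(Clone, Debug)]
struct InstanceClient; // stand-in for the clone-able per-instance handle

#[derive(Debug)]
struct InstanceMissing(ComputeInstanceId);

enum Command {
    GetComputeInstanceClient {
        instance_id: ComputeInstanceId,
        tx: oneshot::Sender<Result<InstanceClient, InstanceMissing>>,
    },
}

struct PeekClient {
    coord_tx: mpsc::UnboundedSender<Command>,
    compute_instances: BTreeMap<ComputeInstanceId, InstanceClient>,
}

impl PeekClient {
    async fn ensure_compute_instance_client(
        &mut self,
        instance_id: ComputeInstanceId,
    ) -> Result<InstanceClient, InstanceMissing> {
        if let Some(client) = self.compute_instances.get(&instance_id) {
            return Ok(client.clone());
        }
        // Cache miss: ask the coordinator task for a client handle and wait for the reply.
        let (tx, rx) = oneshot::channel();
        self.coord_tx
            .send(Command::GetComputeInstanceClient { instance_id, tx })
            .expect("coordinator task alive");
        let client = rx.await.expect("coordinator replies")?;
        self.compute_instances.insert(instance_id, client.clone());
        Ok(client)
    }
}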
@@ -93,7 +96,7 @@ impl PeekClient { pub async fn ensure_compute_instance_client( &mut self, compute_instance: ComputeInstanceId, - ) -> Result, InstanceMissing> { + ) -> Result, InstanceMissing> { if !self.compute_instances.contains_key(&compute_instance) { let client = self .call_coordinator(|tx| Command::GetComputeInstanceClient { @@ -501,3 +504,44 @@ impl PeekClient { )); } } + +/// Errors arising during collection lookup in peek client operations. +#[derive(Error, Debug)] +pub enum CollectionLookupError { + /// The specified compute instance does not exist. + #[error("instance does not exist: {0}")] + InstanceMissing(ComputeInstanceId), + /// The specified compute instance has shut down. + #[error("the instance has shut down")] + InstanceShutDown, + /// The compute collection does not exist. + #[error("collection does not exist: {0}")] + CollectionMissing(GlobalId), +} + +impl From for CollectionLookupError { + fn from(error: InstanceMissing) -> Self { + Self::InstanceMissing(error.0) + } +} + +impl From for CollectionLookupError { + fn from(_error: InstanceShutDown) -> Self { + Self::InstanceShutDown + } +} + +impl From for CollectionLookupError { + fn from(error: CollectionMissing) -> Self { + Self::CollectionMissing(error.0) + } +} + +impl From for CollectionLookupError { + fn from(error: AcquireReadHoldsError) -> Self { + match error { + AcquireReadHoldsError::CollectionMissing(id) => Self::CollectionMissing(id), + AcquireReadHoldsError::InstanceShutDown => Self::InstanceShutDown, + } + } +} diff --git a/src/compute-client/src/controller.rs b/src/compute-client/src/controller.rs index 931109a10153a..5b0f352ef3349 100644 --- a/src/compute-client/src/controller.rs +++ b/src/compute-client/src/controller.rs @@ -67,11 +67,12 @@ use tokio::time::{self, MissedTickBehavior}; use uuid::Uuid; use crate::controller::error::{ - CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError, + CollectionFrontiersError, CollectionMissing, CollectionUpdateError, DataflowCreationError, HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError, ReplicaCreationError, ReplicaDropError, }; use crate::controller::instance::{Instance, SharedCollectionState}; +use crate::controller::instance_client::InstanceClient; use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink}; use crate::controller::replica::ReplicaConfig; use crate::logging::{LogVariant, LoggingConfig}; @@ -79,12 +80,13 @@ use crate::metrics::ComputeControllerMetrics; use crate::protocol::command::{ComputeParameters, PeekTarget}; use crate::protocol::response::{PeekResponse, SubscribeBatch}; +mod instance; mod introspection; mod replica; mod sequential_hydration; pub mod error; -pub mod instance; +pub mod instance_client; pub(crate) type StorageCollections = Arc< dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync, @@ -337,11 +339,11 @@ impl ComputeController { self.instances.get(&id).ok_or(InstanceMissing(id)) } - /// Return an `instance::Client` for the indicated compute instance. + /// Return an `instance_client::InstanceClient` for the indicated compute instance. 
pub fn instance_client( &self, id: ComputeInstanceId, - ) -> Result, InstanceMissing> { + ) -> Result, InstanceMissing> { self.instance(id).map(|instance| instance.client.clone()) } @@ -371,7 +373,7 @@ impl ComputeController { &self, collection_id: GlobalId, instance_id: Option, - ) -> Result, CollectionLookupError> { + ) -> Result, CollectionFrontiersError> { let collection = match instance_id { Some(id) => self.instance(id)?.collection(collection_id)?, None => self @@ -527,7 +529,7 @@ where logs.push((log, id, shared)); } - let client = instance::Client::spawn( + let client = InstanceClient::spawn( id, self.build_info, Arc::clone(&self.storage_collections), @@ -1067,13 +1069,13 @@ where #[derive(Debug)] struct InstanceState { - client: instance::Client, + client: InstanceClient, replicas: BTreeSet, collections: BTreeMap>, } impl InstanceState { - fn new(client: instance::Client, collections: BTreeMap>) -> Self { + fn new(client: InstanceClient, collections: BTreeMap>) -> Self { Self { client, replicas: Default::default(), diff --git a/src/compute-client/src/controller/error.rs b/src/compute-client/src/controller/error.rs index bbbf0a83e7334..e0d90f5fe27ad 100644 --- a/src/compute-client/src/controller/error.rs +++ b/src/compute-client/src/controller/error.rs @@ -22,7 +22,6 @@ use thiserror::Error; pub use mz_storage_types::errors::CollectionMissing; -use crate::controller::instance::InstanceShutDown; use crate::controller::{ComputeInstanceId, ReplicaId}; /// The error returned by replica-targeted peeks and subscribes when the target replica @@ -45,33 +44,24 @@ pub struct InstanceExists(pub ComputeInstanceId); #[error("No replicas found in cluster for target list.")] pub struct HydrationCheckBadTarget(pub Vec); -/// Errors arising during compute collection lookup. +/// Errors arising during compute collection frontiers lookup. #[derive(Error, Debug)] -pub enum CollectionLookupError { +pub enum CollectionFrontiersError { /// The specified compute instance does not exist. #[error("instance does not exist: {0}")] InstanceMissing(ComputeInstanceId), - /// The specified compute instance has shut down. - #[error("the instance has shut down")] - InstanceShutDown, /// The compute collection does not exist. #[error("collection does not exist: {0}")] CollectionMissing(GlobalId), } -impl From for CollectionLookupError { +impl From for CollectionFrontiersError { fn from(error: InstanceMissing) -> Self { Self::InstanceMissing(error.0) } } -impl From for CollectionLookupError { - fn from(_error: InstanceShutDown) -> Self { - Self::InstanceShutDown - } -} - -impl From for CollectionLookupError { +impl From for CollectionFrontiersError { fn from(error: CollectionMissing) -> Self { Self::CollectionMissing(error.0) } @@ -80,10 +70,10 @@ impl From for CollectionLookupError { /// Errors arising during compute replica creation. #[derive(Error, Debug)] pub enum ReplicaCreationError { - /// TODO(database-issues#7533): Add documentation. + /// The target compute instance does not exist. #[error("instance does not exist: {0}")] InstanceMissing(ComputeInstanceId), - /// TODO(database-issues#7533): Add documentation. + /// A replica with the given ID already exists on the target instance. #[error("replica exists already: {0}")] ReplicaExists(ReplicaId), } @@ -97,10 +87,10 @@ impl From for ReplicaCreationError { /// Errors arising during compute replica removal. #[derive(Error, Debug)] pub enum ReplicaDropError { - /// TODO(database-issues#7533): Add documentation. + /// The target compute instance does not exist. 
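// A hedged sketch (stand-in types, not the Materialize code) of the lookup shape behind the
// `collection_frontiers` signature above: with `Some(instance_id)` the collection is looked up
// on that instance, otherwise the controller searches across instances; either miss surfaces as
// `CollectionFrontiersError`. The `None` arm here is hypothetical, since the real body is
// elided in this hunk.
use std::collections::BTreeMap;

type ComputeInstanceId = u64; // stand-in
type GlobalId = u64; // stand-in
type Frontiers = (u64, u64); // stand-in: (read frontier, write frontier)

#[derive(Debug)]
enum CollectionFrontiersError {
    InstanceMissing(ComputeInstanceId),
    CollectionMissing(GlobalId),
}

struct Controller {
    instances: BTreeMap<ComputeInstanceId, BTreeMap<GlobalId, Frontiers>>,
}

impl Controller {
    fn collection_frontiers(
        &self,
        collection_id: GlobalId,
        instance_id: Option<ComputeInstanceId>,
    ) -> Result<Frontiers, CollectionFrontiersError> {
        match instance_id {
            // Look only at the named instance; a missing instance is its own error.
            Some(id) => self
                .instances
                .get(&id)
                .ok_or(CollectionFrontiersError::InstanceMissing(id))?
                .get(&collection_id)
                .copied()
                .ok_or(CollectionFrontiersError::CollectionMissing(collection_id)),
            // Hypothetical: search every instance for the collection.
            None => self
                .instances
                .values()
                .find_map(|collections| collections.get(&collection_id).copied())
                .ok_or(CollectionFrontiersError::CollectionMissing(collection_id)),
        }
    }
}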
#[error("instance does not exist: {0}")] InstanceMissing(ComputeInstanceId), - /// TODO(database-issues#7533): Add documentation. + /// The replica to be dropped does not exist on the target instance. #[error("replica does not exist: {0}")] ReplicaMissing(ReplicaId), } @@ -183,10 +173,10 @@ impl From for PeekError { /// Errors arising during collection updates. #[derive(Error, Debug)] pub enum CollectionUpdateError { - /// TODO(database-issues#7533): Add documentation. + /// The target compute instance does not exist. #[error("instance does not exist: {0}")] InstanceMissing(ComputeInstanceId), - /// TODO(database-issues#7533): Add documentation. + /// The collection to be updated does not exist. #[error("collection does not exist: {0}")] CollectionMissing(GlobalId), } @@ -206,13 +196,13 @@ impl From for CollectionUpdateError { /// Errors arising during collection read policy assignment. #[derive(Error, Debug)] pub enum ReadPolicyError { - /// TODO(database-issues#7533): Add documentation. + /// The target compute instance does not exist. #[error("instance does not exist: {0}")] InstanceMissing(ComputeInstanceId), - /// TODO(database-issues#7533): Add documentation. + /// The collection does not exist. #[error("collection does not exist: {0}")] CollectionMissing(GlobalId), - /// TODO(database-issues#7533): Add documentation. + /// The collection is write-only and does not support read policies. #[error("collection is write-only: {0}")] WriteOnlyCollection(GlobalId), } @@ -232,7 +222,7 @@ impl From for ReadPolicyError { /// Errors arising during orphan removal. #[derive(Error, Debug)] pub enum RemoveOrphansError { - /// TODO(database-issues#7533): Add documentation. + /// An error occurred in the orchestrator while removing orphaned replicas. #[error("orchestrator error: {0}")] OrchestratorError(anyhow::Error), } diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index f8cfbf3883714..1e27f07524615 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -17,7 +17,6 @@ use std::time::{Duration, Instant}; use chrono::{DateTime, DurationRound, TimeDelta, Utc}; use mz_build_info::BuildInfo; use mz_cluster_client::WallclockLagFn; -use mz_compute_types::ComputeInstanceId; use mz_compute_types::dataflows::{BuildDesc, DataflowDescription}; use mz_compute_types::plan::render_plan::RenderPlan; use mz_compute_types::sinks::{ @@ -45,13 +44,11 @@ use thiserror::Error; use timely::PartialOrder; use timely::progress::frontier::MutableAntichain; use timely::progress::{Antichain, ChangeBatch, Timestamp}; -use tokio::sync::mpsc::error::SendError; use tokio::sync::{mpsc, oneshot}; -use tracing::debug_span; use uuid::Uuid; use crate::controller::error::{ - CollectionLookupError, CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget, + CollectionMissing, ERROR_TARGET_REPLICA_FAILED, HydrationCheckBadTarget, }; use crate::controller::replica::{ReplicaClient, ReplicaConfig}; use crate::controller::{ @@ -102,32 +99,7 @@ impl From for DataflowCreationError { } } -#[derive(Error, Debug)] -#[error("the instance has shut down")] -pub(super) struct InstanceShutDown; - -/// Errors arising during peek processing. -#[derive(Error, Debug)] -pub enum PeekError { - /// The replica that the peek was issued against does not exist. - #[error("replica does not exist: {0}")] - ReplicaMissing(ReplicaId), - /// The read hold that was passed in is against the wrong collection. 
- #[error("read hold ID does not match peeked collection: {0}")] - ReadHoldIdMismatch(GlobalId), - /// The read hold that was passed in is for a later time than the peek's timestamp. - #[error("insufficient read hold provided: {0}")] - ReadHoldInsufficient(GlobalId), - /// The peek's target instance has shut down. - #[error("the instance has shut down")] - InstanceShutDown, -} - -impl From for PeekError { - fn from(_error: InstanceShutDown) -> Self { - Self::InstanceShutDown - } -} +use crate::controller::instance_client::PeekError; #[derive(Error, Debug)] pub(super) enum ReadPolicyError { @@ -144,173 +116,7 @@ impl From for ReadPolicyError { } /// A command sent to an [`Instance`] task. -type Command = Box) + Send>; - -/// A client for an `Instance` task. -#[derive(Clone, derivative::Derivative)] -#[derivative(Debug)] -pub struct Client { - /// A sender for commands for the instance. - command_tx: mpsc::UnboundedSender>, - /// A sender for read hold changes for collections installed on the instance. - #[derivative(Debug = "ignore")] - read_hold_tx: read_holds::ChangeTx, -} - -impl Client { - pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx { - Arc::clone(&self.read_hold_tx) - } - - /// Call a method to be run on the instance task, by sending a message to the instance. - /// Does not wait for a response message. - pub(super) fn call(&self, f: F) -> Result<(), InstanceShutDown> - where - F: FnOnce(&mut Instance) + Send + 'static, - { - let otel_ctx = OpenTelemetryContext::obtain(); - self.command_tx - .send(Box::new(move |instance| { - let _span = debug_span!("instance::call").entered(); - otel_ctx.attach_as_parent(); - - f(instance) - })) - .map_err(|_send_error| InstanceShutDown) - } - - /// Call a method to be run on the instance task, by sending a message to the instance and - /// waiting for a response message. 
- pub(super) async fn call_sync(&self, f: F) -> Result - where - F: FnOnce(&mut Instance) -> R + Send + 'static, - R: Send + 'static, - { - let (tx, rx) = oneshot::channel(); - let otel_ctx = OpenTelemetryContext::obtain(); - self.command_tx - .send(Box::new(move |instance| { - let _span = debug_span!("instance::call_sync").entered(); - otel_ctx.attach_as_parent(); - let result = f(instance); - let _ = tx.send(result); - })) - .map_err(|_send_error| InstanceShutDown)?; - - rx.await.map_err(|_| InstanceShutDown) - } -} - -impl Client -where - T: ComputeControllerTimestamp, -{ - pub(super) fn spawn( - id: ComputeInstanceId, - build_info: &'static BuildInfo, - storage: StorageCollections, - peek_stash_persist_location: PersistLocation, - arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState)>, - metrics: InstanceMetrics, - now: NowFn, - wallclock_lag: WallclockLagFn, - dyncfg: Arc, - response_tx: mpsc::UnboundedSender>, - introspection_tx: mpsc::UnboundedSender, - read_only: bool, - ) -> Self { - let (command_tx, command_rx) = mpsc::unbounded_channel(); - - let read_hold_tx: read_holds::ChangeTx<_> = { - let command_tx = command_tx.clone(); - Arc::new(move |id, change: ChangeBatch<_>| { - let cmd: Command<_> = { - let change = change.clone(); - Box::new(move |i| i.apply_read_hold_change(id, change.clone())) - }; - command_tx.send(cmd).map_err(|_| SendError((id, change))) - }) - }; - - mz_ore::task::spawn( - || format!("compute-instance-{id}"), - Instance::new( - build_info, - storage, - peek_stash_persist_location, - arranged_logs, - metrics, - now, - wallclock_lag, - dyncfg, - command_rx, - response_tx, - Arc::clone(&read_hold_tx), - introspection_tx, - read_only, - ) - .run(), - ); - - Self { - command_tx, - read_hold_tx, - } - } - - /// Acquires a `ReadHold` and collection write frontier for each of the identified compute - /// collections. - pub async fn acquire_read_holds_and_collection_write_frontiers( - &self, - ids: Vec, - ) -> Result, Antichain)>, CollectionLookupError> { - self.call_sync(move |i| { - let mut result = Vec::new(); - for id in ids.into_iter() { - result.push(( - id, - i.acquire_read_hold(id)?, - i.collection_write_frontier(id)?, - )); - } - Ok(result) - }) - .await? - } - - /// Issue a peek by calling into the instance task. - /// - /// If this returns an error, then it didn't modify any `Instance` state. - pub async fn peek( - &self, - peek_target: PeekTarget, - literal_constraints: Option>, - uuid: Uuid, - timestamp: T, - result_desc: RelationDesc, - finishing: RowSetFinishing, - map_filter_project: mz_expr::SafeMfpPlan, - target_read_hold: ReadHold, - target_replica: Option, - peek_response_tx: oneshot::Sender, - ) -> Result<(), PeekError> { - self.call_sync(move |i| { - i.peek( - peek_target, - literal_constraints, - uuid, - timestamp, - result_desc, - finishing, - map_filter_project, - target_read_hold, - target_replica, - peek_response_tx, - ) - }) - .await? - } -} +pub(super) type Command = Box) + Send>; /// A response from a replica, composed of a replica ID, the replica's current epoch, and the /// compute response itself. @@ -864,19 +670,15 @@ impl Instance { /// This also returns `true` in case this cluster does not have any /// replicas. 
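// A hedged, self-contained sketch (toy types, not the Materialize code) of the pattern behind
// the `Command` alias above and the `InstanceClient::call`/`call_sync` methods introduced in
// instance_client.rs below: the instance runs as its own task that exclusively owns its mutable
// state, and clients interact with it by sending boxed closures over an mpsc channel;
// `call_sync` adds a oneshot channel to carry a result back. Assumes the tokio runtime.
use tokio::sync::{mpsc, oneshot};

// Toy stand-in for `Instance` and its state.
struct Instance {
    collections: Vec<String>,
}

// Mirrors `type Command<T> = Box<dyn FnOnce(&mut Instance<T>) + Send>`.
type Command = Box<dyn FnOnce(&mut Instance) + Send>;

#[derive(Debug)]
struct InstanceShutDown;

#[derive(Clone)]
struct InstanceClient {
    command_tx: mpsc::UnboundedSender<Command>,
}

impl InstanceClient {
    /// Fire-and-forget: run `f` on the instance task.
    fn call<F>(&self, f: F) -> Result<(), InstanceShutDown>
    where
        F: FnOnce(&mut Instance) + Send + 'static,
    {
        self.command_tx
            .send(Box::new(f))
            .map_err(|_send_error| InstanceShutDown)
    }

    /// Run `f` on the instance task and wait for its result via a oneshot channel.
    async fn call_sync<F, R>(&self, f: F) -> Result<R, InstanceShutDown>
    where
        F: FnOnce(&mut Instance) -> R + Send + 'static,
        R: Send + 'static,
    {
        let (tx, rx) = oneshot::channel();
        self.command_tx
            .send(Box::new(move |instance| {
                let _ = tx.send(f(instance));
            }))
            .map_err(|_send_error| InstanceShutDown)?;
        rx.await.map_err(|_| InstanceShutDown)
    }
}

fn spawn_instance() -> InstanceClient {
    let (command_tx, mut command_rx) = mpsc::unbounded_channel::<Command>();
    tokio::spawn(async move {
        let mut instance = Instance { collections: Vec::new() };
        // The instance task applies each command to its exclusively owned state, in order.
        while let Some(cmd) = command_rx.recv().await {
            cmd(&mut instance);
        }
    });
    InstanceClient { command_tx }
}

#[tokio::main]
async fn main() -> Result<(), InstanceShutDown> {
    let client = spawn_instance();
    client.call(|i| i.collections.push("u1".into()))?;
    let count = client.call_sync(|i| i.collections.len()).await?;
    assert_eq!(count, 1);
    Ok(())
}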
#[mz_ore::instrument(level = "debug")] - pub fn collection_hydrated( - &self, - collection_id: GlobalId, - ) -> Result { + pub fn collection_hydrated(&self, collection_id: GlobalId) -> Result { if self.replicas.is_empty() { return Ok(true); } - for replica_state in self.replicas.values() { let collection_state = replica_state .collections .get(&collection_id) - .ok_or(CollectionLookupError::CollectionMissing(collection_id))?; + .ok_or(CollectionMissing(collection_id))?; if collection_state.hydrated() { return Ok(true); @@ -1053,7 +855,10 @@ impl Instance { } /// Reports the current write frontier for the identified compute collection. - fn collection_write_frontier(&self, id: GlobalId) -> Result, CollectionMissing> { + pub(super) fn collection_write_frontier( + &self, + id: GlobalId, + ) -> Result, CollectionMissing> { Ok(self.collection(id)?.write_frontier()) } } @@ -1062,7 +867,7 @@ impl Instance where T: ComputeControllerTimestamp, { - fn new( + pub(super) fn new( build_info: &'static BuildInfo, storage: StorageCollections, peek_stash_persist_location: PersistLocation, @@ -1126,7 +931,7 @@ where } } - async fn run(mut self) { + pub(super) async fn run(mut self) { self.send(ComputeCommand::Hello { // The nonce is protocol iteration-specific and will be set in // `ReplicaTask::specialize_command`. @@ -1930,7 +1735,7 @@ where } /// Apply a collection read hold change. - fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch) { + pub(super) fn apply_read_hold_change(&mut self, id: GlobalId, mut update: ChangeBatch) { let Some(collection) = self.collections.get_mut(&id) else { soft_panic_or_log!( "read hold change for absent collection (id={id}, changes={update:?})" @@ -2443,7 +2248,7 @@ where /// /// This mirrors the logic used by the controller-side `InstanceState::acquire_read_hold`, /// but executes on the instance task itself. - fn acquire_read_hold(&self, id: GlobalId) -> Result, CollectionMissing> { + pub(super) fn acquire_read_hold(&self, id: GlobalId) -> Result, CollectionMissing> { // Similarly to InstanceState::acquire_read_hold and StorageCollections::acquire_read_holds, // we acquire read holds at the earliest possible time rather than returning a copy // of the implied read hold. This is so that dependents can acquire read holds on diff --git a/src/compute-client/src/controller/instance_client.rs b/src/compute-client/src/controller/instance_client.rs new file mode 100644 index 0000000000000..a6540162cf447 --- /dev/null +++ b/src/compute-client/src/controller/instance_client.rs @@ -0,0 +1,256 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! A client for communicating with a compute instance task. +//! +//! This module provides the public interface for external callers (like the adapter) +//! to interact with compute instances directly for operations like peek sequencing. +//! It also has a number of `pub(super)` functions, which are for the `ComputeController`'s benefit. 
+ +use std::sync::Arc; + +use mz_build_info::BuildInfo; +use mz_cluster_client::WallclockLagFn; +use mz_compute_types::ComputeInstanceId; +use mz_dyncfg::ConfigSet; +use mz_expr::RowSetFinishing; +use mz_ore::now::NowFn; +use mz_ore::tracing::OpenTelemetryContext; +use mz_persist_types::PersistLocation; +use mz_repr::{GlobalId, RelationDesc, Row}; +use mz_storage_types::read_holds::{self, ReadHold}; +use thiserror::Error; +use timely::progress::{Antichain, ChangeBatch}; +use tokio::sync::mpsc::error::SendError; +use tokio::sync::{mpsc, oneshot}; +use tracing::debug_span; +use uuid::Uuid; + +use crate::controller::error::CollectionMissing; +use crate::controller::instance::{Command, Instance, SharedCollectionState}; +use crate::controller::{ + ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, ReplicaId, + StorageCollections, +}; +use crate::logging::LogVariant; +use crate::metrics::InstanceMetrics; +use crate::protocol::command::PeekTarget; +use crate::protocol::response::PeekResponse; + +/// Error indicating the instance has shut down. +#[derive(Error, Debug)] +#[error("the instance has shut down")] +pub struct InstanceShutDown; + +/// Errors arising during peek processing. +#[derive(Error, Debug)] +pub enum PeekError { + /// The replica that the peek was issued against does not exist. + #[error("replica does not exist: {0}")] + ReplicaMissing(ReplicaId), + /// The read hold that was passed in is against the wrong collection. + #[error("read hold ID does not match peeked collection: {0}")] + ReadHoldIdMismatch(GlobalId), + /// The read hold that was passed in is for a later time than the peek's timestamp. + #[error("insufficient read hold provided: {0}")] + ReadHoldInsufficient(GlobalId), + /// The peek's target instance has shut down. + #[error("the instance has shut down")] + InstanceShutDown, +} + +impl From for PeekError { + fn from(_error: InstanceShutDown) -> Self { + Self::InstanceShutDown + } +} + +/// Errors arising from `InstanceClient::acquire_read_holds_and_collection_write_frontiers`. +#[derive(Error, Debug)] +pub enum AcquireReadHoldsError { + /// The compute collection does not exist. + #[error("collection does not exist: {0}")] + CollectionMissing(GlobalId), + /// The instance has shut down. + #[error("the instance has shut down")] + InstanceShutDown, +} + +impl From for AcquireReadHoldsError { + fn from(error: CollectionMissing) -> Self { + Self::CollectionMissing(error.0) + } +} + +impl From for AcquireReadHoldsError { + fn from(_error: InstanceShutDown) -> Self { + Self::InstanceShutDown + } +} + +/// A client for an `Instance` task. +#[derive(Clone, derivative::Derivative)] +#[derivative(Debug)] +pub struct InstanceClient { + /// A sender for commands for the instance. + command_tx: mpsc::UnboundedSender>, + /// A sender for read hold changes for collections installed on the instance. + #[derivative(Debug = "ignore")] + read_hold_tx: read_holds::ChangeTx, +} + +impl InstanceClient { + pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx { + Arc::clone(&self.read_hold_tx) + } + + /// Call a method to be run on the instance task, by sending a message to the instance. + /// Does not wait for a response message. 
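// A hedged, self-contained sketch (toy types and a simplified callback signature, not the
// Materialize code) of how the `read_hold_tx` field above is wired in `spawn` below: read-hold
// changes are wrapped into the same boxed-closure `Command`s as every other call, so a
// `ReadHold` handle can report downgrades and drops from any thread while the instance task
// remains the only place that mutates per-collection read-capability state.
use std::sync::Arc;
use tokio::sync::mpsc;

type GlobalId = u64; // stand-in
type Diff = i64; // stand-in for a change batch on the read frontier

struct Instance; // stand-in

impl Instance {
    fn apply_read_hold_change(&mut self, id: GlobalId, change: Diff) {
        println!("collection {id}: read hold change {change:+}");
    }
}

type Command = Box<dyn FnOnce(&mut Instance) + Send>;

// Simplified stand-in for `read_holds::ChangeTx`: a shared callback that forwards a
// (collection id, change) pair, reporting failure if the instance task is gone.
type ChangeTx = Arc<dyn Fn(GlobalId, Diff) -> Result<(), (GlobalId, Diff)> + Send + Sync>;

fn make_read_hold_tx(command_tx: mpsc::UnboundedSender<Command>) -> ChangeTx {
    Arc::new(move |id: GlobalId, change: Diff| {
        let cmd: Command =
            Box::new(move |instance: &mut Instance| instance.apply_read_hold_change(id, change));
        command_tx.send(cmd).map_err(|_| (id, change))
    })
}

fn main() {
    let (command_tx, mut command_rx) = mpsc::unbounded_channel::<Command>();
    let read_hold_tx = make_read_hold_tx(command_tx);
    // A read-hold handle would invoke this callback when it is downgraded or dropped.
    read_hold_tx.as_ref()(7, -1).expect("instance task alive");
    // The instance task would drain the channel; here we apply the queued command inline.
    let mut instance = Instance;
    while let Ok(cmd) = command_rx.try_recv() {
        cmd(&mut instance);
    }
}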
+ pub(super) fn call(&self, f: F) -> Result<(), InstanceShutDown> + where + F: FnOnce(&mut Instance) + Send + 'static, + { + let otel_ctx = OpenTelemetryContext::obtain(); + self.command_tx + .send(Box::new(move |instance| { + let _span = debug_span!("instance_client::call").entered(); + otel_ctx.attach_as_parent(); + + f(instance) + })) + .map_err(|_send_error| InstanceShutDown) + } + + /// Call a method to be run on the instance task, by sending a message to the instance and + /// waiting for a response message. + pub(super) async fn call_sync(&self, f: F) -> Result + where + F: FnOnce(&mut Instance) -> R + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + let otel_ctx = OpenTelemetryContext::obtain(); + self.command_tx + .send(Box::new(move |instance| { + let _span = debug_span!("instance_client::call_sync").entered(); + otel_ctx.attach_as_parent(); + let result = f(instance); + let _ = tx.send(result); + })) + .map_err(|_send_error| InstanceShutDown)?; + + rx.await.map_err(|_| InstanceShutDown) + } + + pub(super) fn spawn( + id: ComputeInstanceId, + build_info: &'static BuildInfo, + storage: StorageCollections, + peek_stash_persist_location: PersistLocation, + arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState)>, + metrics: InstanceMetrics, + now: NowFn, + wallclock_lag: WallclockLagFn, + dyncfg: Arc, + response_tx: mpsc::UnboundedSender>, + introspection_tx: mpsc::UnboundedSender, + read_only: bool, + ) -> Self { + let (command_tx, command_rx) = mpsc::unbounded_channel(); + + let read_hold_tx: read_holds::ChangeTx<_> = { + let command_tx = command_tx.clone(); + Arc::new(move |id, change: ChangeBatch<_>| { + let cmd: Command<_> = { + let change = change.clone(); + Box::new(move |i| i.apply_read_hold_change(id, change.clone())) + }; + command_tx.send(cmd).map_err(|_| SendError((id, change))) + }) + }; + + mz_ore::task::spawn( + || format!("compute-instance-{id}"), + Instance::new( + build_info, + storage, + peek_stash_persist_location, + arranged_logs, + metrics, + now, + wallclock_lag, + dyncfg, + command_rx, + response_tx, + Arc::clone(&read_hold_tx), + introspection_tx, + read_only, + ) + .run(), + ); + + Self { + command_tx, + read_hold_tx, + } + } + + /// Acquires a `ReadHold` and collection write frontier for each of the identified compute + /// collections. + pub async fn acquire_read_holds_and_collection_write_frontiers( + &self, + ids: Vec, + ) -> Result, Antichain)>, AcquireReadHoldsError> { + self.call_sync(move |i| { + let mut result = Vec::new(); + for id in ids.into_iter() { + result.push(( + id, + i.acquire_read_hold(id)?, + i.collection_write_frontier(id)?, + )); + } + Ok(result) + }) + .await? + } + + /// Issue a peek by calling into the instance task. + /// + /// If this returns an error, then it didn't modify any `Instance` state. + pub async fn peek( + &self, + peek_target: PeekTarget, + literal_constraints: Option>, + uuid: Uuid, + timestamp: T, + result_desc: RelationDesc, + finishing: RowSetFinishing, + map_filter_project: mz_expr::SafeMfpPlan, + target_read_hold: ReadHold, + target_replica: Option, + peek_response_tx: oneshot::Sender, + ) -> Result<(), PeekError> { + self.call_sync(move |i| { + i.peek( + peek_target, + literal_constraints, + uuid, + timestamp, + result_desc, + finishing, + map_filter_project, + target_read_hold, + target_replica, + peek_response_tx, + ) + }) + .await? 
+ } +} diff --git a/src/controller/src/lib.rs b/src/controller/src/lib.rs index 7b896cf2e23b0..f5c680777bd0d 100644 --- a/src/controller/src/lib.rs +++ b/src/controller/src/lib.rs @@ -32,7 +32,7 @@ use futures::future::BoxFuture; use mz_build_info::BuildInfo; use mz_cluster_client::metrics::ControllerMetrics; use mz_cluster_client::{ReplicaId, WallclockLagFn}; -use mz_compute_client::controller::error::CollectionLookupError; +use mz_compute_client::controller::error::{CollectionFrontiersError, CollectionMissing}; use mz_compute_client::controller::{ ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification, }; @@ -382,7 +382,7 @@ where &mut self, mut objects: BTreeSet, t: T, - ) -> Result { + ) -> Result { let ws_id = self.watch_set_id_gen.allocate_id(); // Collect all frontiers first, returning any errors @@ -425,7 +425,7 @@ where &mut self, mut objects: BTreeSet, t: T, - ) -> Result { + ) -> Result { let ws_id = self.watch_set_id_gen.allocate_id(); let uppers = self