Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ pub enum Command {
instance_id: ComputeInstanceId,
tx: oneshot::Sender<
Result<
mz_compute_client::controller::instance::Client<mz_repr::Timestamp>,
mz_compute_client::controller::instance_client::InstanceClient<mz_repr::Timestamp>,
mz_compute_client::controller::error::InstanceMissing,
>,
>,
Expand Down
7 changes: 3 additions & 4 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ use mz_catalog::memory::objects::{
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
use mz_compute_client::controller::error::{
CollectionLookupError, DataflowCreationError, InstanceMissing,
CollectionFrontiersError, CollectionMissing, DataflowCreationError, InstanceMissing,
};
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::DataflowDescription;
Expand Down Expand Up @@ -3814,7 +3814,7 @@ impl Coordinator {
objects: BTreeSet<GlobalId>,
t: Timestamp,
state: WatchSetResponse,
) -> Result<(), CollectionLookupError> {
) -> Result<(), CollectionFrontiersError> {
let ws_id = self.controller.install_compute_watch_set(objects, t)?;
self.connection_watch_sets
.entry(conn_id.clone())
Expand All @@ -3823,7 +3823,6 @@ impl Coordinator {
self.installed_watch_sets.insert(ws_id, (conn_id, state));
Ok(())
}

/// Install a _watch set_ in the controller that is automatically associated with the given
/// connection id. The watchset will be automatically cleared if the connection terminates
/// before the watchset completes.
Expand All @@ -3833,7 +3832,7 @@ impl Coordinator {
objects: BTreeSet<GlobalId>,
t: Timestamp,
state: WatchSetResponse,
) -> Result<(), CollectionLookupError> {
) -> Result<(), CollectionMissing> {
let ws_id = self.controller.install_storage_watch_set(objects, t)?;
self.connection_watch_sets
.entry(conn_id.clone())
Expand Down
4 changes: 1 addition & 3 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1927,9 +1927,7 @@ impl Coordinator {
if let Some(ws) = watch_set {
if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
let _ = tx.send(Err(
AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
e, cluster_id,
),
AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e),
));
return;
}
Expand Down
10 changes: 2 additions & 8 deletions src/adapter/src/coord/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1222,10 +1222,7 @@ impl crate::coord::Coordinator {
if let Some(ws) = watch_set {
self.install_peek_watch_sets(conn_id.clone(), ws)
.map_err(|e| {
AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
e,
compute_instance,
)
AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e)
})?;
}

Expand Down Expand Up @@ -1289,10 +1286,7 @@ impl crate::coord::Coordinator {
// for the error case (no ExecuteContextExtra is created here).
if let Some(ws) = watch_set {
if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
let err = AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
e,
compute_instance,
);
let err = AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e);
send_err(tx, err);
return;
}
Expand Down
32 changes: 30 additions & 2 deletions src/adapter/src/coord/statement_logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use mz_adapter_types::connection::ConnectionId;
use mz_compute_client::controller::error::CollectionLookupError;
use mz_compute_client::controller::error::{CollectionFrontiersError, CollectionMissing};
use mz_compute_types::ComputeInstanceId;
use mz_controller_types::ClusterId;
use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime};
use mz_ore::task::spawn;
Expand All @@ -25,6 +26,7 @@ use mz_storage_client::controller::IntrospectionType;
use qcell::QCell;
use rand::SeedableRng;
use sha2::{Digest, Sha256};
use thiserror::Error;
use tokio::time::MissedTickBehavior;
use uuid::Uuid;

Expand Down Expand Up @@ -775,7 +777,7 @@ impl Coordinator {
&mut self,
conn_id: ConnectionId,
watch_set: WatchSetCreation,
) -> Result<(), CollectionLookupError> {
) -> Result<(), WatchSetInstallError> {
let WatchSetCreation {
logging_id,
timestamp,
Expand Down Expand Up @@ -804,3 +806,29 @@ impl Coordinator {
Ok(())
}
}

/// Errors arising during peek watch set installation.
#[derive(Error, Debug)]
pub enum WatchSetInstallError {
/// The specified compute instance does not exist.
#[error("instance does not exist: {0}")]
InstanceMissing(ComputeInstanceId),
/// The collection does not exist.
#[error("collection does not exist: {0}")]
CollectionMissing(GlobalId),
}

impl From<CollectionFrontiersError> for WatchSetInstallError {
fn from(error: CollectionFrontiersError) -> Self {
match error {
CollectionFrontiersError::InstanceMissing(id) => Self::InstanceMissing(id),
CollectionFrontiersError::CollectionMissing(id) => Self::CollectionMissing(id),
}
}
}

impl From<CollectionMissing> for WatchSetInstallError {
fn from(error: CollectionMissing) -> Self {
Self::CollectionMissing(error.0)
}
}
26 changes: 23 additions & 3 deletions src/adapter/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use dec::TryFromDecimalError;
use itertools::Itertools;
use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
use mz_compute_client::controller::error as compute_error;
use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
use mz_compute_client::controller::error::InstanceMissing;

use mz_compute_types::ComputeInstanceId;
use mz_expr::EvalError;
use mz_ore::error::ErrorExt;
Expand All @@ -38,7 +39,9 @@ use tokio::sync::oneshot;
use tokio_postgres::error::SqlState;

use crate::coord::NetworkPolicyError;
use crate::coord::statement_logging::WatchSetInstallError;
use crate::optimize::OptimizerError;
use crate::peek_client::CollectionLookupError;

/// Errors that can occur in the coordinator.
#[derive(Debug)]
Expand Down Expand Up @@ -695,12 +698,14 @@ impl AdapterError {
// is appropriate, so we want to make the conversion target explicit at the call site.
// For example, maybe we get an `InstanceMissing` if the user specifies a non-existing cluster,
// in which case `ConcurrentDependencyDrop` would not be appropriate.

pub fn concurrent_dependency_drop_from_instance_missing(e: InstanceMissing) -> Self {
AdapterError::ConcurrentDependencyDrop {
dependency_kind: "cluster",
dependency_id: e.0.to_string(),
}
}

pub fn concurrent_dependency_drop_from_collection_missing(e: CollectionMissing) -> Self {
AdapterError::ConcurrentDependencyDrop {
dependency_kind: "collection",
Expand Down Expand Up @@ -730,11 +735,26 @@ impl AdapterError {
}
}

pub fn concurrent_dependency_drop_from_watch_set_install_error(
e: WatchSetInstallError,
) -> Self {
match e {
WatchSetInstallError::InstanceMissing(id) => AdapterError::ConcurrentDependencyDrop {
dependency_kind: "cluster",
dependency_id: id.to_string(),
},
WatchSetInstallError::CollectionMissing(id) => AdapterError::ConcurrentDependencyDrop {
dependency_kind: "collection",
dependency_id: id.to_string(),
},
}
}

pub fn concurrent_dependency_drop_from_instance_peek_error(
e: mz_compute_client::controller::instance::PeekError,
e: mz_compute_client::controller::instance_client::PeekError,
compute_instance: ComputeInstanceId,
) -> AdapterError {
use mz_compute_client::controller::instance::PeekError::*;
use mz_compute_client::controller::instance_client::PeekError::*;
match e {
ReplicaMissing(id) => AdapterError::ConcurrentDependencyDrop {
dependency_kind: "replica",
Expand Down
52 changes: 48 additions & 4 deletions src/adapter/src/peek_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,23 @@ use std::collections::BTreeMap;
use std::sync::Arc;

use differential_dataflow::consolidation::consolidate;
use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
use mz_compute_client::controller::error::{CollectionMissing, InstanceMissing};
use mz_compute_client::controller::instance_client::InstanceClient;
use mz_compute_client::controller::instance_client::{AcquireReadHoldsError, InstanceShutDown};
use mz_compute_client::protocol::command::PeekTarget;
use mz_compute_types::ComputeInstanceId;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_persist_client::PersistClient;
use mz_repr::GlobalId;
use mz_repr::Timestamp;
use mz_repr::global_id::TransientIdGen;
use mz_repr::{RelationDesc, Row};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_storage_types::sources::Timeline;
use mz_timestamp_oracle::TimestampOracle;
use prometheus::Histogram;
use thiserror::Error;
use timely::progress::Antichain;
use tokio::sync::oneshot;
use uuid::Uuid;
Expand Down Expand Up @@ -54,8 +58,7 @@ pub struct PeekClient {
/// Note that these are never cleaned up. In theory, this could lead to a very slow memory leak
/// if a long-running user session keeps peeking on clusters that are being created and dropped
/// in a hot loop. Hopefully this won't occur any time soon.
compute_instances:
BTreeMap<ComputeInstanceId, mz_compute_client::controller::instance::Client<Timestamp>>,
compute_instances: BTreeMap<ComputeInstanceId, InstanceClient<Timestamp>>,
/// Handle to storage collections for reading frontiers and policies.
pub storage_collections: StorageCollectionsHandle,
/// A generator for transient `GlobalId`s, shared with Coordinator.
Expand Down Expand Up @@ -93,7 +96,7 @@ impl PeekClient {
pub async fn ensure_compute_instance_client(
&mut self,
compute_instance: ComputeInstanceId,
) -> Result<mz_compute_client::controller::instance::Client<Timestamp>, InstanceMissing> {
) -> Result<InstanceClient<Timestamp>, InstanceMissing> {
if !self.compute_instances.contains_key(&compute_instance) {
let client = self
.call_coordinator(|tx| Command::GetComputeInstanceClient {
Expand Down Expand Up @@ -501,3 +504,44 @@ impl PeekClient {
));
}
}

/// Errors arising during collection lookup in peek client operations.
#[derive(Error, Debug)]
pub enum CollectionLookupError {
/// The specified compute instance does not exist.
#[error("instance does not exist: {0}")]
InstanceMissing(ComputeInstanceId),
/// The specified compute instance has shut down.
#[error("the instance has shut down")]
InstanceShutDown,
/// The compute collection does not exist.
#[error("collection does not exist: {0}")]
CollectionMissing(GlobalId),
}

impl From<InstanceMissing> for CollectionLookupError {
fn from(error: InstanceMissing) -> Self {
Self::InstanceMissing(error.0)
}
}

impl From<InstanceShutDown> for CollectionLookupError {
fn from(_error: InstanceShutDown) -> Self {
Self::InstanceShutDown
}
}

impl From<CollectionMissing> for CollectionLookupError {
fn from(error: CollectionMissing) -> Self {
Self::CollectionMissing(error.0)
}
}

impl From<AcquireReadHoldsError> for CollectionLookupError {
fn from(error: AcquireReadHoldsError) -> Self {
match error {
AcquireReadHoldsError::CollectionMissing(id) => Self::CollectionMissing(id),
AcquireReadHoldsError::InstanceShutDown => Self::InstanceShutDown,
}
}
}
18 changes: 10 additions & 8 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,24 +67,26 @@ use tokio::time::{self, MissedTickBehavior};
use uuid::Uuid;

use crate::controller::error::{
CollectionLookupError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
CollectionFrontiersError, CollectionMissing, CollectionUpdateError, DataflowCreationError,
HydrationCheckBadTarget, InstanceExists, InstanceMissing, PeekError, ReadPolicyError,
ReplicaCreationError, ReplicaDropError,
};
use crate::controller::instance::{Instance, SharedCollectionState};
use crate::controller::instance_client::InstanceClient;
use crate::controller::introspection::{IntrospectionUpdates, spawn_introspection_sink};
use crate::controller::replica::ReplicaConfig;
use crate::logging::{LogVariant, LoggingConfig};
use crate::metrics::ComputeControllerMetrics;
use crate::protocol::command::{ComputeParameters, PeekTarget};
use crate::protocol::response::{PeekResponse, SubscribeBatch};

mod instance;
mod introspection;
mod replica;
mod sequential_hydration;

pub mod error;
pub mod instance;
pub mod instance_client;

pub(crate) type StorageCollections<T> = Arc<
dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T> + Send + Sync,
Expand Down Expand Up @@ -337,11 +339,11 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
self.instances.get(&id).ok_or(InstanceMissing(id))
}

/// Return an `instance::Client` for the indicated compute instance.
/// Return an `instance_client::InstanceClient` for the indicated compute instance.
pub fn instance_client(
&self,
id: ComputeInstanceId,
) -> Result<instance::Client<T>, InstanceMissing> {
) -> Result<InstanceClient<T>, InstanceMissing> {
self.instance(id).map(|instance| instance.client.clone())
}

Expand Down Expand Up @@ -371,7 +373,7 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
&self,
collection_id: GlobalId,
instance_id: Option<ComputeInstanceId>,
) -> Result<CollectionFrontiers<T>, CollectionLookupError> {
) -> Result<CollectionFrontiers<T>, CollectionFrontiersError> {
let collection = match instance_id {
Some(id) => self.instance(id)?.collection(collection_id)?,
None => self
Expand Down Expand Up @@ -527,7 +529,7 @@ where
logs.push((log, id, shared));
}

let client = instance::Client::spawn(
let client = InstanceClient::spawn(
id,
self.build_info,
Arc::clone(&self.storage_collections),
Expand Down Expand Up @@ -1067,13 +1069,13 @@ where

#[derive(Debug)]
struct InstanceState<T: ComputeControllerTimestamp> {
client: instance::Client<T>,
client: InstanceClient<T>,
replicas: BTreeSet<ReplicaId>,
collections: BTreeMap<GlobalId, Collection<T>>,
}

impl<T: ComputeControllerTimestamp> InstanceState<T> {
fn new(client: instance::Client<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
fn new(client: InstanceClient<T>, collections: BTreeMap<GlobalId, Collection<T>>) -> Self {
Self {
client,
replicas: Default::default(),
Expand Down
Loading