Skip to content
37 changes: 37 additions & 0 deletions docs/future-tasks/doc-level-subscriptions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Doc Level Subscriptions

## Problem

Our current schema query system can be quite slow to evaluate, leading to slow round trips from the client, which can result in unneccessary conflicts.

While we can issue guidance to avoid using schemas that do span large amounts of the space, building useful apps sometimes requires them. In any case, developers are likely to accidentally include much more content than needed, and we should still perform reasonably there, even if we don't perform well.

## Current Implementation

The client sends a query/subscribe command with a set of documents, and for each document, the path and schema (this combination is a SchemaPathSelector) to be used for that document.
We then run a schema query using that information, and return the set of documents we used to traverse the schema. This ensures that a client with the same set of documents will be have all the linked documents needed based on the specified schema.

We also maintain a set of watched documents, and add our query to the set of queries that should be re-run if one of those documents is changed. This means we don't need to re-run a query when an unrelated doc changes. When one of the documents in a transaction does match, we re-run the query, and update our set of watched documents when we're done.

As an implementation detail, we also maintain a structure of per-document subscriptions that are created while evaluating the initial subscription. This means that if we traverse back into the same document multiple times with the same SchemaPathSelector, we can skip evaluating it again.

## Change Suggestions

The per-document subscription tracking that we use when evaluating a single query could be maintained across queries, and across time.
* Across Queries - multiple subscriptions can each result in the same SchemaPathSelector on a document. Right now, each of those would re-run that portion of the query.
* Across Time - modifying one of the documents that was a result of our initial query may alter part of the query, but for most of the resulting documents, there is no change.

In this model, when any of the docs for which we have already evaluated the query is altered, we mark the SchemaPathSelector associated with the changed docs stale (removing them from the cache), and re-evaluate only those. Often, evaluating those queries won't have to traverse many documents, since they will typically result in the same SchemaPathSelector on linked documents that we already have in our cache.

## Complications
### Watch List and SchemaTracker
The watch list *should* change when we re-evaluate a query. While it's trivial to add the new results, it's difficult to determine whether existing results should still be flagged as included. This is also true for our SchemaPathSelector tracking. A change to document A can mean that we no longer have a SchemaPathSelector for document B, but it isn't obvious that the selector we have for document B was caused by this selector on document A, or even whether it was caused by this subscription at all.

While I can maintain reference counts and links, I may try to see what happens if I we just ignore this for now, and allow the client to get updates for documents that no longer match their query.

### AnyOf
When using the `anyOf` schema property, the matching for a linked document may depend on properties of the containing document. Without the cache, this is ok, because we're always generating document B's SchemaPathSelector for the cache based on document A's value.

I plan to ignore this one. For example, if your `country` changes to one without states, the linked `state` would still be returned to the client.

This should probably be part of our defined behavior, and not just an implementation quirk.
10 changes: 8 additions & 2 deletions packages/memory/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ export type Brief<Space extends MemorySpace = MemorySpace> = {
meta?: Meta;
};

export interface Session<Space extends MemorySpace = MemorySpace> {
interface Session<Space extends MemorySpace = MemorySpace> {
/**
* Transacts can be used to assert or retract a document from the repository.
* If `version` asserted / retracted does not match version of the document
Expand All @@ -463,8 +463,10 @@ export interface Session<Space extends MemorySpace = MemorySpace> {
close(): CloseResult;
}

// Re-declares the Session interface as sync instead of async, and adds the
// subject field
export interface SpaceSession<Space extends MemorySpace = MemorySpace>
extends Session {
extends Session<Space> {
subject: Space;

transact(
Expand All @@ -474,13 +476,17 @@ export interface SpaceSession<Space extends MemorySpace = MemorySpace>
close(): Result<Unit, SystemError>;
}

// Adds the subscribe/unsubscribe interface to Session
// Adds the serviceDID, which is essentially the same as SpaceSession's
// subject.
export interface MemorySession<Space extends MemorySpace = MemorySpace>
extends Session<Space> {
subscribe(subscriber: Subscriber<Space>): SubscribeResult;
unsubscribe(subscriber: Subscriber<Space>): SubscribeResult;
serviceDid(): DID;
}

// Interface for the subscriber's scallback
export interface Subscriber<Space extends MemorySpace = MemorySpace> {
// Notifies a subscriber of a commit that has been applied
commit(commit: Commit<Space>): AwaitResult<Unit, SystemError>;
Expand Down
61 changes: 38 additions & 23 deletions packages/memory/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
AsyncResult,
ConnectionError,
MemorySession,
MemorySpace as Subject,
Query,
QueryResult,
Result,
Expand All @@ -27,21 +26,25 @@ import {
export * from "./interface.ts";
import { type DID } from "@commontools/identity";

interface Session {
interface MemorySessionState {
store: URL;
subscribers: Set<Subscriber>;
spaces: Map<string, SpaceSession>;
spaces: Map<DID, SpaceSession>;
}

export class Memory implements Session, MemorySession {
// There's some confusion in this class whether it wants to handle multiple
// spaces. The presence of the serviceDid implies there will be one Memory
// per space, but the map of spaces implies there could be multiple.
// Currently, there is one Memory per DID (a.k.a. MemorySpace).
export class Memory implements MemorySessionState, MemorySession {
store: URL;
ready: Promise<unknown>;
#serviceDid: DID;

constructor(
options: Options,
public subscribers: Set<Subscriber> = new Set(),
public spaces: Map<Subject, SpaceSession> = new Map(),
public spaces: Map<DID, SpaceSession> = new Map(),
) {
this.store = options.store;
this.ready = Promise.resolve();
Expand Down Expand Up @@ -107,11 +110,14 @@ export class Memory implements Session, MemorySession {
* @param subscriber - The subscriber to add
* @returns Success result
*/
export const subscribe = (session: Session, subscriber: Subscriber) => {
const subscribe = (
{ subscribers }: MemorySessionState,
subscriber: Subscriber,
) => {
return traceSync("memory.subscribe", (span) => {
addMemoryAttributes(span, { operation: "subscribe" });
session.subscribers.add(subscriber);
span.setAttribute("memory.subscriber_count", session.subscribers.size);
subscribers.add(subscriber);
span.setAttribute("memory.subscriber_count", subscribers.size);
return { ok: {} };
});
};
Expand All @@ -122,16 +128,19 @@ export const subscribe = (session: Session, subscriber: Subscriber) => {
* @param subscriber - The subscriber to remove
* @returns Success result
*/
export const unsubscribe = (session: Session, subscriber: Subscriber) => {
const unsubscribe = (
{ subscribers }: MemorySessionState,
subscriber: Subscriber,
) => {
return traceSync("memory.unsubscribe", (span) => {
addMemoryAttributes(span, { operation: "unsubscribe" });
session.subscribers.delete(subscriber);
span.setAttribute("memory.subscriber_count", session.subscribers.size);
subscribers.delete(subscriber);
span.setAttribute("memory.subscriber_count", subscribers.size);
return { ok: {} };
});
};

export const query = async (session: Session, query: Query) => {
const query = async (session: MemorySessionState, query: Query) => {
return await traceAsync("memory.query", async (span) => {
addMemoryAttributes(span, {
operation: "query",
Expand All @@ -149,7 +158,10 @@ export const query = async (session: Session, query: Query) => {
});
};

export const querySchema = async (session: Session, query: SchemaQuery) => {
export const querySchema = async (
session: MemorySessionState,
query: SchemaQuery,
) => {
return await traceAsync("memory.querySchema", async (span) => {
addMemoryAttributes(span, {
operation: "querySchema",
Expand All @@ -167,7 +179,10 @@ export const querySchema = async (session: Session, query: SchemaQuery) => {
});
};

export const transact = async (session: Session, transaction: Transaction) => {
const transact = async (
session: MemorySessionState,
transaction: Transaction,
) => {
return await traceAsync("memory.transact", async (span) => {
addMemoryAttributes(span, {
operation: "transact",
Expand All @@ -185,7 +200,7 @@ export const transact = async (session: Session, transaction: Transaction) => {
}

span.setAttribute("mount.status", "success");
const result = space.transact(transaction);
const result = await space.transact(transaction);

if (result.error) {
return result;
Expand All @@ -212,34 +227,34 @@ export const transact = async (session: Session, transaction: Transaction) => {
});
};

export const mount = async (
session: Session,
subject: Subject,
const mount = async (
{ spaces, store }: MemorySessionState,
subject: DID,
): Promise<Result<SpaceSession, ConnectionError>> => {
return await traceAsync("memory.mount", async (span) => {
addMemoryAttributes(span, {
operation: "mount",
space: subject,
});

const space = session.spaces.get(subject);
const space = spaces.get(subject);
if (space) {
span.setAttribute("memory.mount.cache", "hit");
return { ok: space };
} else {
span.setAttribute("memory.mount.cache", "miss");

const result = await Space.open({
url: new URL(`./${subject}.sqlite`, session.store),
url: new URL(`./${subject}.sqlite`, store),
});

if (result.error) {
return result;
}

const replica = result.ok as SpaceSession;
session.spaces.set(subject, replica);
span.setAttribute("memory.spaces_count", session.spaces.size);
spaces.set(subject, replica);
span.setAttribute("memory.spaces_count", spaces.size);
return { ok: replica };
}
});
Expand Down Expand Up @@ -281,7 +296,7 @@ export const emulate = (options: ServiceOptions) =>
store: new URL("memory://"),
});

export const close = async (session: Session) => {
export const close = async (session: MemorySessionState) => {
return await traceAsync("memory.close", async (span) => {
addMemoryAttributes(span, { operation: "close" });
span.setAttribute("memory.spaces_count", session.spaces.size);
Expand Down
15 changes: 10 additions & 5 deletions packages/memory/space-schema.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
ContextualFlowControl,
deepEqual,
type JSONObject,
type JSONValue,
Expand Down Expand Up @@ -48,7 +49,7 @@ import {
type SelectedFact,
selectFact,
selectFacts,
type Session,
type SpaceStoreSession,
toSelection,
} from "./space.ts";

Expand Down Expand Up @@ -77,7 +78,7 @@ export class ServerObjectManager extends BaseObjectManager<
private restrictedValues = new Set<string>();

constructor(
private session: Session<MemorySpace>,
private session: SpaceStoreSession<MemorySpace>,
private providedClassifications: Set<string>,
) {
super();
Expand Down Expand Up @@ -150,7 +151,7 @@ export class ServerObjectManager extends BaseObjectManager<
}

export const selectSchema = <Space extends MemorySpace>(
session: Session<Space>,
session: SpaceStoreSession<Space>,
{ selectSchema, since, classification }: SchemaQuery["args"],
): FactSelection => {
const startTime = performance.timeOrigin + performance.now();
Expand All @@ -163,6 +164,7 @@ export const selectSchema = <Space extends MemorySpace>(
Immutable<JSONValue>,
SchemaContext | undefined
>();
const cfc = new ContextualFlowControl();
const schemaTracker = new MapSet<string, SchemaPathSelector>(deepEqual);

const includedFacts: FactSelection = {}; // we'll store all the raw facts we accesed here
Expand All @@ -188,6 +190,7 @@ export const selectSchema = <Space extends MemorySpace>(
entry,
selectorEntry.value,
tracker,
cfc,
schemaTracker,
);

Expand Down Expand Up @@ -253,6 +256,7 @@ function loadFactsForDoc(
fact: IAttestation,
selector: SchemaPathSelector,
tracker: PointerCycleTracker,
cfc: ContextualFlowControl,
schemaTracker: MapSet<string, SchemaPathSelector>,
) {
if (isObject(fact.value)) {
Expand All @@ -266,6 +270,7 @@ function loadFactsForDoc(
factValue,
selector.path,
tracker,
cfc,
schemaTracker,
selector,
);
Expand Down Expand Up @@ -294,7 +299,7 @@ function loadFactsForDoc(

const redactCommits = <Space extends MemorySpace>(
includedFacts: FactSelection,
session: Session<Space>,
session: SpaceStoreSession<Space>,
) => {
const change = getChange(includedFacts, session.subject, COMMIT_LOG_TYPE);
if (change !== undefined) {
Expand Down Expand Up @@ -344,7 +349,7 @@ function addToSelection(

// Get the ValueEntry objects for the facts that match our selector
function getMatchingFacts<Space extends MemorySpace>(
session: Session<Space>,
session: SpaceStoreSession<Space>,
factSelector: FactSelector,
): Iterable<IAttestation & { cause: CauseString; since: number }> {
const results = [];
Expand Down
Loading