Draft
Changes from all commits (67 commits)
3d83281
Use a single instance of cfc object when traversing
ubik2 Oct 16, 2025
2f338e7
Rework the traverse methods to operate on an IExtendedStorageTransact…
ubik2 Oct 16, 2025
42df9c6
Remove sync flag from validateAndTransform, since it wasn't used.
ubik2 Oct 17, 2025
71637c4
Just adding WIP in case I forget tomorrow
ubik2 Oct 22, 2025
14645b4
- Temporarily remove a lot of type enforcement, since the returned ob…
ubik2 Oct 27, 2025
fde025e
Unified behavior for additionalProperties with behavior change based …
ubik2 Oct 27, 2025
2229578
Large cleanup pass on value as the first element of doc path.
ubik2 Oct 27, 2025
0fb2599
Semi-working state without default or anyOf
ubik2 Oct 28, 2025
ad9a013
Fix another path value mismatch where getAtPath used the link.path wi…
ubik2 Oct 28, 2025
c50a376
Return a QueryResultProxy if the link schema is true or undefined (in…
ubik2 Oct 28, 2025
8508e5f
Fix regression with asCell schema that are true-ish. These should sti…
ubik2 Oct 29, 2025
b6b4936
Handle undefined doc.value in traverseWithSchemaContext
ubik2 Oct 29, 2025
df0ec18
Update query tests to use the new "value" convention for their select…
ubik2 Oct 29, 2025
59dd84c
Updated traverse test to use the value in path
ubik2 Oct 29, 2025
6d2bba4
Added method to combine schema context
ubik2 Nov 3, 2025
303dc5e
Use our traverseCells flag to determine whether to include source cel…
ubik2 Nov 3, 2025
257d037
When traversing cell links, combine the schema from the data in the l…
ubik2 Nov 4, 2025
1d82fd0
Fix mergeSchemaFlags
ubik2 Nov 4, 2025
2e4d017
Updated test to reflect new additionalProperties/properties behavior
ubik2 Nov 4, 2025
07e93be
Improve combineSchema for primitives with flags
ubik2 Nov 4, 2025
ad668c4
more tests pass (applying property defaults)
ubik2 Nov 5, 2025
9afec1d
Handle prefixItems in ContextualFlowControl.schemaAtPath
ubik2 Nov 10, 2025
2828bd2
Removed a bunch of debugging
ubik2 Nov 13, 2025
c65b577
Fix a case where I was treating an asCell schema as undefined and cre…
ubik2 Nov 13, 2025
c641881
Better asCell/asStream detection
ubik2 Nov 13, 2025
d374278
Better array support in cfc schemaAtPath
ubik2 Nov 14, 2025
5b23f5a
Replicate the client behavior, where we follow the first link for ele…
ubik2 Nov 14, 2025
f6ad462
Leaving the existing validateAndTransform with the _ prefix, and addi…
ubik2 Nov 15, 2025
dd12bb8
Merge branch 'main' into robin/traverse-tx
ubik2 Nov 17, 2025
de778f2
Improve FIXME comment
ubik2 Nov 17, 2025
1f626d8
Fixed tests to use value in selector
ubik2 Nov 17, 2025
dec64c4
Committing changes to traverse to pass through top link. Previous cod…
ubik2 Nov 19, 2025
05fbf0c
Reverted change that carried links down across cells. We want to do t…
ubik2 Dec 1, 2025
fbff315
Minor fix to array links as DataCellURI objects
ubik2 Dec 1, 2025
d0e2d55
Added TODO comment in traverse.ts for future cleanup
ubik2 Dec 2, 2025
774cc8e
Update selector in traverse_timing_test.ts to include value in the pa…
ubik2 Dec 2, 2025
65e6a83
Remove console.log calls and some unused code
ubik2 Dec 2, 2025
5160cdf
Merge branch 'main' into robin/traverse-tx
ubik2 Dec 3, 2025
abc7d62
Fix a case when we're combining link schema at the top level with the…
ubik2 Dec 4, 2025
b4788af
lowered log level of transactions to debug
ubik2 Dec 4, 2025
553a8ca
typos
ubik2 Dec 4, 2025
53b4587
Remove some unnecessary doc spread usage when we already replace the …
ubik2 Dec 4, 2025
78e8824
Merge branch 'main' into robin/traverse-tx
ubik2 Dec 4, 2025
6b231fb
Added most of the plumbing to get cross space links working, though t…
ubik2 Dec 5, 2025
7373e5d
Added debug logging to traverse when we fail to match
ubik2 Dec 5, 2025
cb1b35f
Missed file
ubik2 Dec 5, 2025
5ad3040
Don't run actions if the argument is undefined. There are some ramifi…
ubik2 Dec 10, 2025
8e6ba8e
Merge branch 'main' into robin/traverse-tx
ubik2 Dec 10, 2025
b68ab4d
schemaTracker keys contain space now, so add that
ubik2 Dec 10, 2025
7cf47b1
Tweak to traverse so that when merging defs, we don't add the $defs k…
ubik2 Dec 10, 2025
3559e68
Merge branch 'main' into robin/traverse-tx
ubik2 Dec 11, 2025
b8be235
Merge branch 'main' into robin/traverse-tx
ubik2 Dec 13, 2025
5c34479
Working integration of the sharedSchemaTracker changes into the trave…
ubik2 Dec 13, 2025
97dd32d
Change runner so that actions that don't care about their input (argu…
ubik2 Dec 15, 2025
ecbbf83
Change traverse to use the combination of the parent schema and the l…
ubik2 Dec 15, 2025
fbe3da3
Merge branch 'main' into robin/traverse-tx
ubik2 Dec 15, 2025
7dd1b40
Fix lint error
ubik2 Dec 15, 2025
22438e9
Merge branch 'main' into robin/traverse-tx
ubik2 Dec 16, 2025
bee74d7
Changes based on PR feedback
ubik2 Dec 16, 2025
6bc5508
If the argument to a handler is undefined (invalid schema), log it as…
ubik2 Dec 16, 2025
2a4b237
Minor tweak to list-operations pattern to eliminate the `.get()` sinc…
ubik2 Dec 17, 2025
7f0000a
Limited implementation of an anyOf merge. This isn't technically corr…
ubik2 Dec 17, 2025
b549ed1
Added mergeMatches function to IObjectCreator for integrating multipl…
ubik2 Dec 18, 2025
6b977f6
Change mentioned and backlinks fields to be optional, since these are…
ubik2 Dec 18, 2025
cd6d644
Altered counter-nested-computed-totals test
ubik2 Dec 18, 2025
d3bf3b6
Break out the mergeAnyOfMatches function, so I can keep the common pa…
ubik2 Dec 18, 2025
b951078
Merge branch 'main' into robin/traverse-tx
ubik2 Dec 18, 2025
2 changes: 2 additions & 0 deletions packages/charm/src/manager.ts
@@ -53,9 +53,11 @@ export const uiSchema = {

export type UISchema = Schema<typeof uiSchema>;

// We specify `not: true` for the items, since we don't want to recursively load them
export const charmListSchema = {
type: "array",
items: { not: true, asCell: true },
default: [],
} as const satisfies JSONSchema;
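The `items: { not: true, ... }` schema above relies on JSON Schema boolean-schema semantics: `true` matches every value, so `not: true` matches none, which is how the list avoids recursively loading its elements (the nonstandard `asCell` keyword still yields a cell reference). A minimal, self-contained checker — a hypothetical sketch, not the framework's validator — illustrates why `not: true` never matches:

```typescript
// Boolean schemas per JSON Schema: `true` matches everything,
// `false` matches nothing, and `not` inverts its subschema.
type BoolOrSchema = boolean | { not?: BoolOrSchema };

function matches(schema: BoolOrSchema, value: unknown): boolean {
  if (typeof schema === "boolean") return schema;
  if (schema.not !== undefined) return !matches(schema.not, value);
  return true; // an empty object schema matches everything
}

console.log(matches({ not: true }, { anything: 1 })); // false: matches nothing
console.log(matches(true, 42)); // true: matches everything
```

So each charm-list item fails validation as data, and only the cell reference survives.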

export const charmLineageSchema = {
@@ -2,7 +2,7 @@
import { Cell, Default, handler, lift, recipe, str } from "commontools";

interface FallbackDefaultsArgs {
slots: Default<(number | undefined)[], []>;
slots: Default<(number | null)[], []>;
fallback: Default<number, 0>;
expectedLength: Default<number, 0>;
}
@@ -38,7 +38,7 @@ const updateSlot = handler(
(
event: SlotUpdateEvent | undefined,
context: {
slots: Cell<(number | undefined)[]>;
slots: Cell<(number | null)[]>;
fallback: Cell<number>;
expectedLength: Cell<number>;
},
@@ -76,10 +76,10 @@ const updateSlot = handler(
export const counterWithFallbackDefaults = recipe<FallbackDefaultsArgs>(
"Counter With Fallback Defaults",
({ slots, fallback, expectedLength }) => {
const normalizedFallback = lift((value: number | undefined) =>
const normalizedFallback = lift((value: number | null) =>
sanitizeNumber(value, 0)
)(fallback);
const normalizedExpected = lift((value: number | undefined) => {
const normalizedExpected = lift((value: number | null) => {
if (isFiniteNumber(value) && value >= 0) {
return Math.floor(value);
}
@@ -91,7 +91,8 @@ export const counterNestedComputedTotalsScenario: PatternIntegrationScenario<
},
{
events: [
{ stream: "groups.0.append", payload: { value: Number.NaN } },
{ stream: "groups.0.append", payload: { value: Number.NaN } }, // fails
{ stream: "groups.0.append", payload: { value: 0 } },
{ stream: "appendToGroup", payload: { index: 9, value: 100 } },
],
expect: [
173 changes: 80 additions & 93 deletions packages/memory/provider.ts
@@ -172,7 +172,13 @@ class MemoryProviderSession<
schemaChannels: Map<JobId, SchemaSubscription> = new Map();
// Mapping from fact key to since value of the last fact sent to the client
lastRevision: Map<string, number> = new Map();
// Shared schema tracker for all subscriptions - tracks which docs were scanned with which schemas
// Shared schema tracker for all subscriptions
// Tracks which docs were scanned with which schemas
// This serves as a watch list, telling us which documents we need to
// be notified about when they change. It also serves as a cache that
// prevents us from re-running a query (or part of one) when the
// underlying docs haven't changed; in that role, it tells us that we
// already have the current information.
sharedSchemaTracker: MapSet<string, SchemaPathSelector> = new MapSet(
deepEqual,
);
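The tracker is a `MapSet` keyed by doc key, with `deepEqual` deciding selector membership. A standalone sketch of that shape — a hypothetical reconstruction, since the project's actual `MapSet` may differ (e.g. it may store real `Set`s) — shows the add/deduplicate/deleteValue behavior the surrounding code depends on:

```typescript
// A Map whose values are collections deduplicated by a caller-supplied
// equality function (deepEqual in the provider above).
class MapSet<K, V> {
  private map = new Map<K, V[]>();
  constructor(private eq: (a: V, b: V) => boolean) {}
  add(key: K, value: V): void {
    const values = this.map.get(key) ?? [];
    if (!values.some((v) => this.eq(v, value))) values.push(value);
    this.map.set(key, values);
  }
  get(key: K): readonly V[] | undefined {
    return this.map.get(key);
  }
  delete(key: K): boolean {
    return this.map.delete(key);
  }
  deleteValue(key: K, value: V): void {
    const values = this.map.get(key);
    if (!values) return;
    const next = values.filter((v) => !this.eq(v, value));
    next.length === 0 ? this.map.delete(key) : this.map.set(key, next);
  }
}

const deepEq = (a: unknown, b: unknown) =>
  JSON.stringify(a) === JSON.stringify(b);
const tracker = new MapSet<string, { path: string[] }>(deepEq);
tracker.add("space/of:abc/application/json", { path: ["value"] });
tracker.add("space/of:abc/application/json", { path: ["value"] }); // deduped
console.log(tracker.get("space/of:abc/application/json")?.length); // 1
```

Structural equality is what keeps a re-subscription with an identical selector from growing the watch list.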
@@ -386,7 +392,7 @@ class MemoryProviderSession<
});
}
case "/memory/transact": {
logger.info(
logger.debug(
"server-transact",
() => [
"Received transaction:",
@@ -586,10 +592,8 @@ class MemoryProviderSession<
private findAffectedDocsForWildcard<Space extends MemorySpace>(
changedDocs: Set<string>,
invocation: SchemaQuery<Space>,
): Array<{ docKey: string; schemas: Set<SchemaPathSelector> }> {
const affected: Array<
{ docKey: string; schemas: Set<SchemaPathSelector> }
> = [];
): Map<string, Set<SchemaPathSelector>> {
const affected = new Map<string, Set<SchemaPathSelector>>();
const selectSchema = invocation.args.selectSchema;

// Get the wildcard selector's type patterns
@@ -610,14 +614,14 @@

// Match changed docs against type patterns
for (const docKey of changedDocs) {
const slashIndex = docKey.indexOf("/");
if (slashIndex === -1) continue;
const docType = docKey.slice(slashIndex + 1);
const parsedKey = this.parseDocKey(docKey);
if (parsedKey === undefined) continue;
const { id: _, type } = parsedKey;

// Check if this type matches a wildcard pattern
const schemas = typeSchemas.get(docType) ?? typeSchemas.get("_");
const schemas = typeSchemas.get(type) ?? typeSchemas.get("_");
if (schemas && schemas.size > 0) {
affected.push({ docKey, schemas: new Set(schemas) });
affected.set(docKey, new Set(schemas));
}
}
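The lookup above falls back from the doc's exact type pattern to the `"_"` wildcard. A small sketch under assumed names (`typeSchemas` holding selector sets per type, as in the hunk) makes the fallback order concrete:

```typescript
// Exact-type patterns win; "_" is the match-anything fallback.
const typeSchemas = new Map<string, Set<string>>([
  ["application/json", new Set(["jsonSelector"])],
  ["_", new Set(["wildcardSelector"])],
]);

function schemasFor(docType: string): Set<string> | undefined {
  return typeSchemas.get(docType) ?? typeSchemas.get("_");
}

console.log([...schemasFor("application/json")!]); // ["jsonSelector"]
console.log([...schemasFor("text/plain")!]); // ["wildcardSelector"]
```

With no `"_"` entry, unmatched types would yield `undefined` and the doc would simply not be treated as affected.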

@@ -643,7 +647,7 @@
}

// Extract changed document keys from transaction
const changedDocs = this.extractChangedDocKeys(transaction);
const changedDocs = this.extractChangedDocKeys(space, transaction);
if (changedDocs.size === 0) {
return [];
}
@@ -659,16 +663,18 @@
const spaceSession = mountResult.ok as unknown as SpaceSession<Space>;

// Find affected docs using the shared schemaTracker (for non-wildcard)
const sharedAffectedDocs = this.findAffectedDocs(
changedDocs,
this.sharedSchemaTracker,
);
const sharedAffectedDocs = new Map<string, Set<SchemaPathSelector>>();
for (const docKey of changedDocs) {
const existingSelectors = this.sharedSchemaTracker.get(docKey);
if (existingSelectors !== undefined) {
sharedAffectedDocs.set(docKey, new Set(existingSelectors));
}
}

// Process shared affected docs
const { newFacts } = this.processIncrementalUpdate(
spaceSession,
sharedAffectedDocs,
space,
);

// Add facts from wildcard subscriptions
@@ -678,11 +684,10 @@
changedDocs,
subscription.invocation,
);
if (wildcardDocs.length > 0) {
if (wildcardDocs.size > 0) {
const wildcardResult = this.processIncrementalUpdate(
spaceSession,
wildcardDocs,
space,
);
for (const [key, fact] of wildcardResult.newFacts) {
newFacts.set(key, fact);
@@ -698,110 +703,90 @@
* Extract document keys (id/type format) from a transaction's changes.
*/
private extractChangedDocKeys<Space extends MemorySpace>(
space: MemorySpace,
transaction: Transaction<Space>,
): Set<string> {
const changedDocs = new Set<string>();
for (const fact of SelectionBuilder.iterate(transaction.args.changes)) {
if (fact.value !== true) {
// Format matches what schemaTracker uses: "id/type" (from BaseObjectManager.toKey)
changedDocs.add(`${fact.of}/${fact.the}`);
changedDocs.add(`${space}/${fact.of}/${fact.the}`);
}
}
return changedDocs;
}

/**
* Find docs in changedDocs that are tracked by the subscription's schemaTracker.
* Returns list of (docKey, schemas) pairs.
*/
private findAffectedDocs(
changedDocs: Set<string>,
schemaTracker: MapSet<string, SchemaPathSelector>,
): Array<{ docKey: string; schemas: Set<SchemaPathSelector> }> {
const affected: Array<
{ docKey: string; schemas: Set<SchemaPathSelector> }
> = [];
for (const docKey of changedDocs) {
const schemas = schemaTracker.get(docKey);
if (schemas && schemas.size > 0) {
affected.push({ docKey, schemas: new Set(schemas) });
}
}
return affected;
}

/**
* Process incremental update given affected docs.
* Re-evaluates each affected doc with its schemas and follows new links.
* Uses the shared schemaTracker to track discovered links.
*/
private processIncrementalUpdate<Space extends MemorySpace>(
spaceSession: SpaceSession<Space>,
affectedDocs: Array<{ docKey: string; schemas: Set<SchemaPathSelector> }>,
space: Space,
affectedDocs: Map<string, Set<SchemaPathSelector>>,
): { newFacts: Map<string, Revision<Fact>> } {
const newFacts = new Map<string, Revision<Fact>>();
// Note: classification is not used here since we're processing across all subscriptions
// TODO(ubik2,seefeld): Make this a per-session classification
const classification = undefined;

// Collect all unique docKeys that need to be fetched
// Start with the initially affected docs
const docsToFetch = new Set<string>(affectedDocs.map((d) => d.docKey));

// First pass: Remove all affected (docKey, schema) pairs from schemaTracker.
// This allows the traverser to re-traverse them and discover any new links.
// We do this in a separate pass so that if doc A links to doc B, and both
// are affected, we don't re-traverse B while evaluating A only to remove
// and re-evaluate B again.
const toReEvaluate: Array<
{ docId: string; docType: string; schema: SchemaPathSelector }
> = [];
for (const { docKey, schemas } of affectedDocs) {
const { docId, docType } = this.parseDocKey(docKey);
if (docId === null) continue;

for (const schema of schemas) {
this.sharedSchemaTracker.deleteValue(docKey, schema);
toReEvaluate.push({ docId, docType, schema });
const classification = ["public", "secret"];

// Purge these docs from the tracker -- we want to re-evaluate the queries
const staleSchemaTracker = new Map<string, Set<SchemaPathSelector>>();
for (const [docKey, _schemaSelectors] of affectedDocs) {
const existingSchemas = this.sharedSchemaTracker.get(docKey);
if (existingSchemas !== undefined) {
this.sharedSchemaTracker.delete(docKey);
staleSchemaTracker.set(docKey, existingSchemas);
}
}

// Second pass: Re-evaluate each affected doc with its schema
// Check which docs we're watching that didn't just change, so we don't
// have to reload them to see if we should send them
const existingDocs = new Set<string>();
for (const [key, _value] of this.sharedSchemaTracker) {
existingDocs.add(key);
}

// Evaluate each affected doc with each of its schemas
// evaluateDocumentLinks does a full traversal and finds all linked documents
for (const { docId, docType, schema } of toReEvaluate) {
const result = evaluateDocumentLinks(
spaceSession,
{ id: docId, type: docType },
schema,
classification,
this.sharedSchemaTracker,
);
// Collect newly discovered docs to fetch
if (result !== null) {
for (const { docKey: newDocKey } of result.newLinks) {
docsToFetch.add(newDocKey);
}
for (const [docKey, schemaSelectors] of affectedDocs) {
const address = this.parseDocKey(docKey);
if (address === undefined) continue;
for (const schemaSelector of schemaSelectors) {
evaluateDocumentLinks(
spaceSession,
address,
schemaSelector,
classification,
this.sharedSchemaTracker,
);
}
}

// Fetch each unique doc once and add to results
for (const docKey of docsToFetch) {
const { docId, docType } = this.parseDocKey(docKey);
if (docId === null) continue;
for (const [docKey, _value] of this.sharedSchemaTracker) {
// Don't bother with anything we already tracked
if (existingDocs.has(docKey)) {
continue;
}

const address = this.parseDocKey(docKey);
if (address === undefined) continue;

const fact = selectFact(spaceSession, {
of: docId as `${string}:${string}`,
the: docType as `${string}/${string}`,
of: address.id,
the: address.type,
});

if (!fact || fact.is === undefined) {
// Document doesn't exist yet - skip
continue;
}

const address = this.formatAddress(space, fact);
newFacts.set(address, {
// The format of this key doesn't really matter;
// this uses the watch:// format, but we could use the docKey
const factKey = this.formatAddress(spaceSession.subject, fact);
newFacts.set(factKey, {
of: fact.of,
the: fact.the,
cause: causeFromString(fact.cause),
@@ -813,19 +798,21 @@
return { newFacts };
}

/** Parse docKey (format "id/type") back to id and type */
/** Parse docKey (format "space/id/type") back to space, id, and type */
private parseDocKey(
docKey: string,
): { docId: string | null; docType: string } {
// Note: type can contain slashes (e.g., "application/json"), so we split on the FIRST slash
// The id is always in the form "of:HASH" which doesn't contain slashes
const slashIndex = docKey.indexOf("/");
if (slashIndex === -1) {
return { docId: null, docType: "" };
): { space: MemorySpace; id: Memory.URI; type: Memory.MIME } | undefined {
// Parse docKey back to space, id, and type (format is "space/id/type")
// Note: type can contain slashes (e.g., "application/json")
const pattern = new RegExp("([^/]+)/([^/]+)/(.+)");
const match = pattern.exec(docKey);
if (match === null) {
return undefined;
}
return {
docId: docKey.slice(0, slashIndex),
docType: docKey.slice(slashIndex + 1),
space: match[1] as MemorySpace,
id: match[2] as Memory.URI,
type: match[3] as Memory.MIME,
};
}
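The new `parseDocKey` is the inverse of the `${space}/${of}/${the}` keys built in `extractChangedDocKeys`. A standalone round-trip sketch (hypothetical sample values; the real `space`/`id`/`type` are branded types in the codebase) shows why the final capture group is greedy — the MIME type may itself contain a slash — while the first two groups stop at the first `/`:

```typescript
// Build and parse the "space/id/type" docKey convention.
function makeDocKey(space: string, id: string, type: string): string {
  return `${space}/${id}/${type}`;
}

function parseDocKeySketch(
  docKey: string,
): { space: string; id: string; type: string } | undefined {
  // Anchored variant of the pattern in the diff above.
  const match = /^([^/]+)\/([^/]+)\/(.+)$/.exec(docKey);
  if (match === null) return undefined;
  return { space: match[1], id: match[2], type: match[3] };
}

const key = makeDocKey("did:key:z6Mk", "of:baed", "application/json");
console.log(parseDocKeySketch(key));
// { space: "did:key:z6Mk", id: "of:baed", type: "application/json" }
```

This works because the space DID and the `of:` id contain no slashes, so only the trailing type can absorb them.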
