Skip to content

Commit 7a1cbdd

Browse files
seefeldbclaude
andcommitted
feat(cell): add pull() method for demand-driven value retrieval
Add Cell.pull() method that registers a temporary effect and waits for the scheduler to process it. This ensures all dependencies are computed before returning the value, providing consistent behavior across both push and pull scheduling modes. Key features: - Registers as effect with scheduleImmediately: true - Waits for scheduler.idle() before resolving - Unsubscribes after value is retrieved (one-shot) - Works in both push mode (equivalent to idle() + get()) and pull mode (triggers dependency computation) Use case: Replace `await idle(); cell.get()` pattern with cleaner `await cell.pull()` that works correctly in pull-based scheduling. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 0ef1114 commit 7a1cbdd

File tree

2 files changed

+172
-0
lines changed

2 files changed

+172
-0
lines changed

packages/runner/src/cell.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ declare module "@commontools/api" {
128128
withTx(tx?: IExtendedStorageTransaction): Cell<T>;
129129
sink(callback: (value: Readonly<T>) => Cancel | undefined | void): Cancel;
130130
sync(): Promise<Cell<T>> | Cell<T>;
131+
pull(): Promise<Readonly<T>>;
131132
getAsQueryResult<Path extends PropertyKey[]>(
132133
path?: Readonly<Path>,
133134
tx?: IExtendedStorageTransaction,
@@ -230,6 +231,7 @@ const cellMethods = new Set<keyof ICell<unknown>>([
230231
"withTx",
231232
"sink",
232233
"sync",
234+
"pull",
233235
"getAsQueryResult",
234236
"getAsNormalizedFullLink",
235237
"getAsLink",
@@ -553,6 +555,64 @@ export class CellImpl<T> implements ICell<T>, IStreamable<T> {
553555
);
554556
}
555557

558+
/**
559+
* Pull the cell's value, ensuring all dependencies are computed first.
560+
*
561+
* In pull-based scheduling mode, computations don't run automatically when
562+
* their inputs change - they only run when pulled by an effect. This method
563+
* registers a temporary effect that reads the cell's value, triggering the
564+
* scheduler to compute all transitive dependencies first.
565+
*
566+
* In push-based mode (the default), this is equivalent to `await idle()`
567+
* followed by `get()`, but ensures consistent behavior across both modes.
568+
*
569+
* Use this in tests or when you need to ensure a computed value is up-to-date
570+
* before reading it:
571+
*
572+
* ```ts
573+
* // Instead of:
574+
* await runtime.scheduler.idle();
575+
* const value = cell.get();
576+
*
577+
* // Use:
578+
* const value = await cell.pull();
579+
* ```
580+
*
581+
* @returns A promise that resolves to the cell's current value after all
582+
* dependencies have been computed.
583+
*/
584+
pull(): Promise<Readonly<T>> {
585+
if (!this.synced) this.sync(); // No await, just kicking this off
586+
587+
return new Promise((resolve) => {
588+
let result: Readonly<T>;
589+
let cancel: Cancel | undefined;
590+
591+
const action: Action = (tx) => {
592+
// Read the value inside the effect - this ensures dependencies are pulled
593+
result = validateAndTransform(this.runtime, tx, this.link, this.synced);
594+
};
595+
596+
// Run the action once to capture dependencies
597+
const tx = this.runtime.edit();
598+
action(tx);
599+
const log = txToReactivityLog(tx);
600+
tx.commit();
601+
602+
// Subscribe as an effect with scheduleImmediately so it runs in the next cycle
603+
cancel = this.runtime.scheduler.subscribe(action, log, {
604+
isEffect: true,
605+
scheduleImmediately: true,
606+
});
607+
608+
// Wait for the scheduler to process all pending work, then resolve
609+
this.runtime.scheduler.idle().then(() => {
610+
cancel?.();
611+
resolve(result);
612+
});
613+
});
614+
}
615+
556616
set(
557617
newValue: AnyCellWrapping<T> | T,
558618
onCommit?: (tx: IExtendedStorageTransaction) => void,

packages/runner/test/cell.test.ts

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4040,4 +4040,116 @@ describe("Cell success callbacks", () => {
40404040
expect(schemaCell.schema).toBeUndefined();
40414041
});
40424042
});
4043+
4044+
describe("pull()", () => {
4045+
it("should return the cell value in push mode", async () => {
4046+
const c = runtime.getCell<number>(space, "pull-test-1", undefined, tx);
4047+
c.set(42);
4048+
await tx.commit();
4049+
tx = runtime.edit();
4050+
4051+
const value = await c.pull();
4052+
expect(value).toBe(42);
4053+
});
4054+
4055+
it("should wait for dependent computations in push mode", async () => {
4056+
// Create a source cell
4057+
const source = runtime.getCell<number>(
4058+
space,
4059+
"pull-source",
4060+
undefined,
4061+
tx,
4062+
);
4063+
source.set(5);
4064+
await tx.commit();
4065+
tx = runtime.edit();
4066+
4067+
// Create a computation that depends on source
4068+
const computed = runtime.getCell<number>(
4069+
space,
4070+
"pull-computed",
4071+
undefined,
4072+
tx,
4073+
);
4074+
4075+
const action = (actionTx: IExtendedStorageTransaction) => {
4076+
const val = source.withTx(actionTx).get();
4077+
computed.withTx(actionTx).set(val * 2);
4078+
};
4079+
4080+
// Run once to set up initial value and log reads
4081+
const setupTx = runtime.edit();
4082+
action(setupTx);
4083+
const log = txToReactivityLog(setupTx);
4084+
await setupTx.commit();
4085+
4086+
// Subscribe the computation
4087+
runtime.scheduler.subscribe(action, log, { scheduleImmediately: true });
4088+
4089+
// Pull should wait for the computation to run
4090+
const value = await computed.pull();
4091+
expect(value).toBe(10);
4092+
});
4093+
4094+
it("should work in pull mode", async () => {
4095+
runtime.scheduler.enablePullMode();
4096+
4097+
// In pull mode, pull() works the same way - it registers as an effect
4098+
// and waits for the scheduler. The key difference is that pull() ensures
4099+
// the effect mechanism is used, which triggers pull-based execution.
4100+
const c = runtime.getCell<number>(space, "pull-mode-cell", undefined, tx);
4101+
c.set(42);
4102+
await tx.commit();
4103+
tx = runtime.edit();
4104+
4105+
const value = await c.pull();
4106+
expect(value).toBe(42);
4107+
4108+
// Verify we can pull after updates
4109+
const tx2 = runtime.edit();
4110+
c.withTx(tx2).set(100);
4111+
await tx2.commit();
4112+
4113+
const value2 = await c.pull();
4114+
expect(value2).toBe(100);
4115+
4116+
runtime.scheduler.disablePullMode();
4117+
});
4118+
4119+
it("should handle multiple sequential pulls", async () => {
4120+
const c = runtime.getCell<number>(space, "pull-multi", undefined, tx);
4121+
c.set(1);
4122+
await tx.commit();
4123+
4124+
expect(await c.pull()).toBe(1);
4125+
4126+
const tx2 = runtime.edit();
4127+
c.withTx(tx2).set(2);
4128+
await tx2.commit();
4129+
4130+
expect(await c.pull()).toBe(2);
4131+
4132+
const tx3 = runtime.edit();
4133+
c.withTx(tx3).set(3);
4134+
await tx3.commit();
4135+
4136+
expect(await c.pull()).toBe(3);
4137+
});
4138+
4139+
it("should pull nested cell values", async () => {
4140+
const c = runtime.getCell<{ a: { b: number } }>(
4141+
space,
4142+
"pull-nested",
4143+
undefined,
4144+
tx,
4145+
);
4146+
c.set({ a: { b: 99 } });
4147+
await tx.commit();
4148+
tx = runtime.edit();
4149+
4150+
const nested = c.key("a").key("b");
4151+
const value = await nested.pull();
4152+
expect(value).toBe(99);
4153+
});
4154+
});
40434155
});

0 commit comments

Comments
 (0)