Skip to content

Commit e3177ab

Browse files
committed
Fix pull scheduler dirty propagation and dependents
1 parent b3c16e7 commit e3177ab

File tree

2 files changed

+254
-0
lines changed

2 files changed

+254
-0
lines changed

packages/runner/src/scheduler.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ export class Scheduler {
115115
private effects = new Set<Action>();
116116
private computations = new Set<Action>();
117117
private dependents = new WeakMap<Action, Set<Action>>();
118+
private reverseDependencies = new WeakMap<Action, Set<Action>>();
118119
// Track which actions are effects persistently (survives unsubscribe/re-subscribe)
119120
private isEffectAction = new WeakMap<Action, boolean>();
120121
private dirty = new Set<Action>();
@@ -319,6 +320,18 @@ export class Scheduler {
319320
this.cancels.delete(action);
320321
this.dependencies.delete(action);
321322
this.pending.delete(action);
323+
const dependencies = this.reverseDependencies.get(action);
324+
if (dependencies) {
325+
for (const dependency of dependencies) {
326+
const dependents = this.dependents.get(dependency);
327+
dependents?.delete(action);
328+
if (dependents && dependents.size === 0) {
329+
this.dependents.delete(dependency);
330+
}
331+
}
332+
this.reverseDependencies.delete(action);
333+
}
334+
this.dependents.delete(action);
322335
// Clean up effect/computation tracking
323336
this.effects.delete(action);
324337
this.computations.delete(action);
@@ -632,7 +645,20 @@ export class Scheduler {
632645
* For each action that writes to paths this action reads, add this action as a dependent.
633646
*/
634647
private updateDependents(action: Action, log: ReactivityLog): void {
648+
const previousDependencies = this.reverseDependencies.get(action);
649+
if (previousDependencies) {
650+
for (const dependency of previousDependencies) {
651+
const dependents = this.dependents.get(dependency);
652+
dependents?.delete(action);
653+
if (dependents && dependents.size === 0) {
654+
this.dependents.delete(dependency);
655+
}
656+
}
657+
this.reverseDependencies.delete(action);
658+
}
659+
635660
const reads = log.reads;
661+
const newDependencies = new Set<Action>();
636662

637663
// For each read of the new action, find other actions that write to it
638664
for (const read of reads) {
@@ -658,11 +684,16 @@ export class Scheduler {
658684
this.dependents.set(otherAction, deps);
659685
}
660686
deps.add(action);
687+
newDependencies.add(otherAction);
661688
}
662689
}
663690
}
664691
}
665692
}
693+
694+
if (newDependencies.size > 0) {
695+
this.reverseDependencies.set(action, newDependencies);
696+
}
666697
}
667698

668699
/**
@@ -734,6 +765,8 @@ export class Scheduler {
734765
if (this.dirty.has(action)) return; // Already dirty, avoid infinite recursion
735766

736767
this.dirty.add(action);
768+
// Treat dirty computations as triggered so they bypass filtering
769+
this.pushTriggered.add(action);
737770

738771
// Propagate to dependents transitively
739772
const deps = this.dependents.get(action);
@@ -1106,6 +1139,7 @@ export class Scheduler {
11061139
findEffects(computation);
11071140

11081141
for (const effect of toSchedule) {
1142+
this.pushTriggered.add(effect);
11091143
this.scheduleWithDebounce(effect);
11101144
}
11111145
}
@@ -1386,6 +1420,9 @@ export class Scheduler {
13861420

13871421
// In pull mode, filter based on pushTriggered
13881422
if (this.pullMode) {
1423+
if (this.dirty.has(action)) {
1424+
return false;
1425+
}
13891426
// If not triggered by actual storage changes, filter it out
13901427
if (!this.pushTriggered.has(action)) {
13911428
return true;

packages/runner/test/scheduler.test.ts

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1715,6 +1715,223 @@ describe("pull-based scheduling", () => {
17151715
expect(effectRuns).toBeGreaterThan(initialEffectRuns);
17161716
});
17171717

1718+
it("should recompute multi-hop chains before running effects in pull mode", async () => {
1719+
runtime.scheduler.enablePullMode();
1720+
1721+
const source = runtime.getCell<number>(
1722+
space,
1723+
"pull-multihop-source",
1724+
undefined,
1725+
tx,
1726+
);
1727+
source.set(1);
1728+
const intermediate1 = runtime.getCell<number>(
1729+
space,
1730+
"pull-multihop-mid-1",
1731+
undefined,
1732+
tx,
1733+
);
1734+
intermediate1.set(0);
1735+
const intermediate2 = runtime.getCell<number>(
1736+
space,
1737+
"pull-multihop-mid-2",
1738+
undefined,
1739+
tx,
1740+
);
1741+
intermediate2.set(0);
1742+
const effectResult = runtime.getCell<number>(
1743+
space,
1744+
"pull-multihop-effect",
1745+
undefined,
1746+
tx,
1747+
);
1748+
effectResult.set(0);
1749+
await tx.commit();
1750+
tx = runtime.edit();
1751+
1752+
let comp1Runs = 0;
1753+
let comp2Runs = 0;
1754+
let effectRuns = 0;
1755+
1756+
const computation1: Action = (actionTx) => {
1757+
comp1Runs++;
1758+
const val = source.withTx(actionTx).get();
1759+
intermediate1.withTx(actionTx).send(val + 1);
1760+
};
1761+
1762+
const computation2: Action = (actionTx) => {
1763+
comp2Runs++;
1764+
const val = intermediate1.withTx(actionTx).get();
1765+
intermediate2.withTx(actionTx).send(val * 2);
1766+
};
1767+
1768+
const effect: Action = (actionTx) => {
1769+
effectRuns++;
1770+
const val = intermediate2.withTx(actionTx).get();
1771+
effectResult.withTx(actionTx).send(val - 3);
1772+
};
1773+
1774+
runtime.scheduler.subscribe(
1775+
computation1,
1776+
{
1777+
reads: [source.getAsNormalizedFullLink()],
1778+
writes: [intermediate1.getAsNormalizedFullLink()],
1779+
},
1780+
{ scheduleImmediately: true },
1781+
);
1782+
await runtime.idle();
1783+
1784+
runtime.scheduler.subscribe(
1785+
computation2,
1786+
{
1787+
reads: [intermediate1.getAsNormalizedFullLink()],
1788+
writes: [intermediate2.getAsNormalizedFullLink()],
1789+
},
1790+
{ scheduleImmediately: true },
1791+
);
1792+
await runtime.idle();
1793+
1794+
runtime.scheduler.subscribe(
1795+
effect,
1796+
{
1797+
reads: [intermediate2.getAsNormalizedFullLink()],
1798+
writes: [effectResult.getAsNormalizedFullLink()],
1799+
},
1800+
{ scheduleImmediately: true, isEffect: true },
1801+
);
1802+
await runtime.idle();
1803+
1804+
expect(effectResult.get()).toBe((1 + 1) * 2 - 3);
1805+
expect(comp2Runs).toBe(1);
1806+
expect(effectRuns).toBe(1);
1807+
1808+
const tx2 = runtime.edit();
1809+
source.withTx(tx2).send(5);
1810+
await tx2.commit();
1811+
tx = runtime.edit();
1812+
await runtime.idle();
1813+
1814+
expect(comp1Runs).toBe(2);
1815+
expect(comp2Runs).toBe(2);
1816+
expect(effectRuns).toBe(2);
1817+
expect(effectResult.get()).toBe((5 + 1) * 2 - 3);
1818+
});
1819+
1820+
it("should drop stale dependents when computation changes inputs", async () => {
1821+
runtime.scheduler.enablePullMode();
1822+
1823+
const sourceA = runtime.getCell<number>(
1824+
space,
1825+
"pull-deps-source-a",
1826+
undefined,
1827+
tx,
1828+
);
1829+
sourceA.set(2);
1830+
const sourceB = runtime.getCell<number>(
1831+
space,
1832+
"pull-deps-source-b",
1833+
undefined,
1834+
tx,
1835+
);
1836+
sourceB.set(7);
1837+
const selector = runtime.getCell<boolean>(
1838+
space,
1839+
"pull-deps-selector",
1840+
undefined,
1841+
tx,
1842+
);
1843+
selector.set(false);
1844+
const intermediate = runtime.getCell<number>(
1845+
space,
1846+
"pull-deps-intermediate",
1847+
undefined,
1848+
tx,
1849+
);
1850+
intermediate.set(0);
1851+
const effectResult = runtime.getCell<number>(
1852+
space,
1853+
"pull-deps-effect",
1854+
undefined,
1855+
tx,
1856+
);
1857+
effectResult.set(0);
1858+
await tx.commit();
1859+
tx = runtime.edit();
1860+
1861+
let effectRuns = 0;
1862+
1863+
const computation: Action = (actionTx) => {
1864+
const useB = selector.withTx(actionTx).get();
1865+
const value = useB
1866+
? sourceB.withTx(actionTx).get()
1867+
: sourceA.withTx(actionTx).get();
1868+
intermediate.withTx(actionTx).send(value * 10);
1869+
};
1870+
1871+
const effect: Action = (actionTx) => {
1872+
effectRuns++;
1873+
const value = intermediate.withTx(actionTx).get();
1874+
effectResult.withTx(actionTx).send(value);
1875+
};
1876+
1877+
runtime.scheduler.subscribe(
1878+
computation,
1879+
{
1880+
reads: [
1881+
selector.getAsNormalizedFullLink(),
1882+
sourceA.getAsNormalizedFullLink(),
1883+
],
1884+
writes: [intermediate.getAsNormalizedFullLink()],
1885+
},
1886+
{ scheduleImmediately: true },
1887+
);
1888+
await runtime.idle();
1889+
1890+
runtime.scheduler.subscribe(
1891+
effect,
1892+
{
1893+
reads: [intermediate.getAsNormalizedFullLink()],
1894+
writes: [effectResult.getAsNormalizedFullLink()],
1895+
},
1896+
{ scheduleImmediately: true, isEffect: true },
1897+
);
1898+
await runtime.idle();
1899+
1900+
expect(effectRuns).toBe(1);
1901+
expect(effectResult.get()).toBe(20);
1902+
1903+
// Switch computation to sourceB
1904+
const toggleTx = runtime.edit();
1905+
selector.withTx(toggleTx).send(true);
1906+
await toggleTx.commit();
1907+
tx = runtime.edit();
1908+
await runtime.idle();
1909+
1910+
expect(effectRuns).toBe(2);
1911+
expect(effectResult.get()).toBe(70);
1912+
1913+
// Updating sourceA should not dirty the computation any more
1914+
const tx3 = runtime.edit();
1915+
sourceA.withTx(tx3).send(999);
1916+
await tx3.commit();
1917+
tx = runtime.edit();
1918+
await runtime.idle();
1919+
1920+
expect(effectRuns).toBe(2);
1921+
expect(effectResult.get()).toBe(70);
1922+
expect(runtime.scheduler.isDirty(computation)).toBe(false);
1923+
1924+
// Updating sourceB should still run the computation
1925+
const tx4 = runtime.edit();
1926+
sourceB.withTx(tx4).send(6);
1927+
await tx4.commit();
1928+
tx = runtime.edit();
1929+
await runtime.idle();
1930+
1931+
expect(effectRuns).toBe(3);
1932+
expect(effectResult.get()).toBe(60);
1933+
});
1934+
17181935
it("should track getStats with dirty count", async () => {
17191936
runtime.scheduler.enablePullMode();
17201937

0 commit comments

Comments
 (0)