Skip to content

Commit 7d854fd

Browse files
committed
feat: surface update errors
1 parent 5961d2e commit 7d854fd

File tree

8 files changed

+177
-7
lines changed

8 files changed

+177
-7
lines changed

llms.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ The resolved room implements:
278278
to stay within limits.
279279
- On receiving `DocUpdate`/fragments, the client reassembles updates and passes
280280
them to `crdtAdaptor.applyUpdate`.
281-
- `Ack` messages are logged; non‑zero statuses indicate the update batch was rejected.
281+
- `Ack` messages with non‑zero status trigger `crdtAdaptor.onUpdateError(updates, status)` using the original sent batch; missing batches are still logged.
282282

283283
### 3.8 Ping/Pong Integration
284284

packages/loro-adaptors/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,12 @@ import { YjsAwarenessServerAdaptor } from "loro-adaptors/yjs";
107107
## API
108108

109109
- `loro-adaptors/loro`
110-
- `new LoroAdaptor(doc?: LoroDoc, config?: { onImportError? })`
110+
- `new LoroAdaptor(doc?: LoroDoc, config?: { onImportError?, onUpdateError? })`
111111
- `new LoroEphemeralAdaptor(store?: EphemeralStore)`
112112
- `new LoroPersistentStoreAdaptor(store?: EphemeralStore)`
113-
- `new EloAdaptor(docOrConfig: LoroDoc | { getPrivateKey, ivFactory?, onDecryptError? })`
113+
- `new EloAdaptor(docOrConfig: LoroDoc | { getPrivateKey, ivFactory?, onDecryptError?, onUpdateError? })`
114114
- `loro-adaptors/flock`
115-
- `new FlockAdaptor(flock: Flock, config?: { onImportError? })`
115+
- `new FlockAdaptor(flock: Flock, config?: { onImportError?, onUpdateError? })`
116116
- `loro-adaptors/yjs`
117117
- `new YjsAwarenessServerAdaptor()`
118118

packages/loro-adaptors/src/elo-adaptor.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ export interface EloAdaptorConfig {
2424
err: Error,
2525
meta: { kind: "delta" | "snapshot"; keyId: string }
2626
) => void;
27+
onUpdateError?: (
28+
updates: Uint8Array[],
29+
errorCode: number,
30+
reason?: string
31+
) => void;
2732
}
2833

2934
export class EloAdaptor implements CrdtDocAdaptor {
@@ -157,6 +162,14 @@ export class EloAdaptor implements CrdtDocAdaptor {
157162
return undefined;
158163
}
159164

165+
onUpdateError(
166+
updates: Uint8Array[],
167+
errorCode: number,
168+
reason?: string
169+
): void {
170+
this.config.onUpdateError?.(updates, errorCode, reason);
171+
}
172+
160173
async handleJoinOk(res: JoinResponseOk): Promise<void> {
161174
if (this.destroyed) return;
162175
try {

packages/loro-adaptors/src/flock-adaptor.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ function deserializeBundle(bytes: Uint8Array): FlockExportBundle {
9595

9696
export interface FlockAdaptorConfig {
9797
onImportError?: (error: Error, data: Uint8Array[]) => void;
98+
onUpdateError?: (
99+
updates: Uint8Array[],
100+
errorCode: number,
101+
reason?: string
102+
) => void;
98103
}
99104

100105
/**
@@ -169,6 +174,14 @@ export class FlockAdaptor implements CrdtDocAdaptor {
169174
return undefined;
170175
}
171176

177+
onUpdateError(
178+
updates: Uint8Array[],
179+
errorCode: number,
180+
reason?: string
181+
): void {
182+
this.config.onUpdateError?.(updates, errorCode, reason);
183+
}
184+
172185
async handleJoinOk(res: JoinResponseOk): Promise<void> {
173186
if (this.destroyed) return;
174187
try {

packages/loro-adaptors/src/loro-adaptor.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import type { CrdtAdaptorContext, CrdtDocAdaptor } from "./types";
77

88
export interface LoroAdaptorConfig {
99
onImportError?: (error: Error, data: Uint8Array[]) => void;
10+
onUpdateError?: (
11+
updates: Uint8Array[],
12+
errorCode: number,
13+
reason?: string
14+
) => void;
1015
}
1116

1217
export class LoroAdaptor implements CrdtDocAdaptor {
@@ -74,6 +79,14 @@ export class LoroAdaptor implements CrdtDocAdaptor {
7479
return undefined;
7580
}
7681

82+
onUpdateError(
83+
updates: Uint8Array[],
84+
errorCode: number,
85+
reason?: string
86+
): void {
87+
this.config.onUpdateError?.(updates, errorCode, reason);
88+
}
89+
7790
async handleJoinOk(res: JoinResponseOk): Promise<void> {
7891
if (this.destroyed) return;
7992

packages/loro-adaptors/src/types.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ export interface CrdtDocAdaptor {
3939
getAlternativeVersion?: (
4040
currentVersion: Uint8Array
4141
) => Uint8Array | undefined;
42+
/**
43+
* Called when the server rejects a previously sent update batch.
44+
* @param updates The original updates that were sent
45+
* @param errorCode The numeric update status code from the Ack
46+
* @param reason Optional human-readable reason if available
47+
*/
48+
onUpdateError?: (
49+
updates: Uint8Array[],
50+
errorCode: number,
51+
reason?: string
52+
) => void;
4253
// Legacy hook retained for compatibility
4354
handleUpdateError?: (error: unknown) => void;
4455
destroy: () => void;

packages/loro-websocket/src/client/index.ts

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ export class LoroWebsocketClient {
167167
private activeRooms: Map<string, ActiveRoom> = new Map();
168168
// Buffer for %ELO only: backfills can arrive immediately after JoinResponseOk
169169
private preJoinUpdates: Map<string, Array<{ updates: Uint8Array[]; refId?: HexString }>> = new Map();
170+
// Track outbound update batches so we can surface errors with payload context
171+
private sentUpdateBatches: Map<HexString, { roomKey: string; updates: Uint8Array[] }> = new Map();
170172
private fragmentBatches: Map<string, FragmentBatch> = new Map();
171173
private roomAdaptors: Map<string, CrdtDocAdaptor> = new Map();
172174
// Track roomId for each active id so we can rejoin on reconnect
@@ -856,6 +858,7 @@ export class LoroWebsocketClient {
856858

857859
cleanupRoom(roomId: string, crdtType: CrdtType) {
858860
const id = crdtType + roomId;
861+
this.purgeSentBatchesForRoom(id);
859862
this.activeRooms.delete(id);
860863
this.pendingRooms.delete(id);
861864
this.roomAdaptors.delete(id);
@@ -1079,6 +1082,12 @@ export class LoroWebsocketClient {
10791082
);
10801083

10811084
const batchId = randomBatchId();
1085+
const roomKey = `${crdt}${roomId}`;
1086+
// Store the original payload so we can surface detailed errors on Ack
1087+
this.sentUpdateBatches.set(batchId, {
1088+
roomKey,
1089+
updates: [update.slice()],
1090+
});
10821091

10831092
if (update.length <= FRAG_LIMIT) {
10841093
// Send as a single DocUpdate with one update entry
@@ -1135,6 +1144,22 @@ export class LoroWebsocketClient {
11351144
);
11361145
}
11371146

1147+
consumeSentBatch(refId: HexString): { roomKey: string; updates: Uint8Array[] } | undefined {
1148+
const entry = this.sentUpdateBatches.get(refId);
1149+
if (entry) {
1150+
this.sentUpdateBatches.delete(refId);
1151+
}
1152+
return entry;
1153+
}
1154+
1155+
private purgeSentBatchesForRoom(roomKey: string): void {
1156+
for (const [refId, entry] of Array.from(this.sentUpdateBatches.entries())) {
1157+
if (entry.roomKey === roomKey) {
1158+
this.sentUpdateBatches.delete(refId);
1159+
}
1160+
}
1161+
}
1162+
11381163
/**
11391164
* Destroy the client, removing listeners and stopping timers.
11401165
* After destroy, the instance should not be used.
@@ -1162,6 +1187,7 @@ export class LoroWebsocketClient {
11621187
this.ops.onWsClose?.();
11631188
}
11641189
this.queuedJoins = [];
1190+
this.sentUpdateBatches.clear();
11651191
this.detachSocketListeners(ws);
11661192
try {
11671193
this.removeNetworkListeners?.();
@@ -1482,8 +1508,14 @@ class LoroWebsocketClientRoomImpl
14821508
}
14831509

14841510
handleAck(ack: Ack) {
1511+
const sent = this.client.consumeSentBatch(ack.refId);
14851512
if (ack.status !== UpdateStatusCode.Ok) {
1486-
console.warn(`Ack status ${ack.status} for ${this.crdtType}:${this.roomId} (ref ${ack.refId})`);
1513+
const updates = sent?.updates ?? [];
1514+
const reason = updateStatusToReason(ack.status);
1515+
this.crdtAdaptor.onUpdateError?.(updates, ack.status, reason);
1516+
if (!sent) {
1517+
console.warn(`Ack status ${ack.status} for ${this.crdtType}:${this.roomId} (ref ${ack.refId}) with no matching batch`);
1518+
}
14871519
}
14881520
}
14891521

@@ -1524,6 +1556,27 @@ class LoroWebsocketClientRoomImpl
15241556

15251557
// --- Keepalive helpers (ping/pong) ---
15261558

1559+
function updateStatusToReason(status: UpdateStatusCode): string | undefined {
1560+
switch (status) {
1561+
case UpdateStatusCode.Unknown:
1562+
return "unknown";
1563+
case UpdateStatusCode.PermissionDenied:
1564+
return "permission_denied";
1565+
case UpdateStatusCode.InvalidUpdate:
1566+
return "invalid_update";
1567+
case UpdateStatusCode.PayloadTooLarge:
1568+
return "payload_too_large";
1569+
case UpdateStatusCode.RateLimited:
1570+
return "rate_limited";
1571+
case UpdateStatusCode.FragmentTimeout:
1572+
return "fragment_timeout";
1573+
case UpdateStatusCode.AppError:
1574+
return "app_error";
1575+
default:
1576+
return undefined;
1577+
}
1578+
}
1579+
15271580
// --- Internal ping helpers ---
15281581
function isPositive(v: unknown): v is number {
15291582
return typeof v === "number" && isFinite(v) && v > 0;

0 commit comments

Comments
 (0)