diff --git a/src/react/useDeltaStreams.ts b/src/react/useDeltaStreams.ts index 799feca1..3acb5288 100644 --- a/src/react/useDeltaStreams.ts +++ b/src/react/useDeltaStreams.ts @@ -103,36 +103,51 @@ export function useDeltaStreams< | undefined; const newDeltas = cursorQuery?.streams.deltas; - if (newDeltas?.length && streamMessages) { + if (streamMessages) { const newDeltasByStreamId = new Map(); - for (const delta of newDeltas) { - const oldCursor = cursors[delta.streamId]; - if (oldCursor && delta.start < oldCursor) continue; - const existing = newDeltasByStreamId.get(delta.streamId); - if (existing) { - const previousEnd = existing.at(-1)!.end; - assert( - previousEnd === delta.start, - `Gap found in deltas for ${delta.streamId} jumping to ${delta.start} from ${previousEnd}`, - ); - existing.push(delta); - } else { - assert( - !oldCursor || oldCursor === delta.start, - `Gap found - first delta after ${oldCursor} is ${delta.start} for stream ${delta.streamId}`, - ); - newDeltasByStreamId.set(delta.streamId, [delta]); + if (newDeltas?.length) { + for (const delta of newDeltas) { + const oldCursor = cursors[delta.streamId]; + if (oldCursor && delta.start < oldCursor) continue; + const existing = newDeltasByStreamId.get(delta.streamId); + if (existing) { + const previousEnd = existing.at(-1)!.end; + assert( + previousEnd === delta.start, + `Gap found in deltas for ${delta.streamId} jumping to ${delta.start} from ${previousEnd}`, + ); + existing.push(delta); + } else { + assert( + !oldCursor || oldCursor === delta.start, + `Gap found - first delta after ${oldCursor} is ${delta.start} for stream ${delta.streamId}`, + ); + newDeltasByStreamId.set(delta.streamId, [delta]); + } } } const newCursors: Record = {}; + let cursorsChanged = false; for (const { streamId } of streamMessages) { const cursor = newDeltasByStreamId.get(streamId)?.at(-1)?.end ?? cursors[streamId]; if (cursor !== undefined) { newCursors[streamId] = cursor; + if (cursors[streamId] !== cursor) { + cursorsChanged = true; + } } } - setCursors(newCursors); + // Also check if any streams were removed from cursors + for (const streamId in cursors) { + if (!(streamId in newCursors)) { + cursorsChanged = true; + break; + } + } + if (cursorsChanged) { + setCursors(newCursors); + } // we defensively create a new object so object identity matches contents state.deltaStreams = streamMessages.map((streamMessage) => {