Skip to content

Commit 61a0dce

Browse files
committed
added support for move tags on the shape class
1 parent 3aa5d00 commit 61a0dce

File tree

3 files changed

+1215
-5
lines changed

3 files changed

+1215
-5
lines changed

packages/typescript-client/src/helpers.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import {
22
ChangeMessage,
33
ControlMessage,
4+
EventMessage,
45
Message,
56
NormalizedPgSnapshot,
67
Offset,
@@ -31,6 +32,29 @@ export function isChangeMessage<T extends Row<unknown> = Row>(
3132
return `key` in message
3233
}
3334

35+
/**
36+
* Type guard for checking {@link Message} is {@link EventMessage}.
37+
*
38+
* See [TS docs](https://www.typescriptlang.org/docs/handbook/advanced-types.html#user-defined-type-guards)
39+
* for information on how to use type guards.
40+
*
41+
* @param message - the message to check
42+
* @returns true if the message is a {@link EventMessage}
43+
*
44+
* @example
45+
* ```ts
46+
* if (isEventMessage(message)) {
47+
* const msgChng: ChangeMessage = message // Err, type mismatch
48+
* const msgEvt: EventMessage = message // Ok
49+
* }
50+
* ```
51+
*/
52+
export function isEventMessage<T extends Row<unknown> = Row>(
53+
message: Message<T>
54+
): message is EventMessage {
55+
return !isChangeMessage(message) && `event` in message.headers
56+
}
57+
3458
/**
3559
* Type guard for checking {@link Message} is {@link ControlMessage}.
3660
*
@@ -51,7 +75,7 @@ export function isChangeMessage<T extends Row<unknown> = Row>(
5175
export function isControlMessage<T extends Row<unknown> = Row>(
5276
message: Message<T>
5377
): message is ControlMessage {
54-
return !isChangeMessage(message)
78+
return !isChangeMessage(message) && !isEventMessage(message)
5579
}
5680

5781
export function isUpToDateMessage<T extends Row<unknown> = Row>(

packages/typescript-client/src/shape.ts

Lines changed: 139 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { Message, Offset, Row } from './types'
2-
import { isChangeMessage, isControlMessage } from './helpers'
1+
import { Message, Offset, Row, MoveTag } from './types'
2+
import { isChangeMessage, isControlMessage, isEventMessage } from './helpers'
33
import { FetchError } from './error'
44
import { LogMode, ShapeStreamInterface } from './client'
55

@@ -57,6 +57,8 @@ export class Shape<T extends Row<unknown> = Row> {
5757
#reexecuteSnapshotsPending = false
5858
#status: ShapeStatus = `syncing`
5959
#error: FetchError | false = false
60+
readonly #rowTags = new Map<string, Set<string>>() // key -> set of tag values (simplified for length-1 tags)
61+
readonly #tagIndex = new Map<string, Set<string>>() // tag value -> set of keys
6062

6163
constructor(stream: ShapeStreamInterface<T>) {
6264
this.stream = stream
@@ -176,15 +178,39 @@ export class Shape<T extends Row<unknown> = Row> {
176178
switch (message.headers.operation) {
177179
case `insert`:
178180
this.#data.set(message.key, message.value)
181+
// Track tags if present
182+
if (message.headers.tags) {
183+
const tags = new Set(message.headers.tags)
184+
this.#rowTags.set(message.key, tags)
185+
tags.forEach((tag) => this.#addTagToIndex(tag, message.key))
186+
}
179187
break
180188
case `update`:
181189
this.#data.set(message.key, {
182190
...this.#data.get(message.key)!,
183191
...message.value,
184192
})
193+
// Update tags if present
194+
if (message.headers.tags) {
195+
// Remove old tags from index
196+
const oldTags = this.#rowTags.get(message.key)
197+
if (oldTags) {
198+
oldTags.forEach((tag) =>
199+
this.#removeTagFromIndex(tag, message.key)
200+
)
201+
}
202+
// Set new tags
203+
const newTags = new Set(message.headers.tags)
204+
this.#rowTags.set(message.key, newTags)
205+
newTags.forEach((tag) => this.#addTagToIndex(tag, message.key))
206+
// If no tags left, remove the row
207+
this.#removeRowIfNoTags(message.key)
208+
}
185209
break
186210
case `delete`:
187211
this.#data.delete(message.key)
212+
// Clean up tag indices
213+
this.#removeRowFromTagIndices(message.key)
188214
break
189215
}
190216
} else {
@@ -193,26 +219,62 @@ export class Shape<T extends Row<unknown> = Row> {
193219
case `insert`:
194220
this.#insertedKeys.add(message.key)
195221
this.#data.set(message.key, message.value)
222+
// Track tags if present
223+
if (message.headers.tags) {
224+
const tags = new Set(message.headers.tags)
225+
this.#rowTags.set(message.key, tags)
226+
tags.forEach((tag) => this.#addTagToIndex(tag, message.key))
227+
}
196228
break
197229
case `update`:
198230
if (this.#insertedKeys.has(message.key)) {
199231
this.#data.set(message.key, {
200232
...this.#data.get(message.key)!,
201233
...message.value,
202234
})
235+
// Update tags if present
236+
if (message.headers.tags) {
237+
// Remove old tags from index
238+
const oldTags = this.#rowTags.get(message.key)
239+
if (oldTags) {
240+
oldTags.forEach((tag) =>
241+
this.#removeTagFromIndex(tag, message.key)
242+
)
243+
}
244+
// Set new tags
245+
const newTags = new Set(message.headers.tags)
246+
this.#rowTags.set(message.key, newTags)
247+
newTags.forEach((tag) =>
248+
this.#addTagToIndex(tag, message.key)
249+
)
250+
// If no tags left, remove the row
251+
this.#removeRowIfNoTags(message.key)
252+
}
203253
}
204254
break
205255
case `delete`:
206256
if (this.#insertedKeys.has(message.key)) {
207257
this.#data.delete(message.key)
208258
this.#insertedKeys.delete(message.key)
259+
// Clean up tag indices
260+
this.#removeRowFromTagIndices(message.key)
209261
}
210262
break
211263
}
212264
}
213-
}
265+
} else if (isEventMessage(message)) {
266+
shouldNotify = this.#updateShapeStatus(`syncing`)
214267

215-
if (isControlMessage(message)) {
268+
switch (message.headers.event) {
269+
case `move-out`:
270+
for (const { pos, value } of message.headers.patterns) {
271+
if (pos != 0)
272+
throw new Error(`Only 1-width tags are currently supported`)
273+
this.#removeAllByTagPattern(pos, value)
274+
}
275+
break
276+
}
277+
} else if (isControlMessage(message)) {
216278
switch (message.headers.control) {
217279
case `up-to-date`:
218280
shouldNotify = this.#updateShapeStatus(`up-to-date`)
@@ -224,6 +286,8 @@ export class Shape<T extends Row<unknown> = Row> {
224286
case `must-refetch`:
225287
this.#data.clear()
226288
this.#insertedKeys.clear()
289+
this.#rowTags.clear()
290+
this.#tagIndex.clear()
227291
this.#error = false
228292
shouldNotify = this.#updateShapeStatus(`syncing`)
229293
// Flag to re-execute sub-snapshots once the new shape is up-to-date
@@ -290,4 +354,75 @@ export class Shape<T extends Row<unknown> = Row> {
290354
callback({ value: this.currentValue, rows: this.currentRows })
291355
})
292356
}
357+
358+
/**
359+
* Adds a key to the tag index for the given tag.
360+
*/
361+
#addTagToIndex(tag: string, key: string): void {
362+
let keys = this.#tagIndex.get(tag)
363+
if (!keys) {
364+
keys = new Set()
365+
this.#tagIndex.set(tag, keys)
366+
}
367+
keys.add(key)
368+
}
369+
370+
/**
371+
* Removes a key from the tag index for the given tag.
372+
* If the tag has no more keys, removes the tag from the index.
373+
*/
374+
#removeTagFromIndex(tag: string, key: string): void {
375+
const keys = this.#tagIndex.get(tag)
376+
if (keys) {
377+
keys.delete(key)
378+
if (keys.size === 0) {
379+
this.#tagIndex.delete(tag)
380+
}
381+
}
382+
}
383+
384+
/**
385+
* Removes a row from all tag indices.
386+
* Should be called when a row is being deleted.
387+
*/
388+
#removeRowFromTagIndices(key: string): void {
389+
const tags = this.#rowTags.get(key)
390+
if (tags) {
391+
tags.forEach((tag) => this.#removeTagFromIndex(tag, key))
392+
this.#rowTags.delete(key)
393+
}
394+
}
395+
396+
/**
397+
* Checks if a row has no tags and removes it if so.
398+
* Returns true if the row was removed.
399+
*/
400+
#removeRowIfNoTags(key: string): boolean {
401+
const tags = this.#rowTags.get(key)
402+
if (tags && tags.size === 0) {
403+
this.#data.delete(key)
404+
this.#rowTags.delete(key)
405+
this.#insertedKeys.delete(key)
406+
return true
407+
}
408+
return false
409+
}
410+
411+
#removeAllByTagPattern(_pos: number, tag: string): void {
412+
// TODO: This is naive, working only while tags are single-width
413+
414+
const keys = this.#tagIndex.get(tag)
415+
if (keys) {
416+
for (const key of keys) {
417+
if (this.#rowTags.get(key)?.delete(tag)) {
418+
if (this.#rowTags.get(key)?.size === 0) {
419+
this.#data.delete(key)
420+
this.#rowTags.delete(key)
421+
this.#insertedKeys.delete(key)
422+
}
423+
}
424+
}
425+
this.#tagIndex.delete(tag)
426+
}
427+
}
293428
}

0 commit comments

Comments
 (0)