Skip to content

Commit 0a0792b

Browse files
committed
fix: resolve auth before join
1 parent 6570a2e commit 0a0792b

File tree

2 files changed

+117
-32
lines changed

2 files changed

+117
-32
lines changed

packages/loro-websocket/src/client/index.ts

Lines changed: 70 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import type { CrdtDocAdaptor } from "loro-adaptors";
2424

2525
export * from "loro-adaptors";
2626

27+
export type AuthProvider = () => Uint8Array | Promise<Uint8Array>;
28+
type AuthOption = Uint8Array | AuthProvider;
29+
2730
interface FragmentBatch {
2831
header: DocUpdateFragmentHeader;
2932
fragments: Map<number, Uint8Array>;
@@ -173,7 +176,7 @@ export class LoroWebsocketClient {
173176
private roomAdaptors: Map<string, CrdtDocAdaptor> = new Map();
174177
// Track roomId for each active id so we can rejoin on reconnect
175178
private roomIds: Map<string, string> = new Map();
176-
private roomAuth: Map<string, Uint8Array | undefined> = new Map();
179+
private roomAuth: Map<string, AuthOption | undefined> = new Map();
177180
private roomStatusListeners: Map<
178181
string,
179182
Set<(s: RoomJoinStatusValue) => void>
@@ -206,6 +209,17 @@ export class LoroWebsocketClient {
206209
void this.connect();
207210
}
208211

212+
private async resolveAuth(auth?: AuthOption): Promise<Uint8Array> {
213+
if (typeof auth === "function") {
214+
const value = await auth();
215+
if (!(value instanceof Uint8Array)) {
216+
throw new Error("Auth provider must return Uint8Array");
217+
}
218+
return value;
219+
}
220+
return auth ?? new Uint8Array();
221+
}
222+
209223
get socket(): WebSocket {
210224
return this.ws;
211225
}
@@ -562,17 +576,27 @@ export class LoroWebsocketClient {
562576
if (!roomId) continue;
563577
const active = this.activeRooms.get(id);
564578
if (!active) continue;
565-
this.sendRejoinRequest(id, roomId, adaptor, active.room, this.roomAuth.get(id));
579+
void this.sendRejoinRequest(id, roomId, adaptor, active.room, this.roomAuth.get(id));
566580
}
567581
}
568582

569-
private sendRejoinRequest(
583+
private async sendRejoinRequest(
570584
id: string,
571585
roomId: string,
572586
adaptor: CrdtDocAdaptor,
573587
room: LoroWebsocketClientRoom,
574-
auth?: Uint8Array
588+
auth?: AuthOption
575589
) {
590+
let authValue: Uint8Array;
591+
try {
592+
authValue = await this.resolveAuth(auth);
593+
} catch (e) {
594+
console.error("Failed to resolve auth for rejoin:", e);
595+
this.cleanupRoom(roomId, adaptor.crdtType);
596+
this.emitRoomStatus(id, RoomJoinStatus.Error);
597+
return;
598+
}
599+
576600
// Prepare a lightweight pending entry so JoinError handling can retry version formats
577601
const pending: PendingRoom = {
578602
room: Promise.resolve(room),
@@ -594,7 +618,7 @@ export class LoroWebsocketClient {
594618
},
595619
adaptor,
596620
roomId,
597-
auth,
621+
auth: authValue,
598622
isRejoin: true,
599623
};
600624
this.pendingRooms.set(id, pending);
@@ -603,7 +627,7 @@ export class LoroWebsocketClient {
603627
type: MessageType.JoinRequest,
604628
crdt: adaptor.crdtType,
605629
roomId,
606-
auth: auth ?? new Uint8Array(),
630+
auth: authValue,
607631
version: adaptor.getVersion(),
608632
} as JoinRequest);
609633

@@ -677,7 +701,7 @@ export class LoroWebsocketClient {
677701
// Drop any in-flight join since the server explicitly removed us
678702
this.pendingRooms.delete(roomId);
679703
if (shouldRejoin && active && adaptor) {
680-
this.sendRejoinRequest(roomId, msg.roomId, adaptor, active.room, auth);
704+
void this.sendRejoinRequest(roomId, msg.roomId, adaptor, active.room, auth);
681705
} else {
682706
// Remove local room state so client does not auto-retry unless requested
683707
this.cleanupRoom(msg.roomId, msg.crdt);
@@ -925,7 +949,7 @@ export class LoroWebsocketClient {
925949
}: {
926950
roomId: string;
927951
crdtAdaptor: CrdtDocAdaptor;
928-
auth?: Uint8Array;
952+
auth?: AuthOption;
929953
onStatusChange?: (s: RoomJoinStatusValue) => void;
930954
}): Promise<LoroWebsocketClientRoom> {
931955
const id = crdtAdaptor.crdtType + roomId;
@@ -940,8 +964,8 @@ export class LoroWebsocketClient {
940964
return Promise.resolve(active.room);
941965
}
942966

943-
let resolve: (res: JoinResponseOk) => void;
944-
let reject: (error: Error) => void;
967+
let resolve!: (res: JoinResponseOk) => void;
968+
let reject!: (error: Error) => void;
945969

946970
const response = new Promise<JoinResponseOk>((resolve_, reject_) => {
947971
resolve = resolve_;
@@ -1005,31 +1029,45 @@ export class LoroWebsocketClient {
10051029
return room;
10061030
});
10071031

1008-
this.pendingRooms.set(id, {
1009-
room,
1010-
resolve: resolve!,
1011-
reject: reject!,
1012-
adaptor: crdtAdaptor,
1013-
roomId,
1014-
auth,
1015-
});
10161032
this.roomAuth.set(id, auth);
10171033

1018-
const joinPayload = encode({
1019-
type: MessageType.JoinRequest,
1020-
crdt: crdtAdaptor.crdtType,
1021-
roomId,
1022-
auth: auth ?? new Uint8Array(),
1023-
version: crdtAdaptor.getVersion(),
1024-
} as JoinRequest);
1034+
// Resolve auth before registering pending room to avoid race condition
1035+
// where JoinError retry might use undefined auth
1036+
void this.resolveAuth(auth)
1037+
.then(authValue => {
1038+
// Register pending room only after auth is resolved
1039+
this.pendingRooms.set(id, {
1040+
room,
1041+
resolve: resolve!,
1042+
reject: reject!,
1043+
adaptor: crdtAdaptor,
1044+
roomId,
1045+
auth: authValue,
1046+
});
10251047

1026-
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
1027-
this.ws.send(joinPayload);
1028-
} else {
1029-
this.enqueueJoin(joinPayload);
1030-
// ensure a connection attempt is running
1031-
void this.connect();
1032-
}
1048+
const joinPayload = encode({
1049+
type: MessageType.JoinRequest,
1050+
crdt: crdtAdaptor.crdtType,
1051+
roomId,
1052+
auth: authValue,
1053+
version: crdtAdaptor.getVersion(),
1054+
} as JoinRequest);
1055+
1056+
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
1057+
this.ws.send(joinPayload);
1058+
} else {
1059+
this.enqueueJoin(joinPayload);
1060+
// ensure a connection attempt is running
1061+
void this.connect();
1062+
}
1063+
})
1064+
.catch(err => {
1065+
const error = err instanceof Error ? err : new Error(String(err));
1066+
this.emitRoomStatus(id, RoomJoinStatus.Error);
1067+
reject(error);
1068+
this.cleanupRoom(roomId, crdtAdaptor.crdtType);
1069+
this.pendingRooms.delete(id);
1070+
});
10331071

10341072
return room;
10351073
}

packages/loro-websocket/tests/e2e.test.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,53 @@ describe("E2E: Client-Server Sync", () => {
571571
await authServer.stop();
572572
}, 15000);
573573

574+
it("fetches fresh auth on rejoin when auth provider is used", async () => {
575+
const port = await getPort();
576+
const tokens: string[] = [];
577+
578+
const server = new SimpleServer({
579+
port,
580+
authenticate: async (_roomId, _crdt, auth) => {
581+
tokens.push(new TextDecoder().decode(auth));
582+
return "write";
583+
},
584+
});
585+
await server.start();
586+
587+
const client = new LoroWebsocketClient({
588+
url: `ws://localhost:${port}`,
589+
reconnect: { initialDelayMs: 20, maxDelayMs: 100, jitter: 0 },
590+
});
591+
592+
let room: LoroWebsocketClientRoom | undefined;
593+
try {
594+
await client.waitConnected();
595+
let call = 0;
596+
const adaptor = new LoroAdaptor();
597+
598+
room = await client.join({
599+
roomId: "auth-refresh",
600+
crdtAdaptor: adaptor,
601+
auth: async () => new TextEncoder().encode(`token-${++call}`),
602+
});
603+
604+
await waitUntil(() => tokens.length >= 1, 5000, 25);
605+
606+
await server.stop();
607+
await new Promise(resolve => setTimeout(resolve, 60));
608+
await server.start();
609+
610+
await waitUntil(() => tokens.some(t => t === "token-2"), 10000, 50);
611+
612+
expect(tokens[0]).toBe("token-1");
613+
expect(tokens.some(t => t === "token-2")).toBe(true);
614+
} finally {
615+
await room?.destroy();
616+
client.destroy();
617+
await server.stop();
618+
}
619+
}, 15000);
620+
574621
it("destroy rejects pending ping waiters", async () => {
575622
const client = new LoroWebsocketClient({ url: `ws://localhost:${port}` });
576623
await client.waitConnected();

0 commit comments

Comments
 (0)