Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/state/CallViewModel/localMember/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ export class Publisher {
const track$ = scope.behavior(
observeTrackReference$(room.localParticipant, Track.Source.Camera).pipe(
map((trackRef) => {
const track = trackRef?.publication?.track;
const track = trackRef?.publication.track;
return track instanceof LocalVideoTrack ? track : null;
}),
),
Expand Down
103 changes: 33 additions & 70 deletions src/state/CallViewModel/remoteMembers/Connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import {
ElementCallError,
FailToGetOpenIdToken,
} from "../../../utils/errors.ts";
import { mockRemoteParticipant } from "../../../utils/test.ts";

let testScope: ObservableScope;

Expand Down Expand Up @@ -376,46 +377,32 @@ describe("Start connection states", () => {
});
});

function fakeRemoteLivekitParticipant(
id: string,
publications: number = 1,
): RemoteParticipant {
return {
identity: id,
getTrackPublications: () => Array(publications),
} as unknown as RemoteParticipant;
}

describe("Publishing participants observations", () => {
it("should emit the list of publishing participants", () => {
describe("remote participants", () => {
it("emits the list of remote participants", () => {
setupTest();

const connection = setupRemoteConnection();

const bobIsAPublisher = Promise.withResolvers<void>();
const danIsAPublisher = Promise.withResolvers<void>();
const observedPublishers: RemoteParticipant[][] = [];
const s = connection.remoteParticipantsWithTracks$.subscribe(
(publishers) => {
observedPublishers.push(publishers);
if (publishers.some((p) => p.identity === "@bob:example.org:DEV111")) {
bobIsAPublisher.resolve();
}
if (publishers.some((p) => p.identity === "@dan:example.org:DEV333")) {
danIsAPublisher.resolve();
}
},
);
const observedParticipants: RemoteParticipant[][] = [];
const s = connection.remoteParticipants$.subscribe((participants) => {
observedParticipants.push(participants);
});
onTestFinished(() => s.unsubscribe());
// The remoteParticipants$ observable is derived from the current members of the
// livekitRoom and the rtc membership in order to publish the members that are publishing
// on this connection.

let participants: RemoteParticipant[] = [
fakeRemoteLivekitParticipant("@alice:example.org:DEV000", 0),
fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 0),
fakeRemoteLivekitParticipant("@carol:example.org:DEV222", 0),
fakeRemoteLivekitParticipant("@dan:example.org:DEV333", 0),
const participants: RemoteParticipant[] = [
mockRemoteParticipant({ identity: "@alice:example.org:DEV000" }),
mockRemoteParticipant({ identity: "@bob:example.org:DEV111" }),
mockRemoteParticipant({ identity: "@carol:example.org:DEV222" }),
// Mock Dan to have no published tracks. We want him to still show show up
// in the participants list.
mockRemoteParticipant({
identity: "@dan:example.org:DEV333",
getTrackPublication: () => undefined,
getTrackPublications: () => [],
}),
];

// Let's simulate 3 members on the livekitRoom
Expand All @@ -427,38 +414,23 @@ describe("Publishing participants observations", () => {
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p),
);

// At this point there should be no publishers
expect(observedPublishers.pop()!.length).toEqual(0);

participants = [
fakeRemoteLivekitParticipant("@alice:example.org:DEV000", 1),
fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 1),
fakeRemoteLivekitParticipant("@carol:example.org:DEV222", 1),
fakeRemoteLivekitParticipant("@dan:example.org:DEV333", 2),
];
participants.forEach((p) =>
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, p),
);

// At this point there should be no publishers
expect(observedPublishers.pop()!.length).toEqual(4);
// All remote participants should be present
expect(observedParticipants.pop()!.length).toEqual(4);
});

it("should be scoped to parent scope", (): void => {
setupTest();

const connection = setupRemoteConnection();

let observedPublishers: RemoteParticipant[][] = [];
const s = connection.remoteParticipantsWithTracks$.subscribe(
(publishers) => {
observedPublishers.push(publishers);
},
);
let observedParticipants: RemoteParticipant[][] = [];
const s = connection.remoteParticipants$.subscribe((participants) => {
observedParticipants.push(participants);
});
onTestFinished(() => s.unsubscribe());

let participants: RemoteParticipant[] = [
fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 0),
mockRemoteParticipant({ identity: "@bob:example.org:DEV111" }),
];

// Let's simulate 3 members on the livekitRoom
Expand All @@ -470,35 +442,26 @@ describe("Publishing participants observations", () => {
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, participant);
}

// At this point there should be no publishers
expect(observedPublishers.pop()!.length).toEqual(0);

participants = [fakeRemoteLivekitParticipant("@bob:example.org:DEV111", 1)];

for (const participant of participants) {
fakeLivekitRoom.emit(RoomEvent.ParticipantConnected, participant);
}

// We should have bob has a publisher now
const publishers = observedPublishers.pop();
expect(publishers?.length).toEqual(1);
expect(publishers?.[0]?.identity).toEqual("@bob:example.org:DEV111");
// We should have bob as a participant now
const ps = observedParticipants.pop();
expect(ps?.length).toEqual(1);
expect(ps?.[0]?.identity).toEqual("@bob:example.org:DEV111");

// end the parent scope
testScope.end();
observedPublishers = [];
observedParticipants = [];

// SHOULD NOT emit any more publishers as the scope is ended
// SHOULD NOT emit any more participants as the scope is ended
participants = participants.filter(
(p) => p.identity !== "@bob:example.org:DEV111",
);

fakeLivekitRoom.emit(
RoomEvent.ParticipantDisconnected,
fakeRemoteLivekitParticipant("@bob:example.org:DEV111"),
mockRemoteParticipant({ identity: "@bob:example.org:DEV111" }),
);

expect(observedPublishers.length).toEqual(0);
expect(observedParticipants.length).toEqual(0);
});
});

Expand Down
31 changes: 9 additions & 22 deletions src/state/CallViewModel/remoteMembers/Connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {
ConnectionError,
type Room as LivekitRoom,
type RemoteParticipant,
RoomEvent,
} from "livekit-client";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { BehaviorSubject, map } from "rxjs";
Expand Down Expand Up @@ -96,11 +95,13 @@ export class Connection {
private scope: ObservableScope;

/**
* An observable of the participants that are publishing on this connection. (Excluding our local participant)
* This is derived from `participantsIncludingSubscribers$` and `remoteTransports$`.
* It filters the participants to only those that are associated with a membership that claims to publish on this connection.
* The remote LiveKit participants that are visible on this connection.
*
* Note that this may include participants that are connected only to
* subscribe, or publishers that are otherwise unattested in MatrixRTC state.
* It is therefore more low-level than what should be presented to the user.
*/
public readonly remoteParticipantsWithTracks$: Behavior<RemoteParticipant[]>;
public readonly remoteParticipants$: Behavior<RemoteParticipant[]>;

/**
* Whether the connection has been stopped.
Expand Down Expand Up @@ -231,23 +232,9 @@ export class Connection {
this.transport = transport;
this.client = client;

// REMOTE participants with track!!!
// this.remoteParticipantsWithTracks$
this.remoteParticipantsWithTracks$ = scope.behavior(
// only tracks remote participants
connectedParticipantsObserver(this.livekitRoom, {
additionalRoomEvents: [
RoomEvent.TrackPublished,
RoomEvent.TrackUnpublished,
],
}).pipe(
map((participants) => {
return participants.filter(
(participant) => participant.getTrackPublications().length > 0,
);
}),
),
[],
this.remoteParticipants$ = scope.behavior(
// Only tracks remote participants
connectedParticipantsObserver(this.livekitRoom),
);

scope.onEnd(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ beforeEach(() => {
(transport: LivekitTransport, scope: ObservableScope) => {
const mockConnection = {
transport,
remoteParticipantsWithTracks$: new BehaviorSubject([]),
remoteParticipants$: new BehaviorSubject([]),
} as unknown as Connection;
vi.mocked(mockConnection).start = vi.fn();
vi.mocked(mockConnection).stop = vi.fn();
Expand Down Expand Up @@ -200,7 +200,7 @@ describe("connections$ stream", () => {
});

describe("connectionManagerData$ stream", () => {
// Used in test to control fake connections' remoteParticipantsWithTracks$ streams
// Used in test to control fake connections' remoteParticipants$ streams
let fakeRemoteParticipantsStreams: Map<string, Behavior<RemoteParticipant[]>>;

function keyForTransport(transport: LivekitTransport): string {
Expand Down Expand Up @@ -229,7 +229,7 @@ describe("connectionManagerData$ stream", () => {
>([]);
const mockConnection = {
transport,
remoteParticipantsWithTracks$: getRemoteParticipantsFor(transport),
remoteParticipants$: getRemoteParticipantsFor(transport),
} as unknown as Connection;
vi.mocked(mockConnection).start = vi.fn();
vi.mocked(mockConnection).stop = vi.fn();
Expand Down
27 changes: 5 additions & 22 deletions src/state/CallViewModel/remoteMembers/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-Element-Commercial
Please see LICENSE in the repository root for full details.
*/

import {
type LivekitTransport,
type ParticipantId,
} from "matrix-js-sdk/lib/matrixrtc";
import { type LivekitTransport } from "matrix-js-sdk/lib/matrixrtc";
import { combineLatest, map, of, switchMap, tap } from "rxjs";
import { type Logger } from "matrix-js-sdk/lib/logger";
import { type RemoteParticipant } from "livekit-client";
Expand Down Expand Up @@ -57,34 +54,20 @@ export class ConnectionManagerData {
const key = transport.livekit_service_url + "|" + transport.livekit_alias;
return this.store.get(key)?.[1] ?? [];
}
/**
* Get all connections where the given participant is publishing.
* In theory, there could be several connections where the same participant is publishing but with
* only well behaving clients a participant should only be publishing on a single connection.
* @param participantId
*/
public getConnectionsForParticipant(
participantId: ParticipantId,
): Connection[] {
const connections: Connection[] = [];
for (const [connection, participants] of this.store.values()) {
if (participants.some((p) => p.identity === participantId)) {
connections.push(connection);
}
}
return connections;
}
}

interface Props {
scope: ObservableScope;
connectionFactory: ConnectionFactory;
inputTransports$: Behavior<Epoch<LivekitTransport[]>>;
logger: Logger;
}

// TODO - write test for scopes (do we really need to bind scope)
export interface IConnectionManager {
connectionManagerData$: Behavior<Epoch<ConnectionManagerData>>;
}

/**
* Crete a `ConnectionManager`
* @param scope the observable scope used by this object.
Expand Down Expand Up @@ -169,7 +152,7 @@ export function createConnectionManager$({
// Map the connections to list of {connection, participants}[]
const listOfConnectionsWithRemoteParticipants = connections.value.map(
(connection) => {
return connection.remoteParticipantsWithTracks$.pipe(
return connection.remoteParticipants$.pipe(
map((participants) => ({
connection,
participants,
Expand Down
41 changes: 39 additions & 2 deletions src/state/MediaViewModel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
createLocalMedia,
createRemoteMedia,
withTestScheduler,
mockRemoteParticipant,
} from "../utils/test";
import { getValue } from "../utils/observable";
import { constant } from "./Behavior";
Expand All @@ -44,7 +45,11 @@ const rtcMembership = mockRtcMembership("@alice:example.org", "AAAA");

test("control a participant's volume", () => {
const setVolumeSpy = vi.fn();
const vm = createRemoteMedia(rtcMembership, {}, { setVolume: setVolumeSpy });
const vm = createRemoteMedia(
rtcMembership,
{},
mockRemoteParticipant({ setVolume: setVolumeSpy }),
);
withTestScheduler(({ expectObservable, schedule }) => {
schedule("-ab---c---d|", {
a() {
Expand Down Expand Up @@ -88,7 +93,7 @@ test("control a participant's volume", () => {
});

test("toggle fit/contain for a participant's video", () => {
const vm = createRemoteMedia(rtcMembership, {}, {});
const vm = createRemoteMedia(rtcMembership, {}, mockRemoteParticipant({}));
withTestScheduler(({ expectObservable, schedule }) => {
schedule("-ab|", {
a: () => vm.toggleFitContain(),
Expand Down Expand Up @@ -199,3 +204,35 @@ test("switch cameras", async () => {
});
expect(deviceId).toBe("front camera");
});

test("remote media is in waiting state when participant has not yet connected", () => {
const vm = createRemoteMedia(rtcMembership, {}, null); // null participant
expect(vm.waitingForMedia$.value).toBe(true);
});

test("remote media is not in waiting state when participant is connected", () => {
const vm = createRemoteMedia(rtcMembership, {}, mockRemoteParticipant({}));
expect(vm.waitingForMedia$.value).toBe(false);
});

test("remote media is not in waiting state when participant is connected with no publications", () => {
const vm = createRemoteMedia(
rtcMembership,
{},
mockRemoteParticipant({
getTrackPublication: () => undefined,
getTrackPublications: () => [],
}),
);
expect(vm.waitingForMedia$.value).toBe(false);
});

test("remote media is not in waiting state when user does not intend to publish anywhere", () => {
const vm = createRemoteMedia(
rtcMembership,
{},
mockRemoteParticipant({}),
undefined, // No room (no advertised transport)
);
expect(vm.waitingForMedia$.value).toBe(false);
});
Loading
Loading