Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/stream-controller-map-leak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Fix stream controller map leak on disconnect and abort
32 changes: 30 additions & 2 deletions packages/livekit-rtc/src/participant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,22 @@ export class LocalParticipant extends Participant {
});
await sendTrailer(trailerReq);
},
abort(err) {
async abort(err) {
log.error('Sink error:', err);
try {
const trailerReq = new SendStreamTrailerRequest({
senderIdentity,
localParticipantHandle: localHandle,
destinationIdentities,
trailer: new DataStream_Trailer({
streamId,
reason: err instanceof Error ? err.message : String(err ?? 'unknown error'),
}),
});
await sendTrailer(trailerReq);
} catch {
// best effort — transport may already be down
}
},
});

Expand Down Expand Up @@ -450,8 +464,22 @@ export class LocalParticipant extends Participant {
});
await sendTrailer(trailerReq);
},
abort(err) {
async abort(err) {
log.error('Sink error:', err);
try {
const trailerReq = new SendStreamTrailerRequest({
senderIdentity,
localParticipantHandle: localHandle,
destinationIdentities,
trailer: new DataStream_Trailer({
streamId,
reason: err instanceof Error ? err.message : String(err ?? 'unknown error'),
}),
});
await sendTrailer(trailerReq);
} catch {
// best effort — transport may already be down
}
},
});

Expand Down
21 changes: 21 additions & 0 deletions packages/livekit-rtc/src/room.ts
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId;
});

this.cleanupStreamControllers();
FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent);
this.removeAllListeners();
}
Expand Down Expand Up @@ -599,6 +600,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
/*} else if (ev.case == 'connected') {
this.emit(RoomEvent.Connected);*/
} else if (ev.case == 'disconnected') {
this.cleanupStreamControllers();
this.emit(RoomEvent.Disconnected, ev.value.reason!);
} else if (ev.case == 'reconnecting') {
this.emit(RoomEvent.Reconnecting);
Expand Down Expand Up @@ -696,6 +698,25 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
return participant;
}

private cleanupStreamControllers() {
for (const [streamId, entry] of this.byteStreamControllers) {
try {
entry.controller.error(new Error('Room disconnected'));
} catch {
// controller may already be closed
}
this.byteStreamControllers.delete(streamId);
}
for (const [streamId, entry] of this.textStreamControllers) {
try {
entry.controller.error(new Error('Room disconnected'));
} catch {
// controller may already be closed
}
this.textStreamControllers.delete(streamId);
}
}

private handleStreamHeader(streamHeader: DataStream_Header, participantIdentity: string) {
if (streamHeader.contentHeader.case === 'byteHeader') {
const streamHandlerCallback = this.byteStreamHandlers.get(streamHeader.topic ?? '');
Expand Down
134 changes: 134 additions & 0 deletions packages/livekit-rtc/src/tests/room.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { DataStream_Chunk, DataStream_Header } from '@livekit/rtc-ffi-bindings';
import { describe, expect, it } from 'vitest';
import { Room } from '../room.js';

function createMockStreamController() {
let controller!: ReadableStreamDefaultController<DataStream_Chunk>;
const stream = new ReadableStream<DataStream_Chunk>({
start(c) {
controller = c;
},
});

// Keep a reader so the stream stays alive
const reader = stream.getReader();

return {
header: new DataStream_Header(),
controller,
startTime: Date.now(),
reader,
};
}

describe('Room', () => {
describe('cleanupStreamControllers', () => {
it('should clear byteStreamControllers on disconnect cleanup', () => {
const room = new Room();
const roomAny = room as any;

const mock = createMockStreamController();
roomAny.byteStreamControllers.set('stream-1', {
header: mock.header,
controller: mock.controller,
startTime: mock.startTime,
});

expect(roomAny.byteStreamControllers.size).toBe(1);

roomAny.cleanupStreamControllers();

expect(roomAny.byteStreamControllers.size).toBe(0);
});

it('should clear textStreamControllers on disconnect cleanup', () => {
const room = new Room();
const roomAny = room as any;

const mock = createMockStreamController();
roomAny.textStreamControllers.set('stream-1', {
header: mock.header,
controller: mock.controller,
startTime: mock.startTime,
});

expect(roomAny.textStreamControllers.size).toBe(1);

roomAny.cleanupStreamControllers();

expect(roomAny.textStreamControllers.size).toBe(0);
});

it('should error open stream controllers so consumers are not left hanging', async () => {
const room = new Room();
const roomAny = room as any;

const mock = createMockStreamController();
roomAny.byteStreamControllers.set('stream-1', {
header: mock.header,
controller: mock.controller,
startTime: mock.startTime,
});

roomAny.cleanupStreamControllers();

await expect(mock.reader.read()).rejects.toThrow('Room disconnected');
});

it('should handle cleanup when controllers are already closed', () => {
const room = new Room();
const roomAny = room as any;

const mock = createMockStreamController();
// Close the controller before cleanup
mock.controller.close();

roomAny.byteStreamControllers.set('stream-1', {
header: mock.header,
controller: mock.controller,
startTime: mock.startTime,
});

// Should not throw
roomAny.cleanupStreamControllers();

expect(roomAny.byteStreamControllers.size).toBe(0);
});

it('should clean up multiple streams from both maps', () => {
const room = new Room();
const roomAny = room as any;

const byteMock1 = createMockStreamController();
const byteMock2 = createMockStreamController();
const textMock1 = createMockStreamController();

roomAny.byteStreamControllers.set('byte-1', {
header: byteMock1.header,
controller: byteMock1.controller,
startTime: byteMock1.startTime,
});
roomAny.byteStreamControllers.set('byte-2', {
header: byteMock2.header,
controller: byteMock2.controller,
startTime: byteMock2.startTime,
});
roomAny.textStreamControllers.set('text-1', {
header: textMock1.header,
controller: textMock1.controller,
startTime: textMock1.startTime,
});

expect(roomAny.byteStreamControllers.size).toBe(2);
expect(roomAny.textStreamControllers.size).toBe(1);

roomAny.cleanupStreamControllers();

expect(roomAny.byteStreamControllers.size).toBe(0);
expect(roomAny.textStreamControllers.size).toBe(0);
});
});
});