diff --git a/.changeset/stream-controller-map-leak.md b/.changeset/stream-controller-map-leak.md new file mode 100644 index 00000000..ddc056cc --- /dev/null +++ b/.changeset/stream-controller-map-leak.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Fix stream controller map leak on disconnect and abort diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index 3c69ef04..82464546 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -335,8 +335,22 @@ export class LocalParticipant extends Participant { }); await sendTrailer(trailerReq); }, - abort(err) { + async abort(err) { log.error('Sink error:', err); + try { + const trailerReq = new SendStreamTrailerRequest({ + senderIdentity, + localParticipantHandle: localHandle, + destinationIdentities, + trailer: new DataStream_Trailer({ + streamId, + reason: err instanceof Error ? err.message : String(err ?? 'unknown error'), + }), + }); + await sendTrailer(trailerReq); + } catch { + // best effort — transport may already be down + } }, }); @@ -450,8 +464,22 @@ export class LocalParticipant extends Participant { }); await sendTrailer(trailerReq); }, - abort(err) { + async abort(err) { log.error('Sink error:', err); + try { + const trailerReq = new SendStreamTrailerRequest({ + senderIdentity, + localParticipantHandle: localHandle, + destinationIdentities, + trailer: new DataStream_Trailer({ + streamId, + reason: err instanceof Error ? err.message : String(err ?? 'unknown error'), + }), + }); + await sendTrailer(trailerReq); + } catch { + // best effort — transport may already be down + } }, }); diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 6ef6bf63..d5f9bcc7 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -283,6 +283,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId; }); + this.cleanupStreamControllers(); FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent); this.removeAllListeners(); } @@ -599,6 +600,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter /*} else if (ev.case == 'connected') { this.emit(RoomEvent.Connected);*/ } else if (ev.case == 'disconnected') { + this.cleanupStreamControllers(); this.emit(RoomEvent.Disconnected, ev.value.reason!); } else if (ev.case == 'reconnecting') { this.emit(RoomEvent.Reconnecting); @@ -696,6 +698,25 @@ export class Room extends (EventEmitter as new () => TypedEmitter return participant; } + private cleanupStreamControllers() { + for (const [streamId, entry] of this.byteStreamControllers) { + try { + entry.controller.error(new Error('Room disconnected')); + } catch { + // controller may already be closed + } + this.byteStreamControllers.delete(streamId); + } + for (const [streamId, entry] of this.textStreamControllers) { + try { + entry.controller.error(new Error('Room disconnected')); + } catch { + // controller may already be closed + } + this.textStreamControllers.delete(streamId); + } + } + private handleStreamHeader(streamHeader: DataStream_Header, participantIdentity: string) { if (streamHeader.contentHeader.case === 'byteHeader') { const streamHandlerCallback = this.byteStreamHandlers.get(streamHeader.topic ?? ''); diff --git a/packages/livekit-rtc/src/tests/room.test.ts b/packages/livekit-rtc/src/tests/room.test.ts new file mode 100644 index 00000000..9efb4c63 --- /dev/null +++ b/packages/livekit-rtc/src/tests/room.test.ts @@ -0,0 +1,134 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { DataStream_Chunk, DataStream_Header } from '@livekit/rtc-ffi-bindings'; +import { describe, expect, it } from 'vitest'; +import { Room } from '../room.js'; + +function createMockStreamController() { + let controller!: ReadableStreamDefaultController; + const stream = new ReadableStream({ + start(c) { + controller = c; + }, + }); + + // Keep a reader so the stream stays alive + const reader = stream.getReader(); + + return { + header: new DataStream_Header(), + controller, + startTime: Date.now(), + reader, + }; +} + +describe('Room', () => { + describe('cleanupStreamControllers', () => { + it('should clear byteStreamControllers on disconnect cleanup', () => { + const room = new Room(); + const roomAny = room as any; + + const mock = createMockStreamController(); + roomAny.byteStreamControllers.set('stream-1', { + header: mock.header, + controller: mock.controller, + startTime: mock.startTime, + }); + + expect(roomAny.byteStreamControllers.size).toBe(1); + + roomAny.cleanupStreamControllers(); + + expect(roomAny.byteStreamControllers.size).toBe(0); + }); + + it('should clear textStreamControllers on disconnect cleanup', () => { + const room = new Room(); + const roomAny = room as any; + + const mock = createMockStreamController(); + roomAny.textStreamControllers.set('stream-1', { + header: mock.header, + controller: mock.controller, + startTime: mock.startTime, + }); + + expect(roomAny.textStreamControllers.size).toBe(1); + + roomAny.cleanupStreamControllers(); + + expect(roomAny.textStreamControllers.size).toBe(0); + }); + + it('should error open stream controllers so consumers are not left hanging', async () => { + const room = new Room(); + const roomAny = room as any; + + const mock = createMockStreamController(); + roomAny.byteStreamControllers.set('stream-1', { + header: mock.header, + controller: mock.controller, + startTime: mock.startTime, + }); + + roomAny.cleanupStreamControllers(); + + await expect(mock.reader.read()).rejects.toThrow('Room disconnected'); + }); + + it('should handle cleanup when controllers are already closed', () => { + const room = new Room(); + const roomAny = room as any; + + const mock = createMockStreamController(); + // Close the controller before cleanup + mock.controller.close(); + + roomAny.byteStreamControllers.set('stream-1', { + header: mock.header, + controller: mock.controller, + startTime: mock.startTime, + }); + + // Should not throw + roomAny.cleanupStreamControllers(); + + expect(roomAny.byteStreamControllers.size).toBe(0); + }); + + it('should clean up multiple streams from both maps', () => { + const room = new Room(); + const roomAny = room as any; + + const byteMock1 = createMockStreamController(); + const byteMock2 = createMockStreamController(); + const textMock1 = createMockStreamController(); + + roomAny.byteStreamControllers.set('byte-1', { + header: byteMock1.header, + controller: byteMock1.controller, + startTime: byteMock1.startTime, + }); + roomAny.byteStreamControllers.set('byte-2', { + header: byteMock2.header, + controller: byteMock2.controller, + startTime: byteMock2.startTime, + }); + roomAny.textStreamControllers.set('text-1', { + header: textMock1.header, + controller: textMock1.controller, + startTime: textMock1.startTime, + }); + + expect(roomAny.byteStreamControllers.size).toBe(2); + expect(roomAny.textStreamControllers.size).toBe(1); + + roomAny.cleanupStreamControllers(); + + expect(roomAny.byteStreamControllers.size).toBe(0); + expect(roomAny.textStreamControllers.size).toBe(0); + }); + }); +});