diff --git a/.changeset/connect-error-listener-leak.md b/.changeset/connect-error-listener-leak.md new file mode 100644 index 00000000..67a37e82 --- /dev/null +++ b/.changeset/connect-error-listener-leak.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Remove FfiClient listener on failed connect to prevent leak diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index e5358d57..61da7d6b 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -235,49 +235,55 @@ export class Room extends (EventEmitter as new () => TypedEmitter FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onFfiEvent); - const res = FfiClient.instance.request({ - message: { - case: 'connect', - value: req, - }, - }); + try { + const res = FfiClient.instance.request({ + message: { + case: 'connect', + value: req, + }, + }); - const cb = await FfiClient.instance.waitFor((ev: FfiEvent) => { - return ev.message.case == 'connect' && ev.message.value.asyncId == res.asyncId; - }); + const cb = await FfiClient.instance.waitFor((ev: FfiEvent) => { + return ev.message.case == 'connect' && ev.message.value.asyncId == res.asyncId; + }); - log.debug('Connect callback received'); - - switch (cb.message.case) { - case 'result': - this.ffiHandle = new FfiHandle(cb.message.value.room!.handle!.id!); - this.e2eeManager = e2eeEnabled && new E2EEManager(this.ffiHandle.handle, e2eeOptions); - - this._token = token; - this._serverUrl = url; - this.info = cb.message.value.room!.info; - this.connectionState = ConnectionState.CONN_CONNECTED; - // Reset the abort controller for this connection session so that - // a previous disconnect doesn't immediately cancel new operations. - this.disconnectController = new AbortController(); - this.localParticipant = new LocalParticipant( - cb.message.value.localParticipant!, - this.ffiEventLock, - this.disconnectController.signal, - ); + log.debug('Connect callback received'); + + switch (cb.message.case) { + case 'result': + this.ffiHandle = new FfiHandle(cb.message.value.room!.handle!.id!); + this.e2eeManager = e2eeEnabled && new E2EEManager(this.ffiHandle.handle, e2eeOptions); + + this._token = token; + this._serverUrl = url; + this.info = cb.message.value.room!.info; + this.connectionState = ConnectionState.CONN_CONNECTED; + // Reset the abort controller for this connection session so that + // a previous disconnect doesn't immediately cancel new operations. + this.disconnectController = new AbortController(); + this.localParticipant = new LocalParticipant( + cb.message.value.localParticipant!, + this.ffiEventLock, + this.disconnectController.signal, + ); - for (const pt of cb.message.value.participants) { - const rp = this.createRemoteParticipant(pt.participant!); + for (const pt of cb.message.value.participants) { + const rp = this.createRemoteParticipant(pt.participant!); - for (const pub of pt.publications) { - const publication = new RemoteTrackPublication(pub); - rp.trackPublications.set(publication.sid!, publication); + for (const pub of pt.publications) { + const publication = new RemoteTrackPublication(pub); + rp.trackPublications.set(publication.sid!, publication); + } } - } - break; - case 'error': - default: - throw new ConnectError(cb.message.value || ''); + break; + case 'error': + default: + throw new ConnectError(cb.message.value || ''); + } + } catch (e) { + FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onFfiEvent); + this.preConnectEvents = []; + throw e; } }