From 2e4f91f668ea867d768291e9efd3e1c1eb825b97 Mon Sep 17 00:00:00 2001 From: Cas <6506529+ThaUnknown@users.noreply.github.com> Date: Thu, 23 Jun 2022 21:03:37 +0200 Subject: fix: support stream cancelling (#2335) --- index.js | 6 ++- lib/worker-server.js | 129 +++++++++++++++++++++++++++------------------------ lib/worker.js | 4 +- 3 files changed, 75 insertions(+), 64 deletions(-) diff --git a/index.js b/index.js index e7b56a8..1476004 100644 --- a/index.js +++ b/index.js @@ -222,7 +222,11 @@ class WebTorrent extends EventEmitter { this.workerPortCount++ port.postMessage(response) }) - cb(this.serviceWorker) + // test if browser supports cancelling sw Readable Streams + fetch(`${this.serviceWorker.scriptURL.slice(0, this.serviceWorker.scriptURL.lastIndexOf('/') + 1).slice(window.location.origin.length)}webtorrent/cancel/`).then(res => { + res.body.cancel() + }) + cb(null, this.serviceWorker) } get downloadSpeed () { return this._downloadSpeed() } diff --git a/lib/worker-server.js b/lib/worker-server.js index 5e6e56a..428b908 100644 --- a/lib/worker-server.js +++ b/lib/worker-server.js @@ -2,74 +2,81 @@ /* eslint-env serviceworker */ const portTimeoutDuration = 5000 +let cancellable = false module.exports = event => { - const { request } = event - const { url, method, headers, destination } = request + const { url } = event.request if (!url.includes(self.registration.scope + 'webtorrent/')) return null if (url.includes(self.registration.scope + 'webtorrent/keepalive/')) return new Response() + if (url.includes(self.registration.scope + 'webtorrent/cancel/')) { + return new Response(new ReadableStream({ + cancel () { + cancellable = true + } + })) + } + return serve(event) +} + +async function serve ({ request }) { + const { url, method, headers, destination } = request + const clientlist = await clients.matchAll({ type: 'window', includeUncontrolled: true }) + + const [data, port] = await new Promise(resolve => { + // Use race condition for whoever controls the response stream + for (const client of clientlist) { + const messageChannel = new MessageChannel() + const { port1, port2 } = messageChannel + port1.onmessage = ({ data }) => { + resolve([data, port1]) + } + client.postMessage({ + url, + method, + headers: Object.fromEntries(headers.entries()), + scope: self.registration.scope, + destination, + type: 'webtorrent' + }, [port2]) + } + }) + + if (data.body !== 'STREAM' && data.body !== 'DOWNLOAD') return new Response(data.body, data) - return clients.matchAll({ type: 'window', includeUncontrolled: true }) - .then(clients => { + let timeOut = null + return new Response(new ReadableStream({ + pull (controller) { return new Promise(resolve => { - // Use race condition for whoever controls the response stream - for (const client of clients) { - const messageChannel = new MessageChannel() - const { port1, port2 } = messageChannel - port1.onmessage = event => { - resolve([event.data, messageChannel]) + port.onmessage = ({ data }) => { + if (data) { + controller.enqueue(data) // data is Uint8Array + } else { + clearTimeout(timeOut) + controller.close() // data is null, means the stream ended + port.onmessage = null } - client.postMessage({ - url, - method, - headers: Object.fromEntries(headers.entries()), - scope: self.registration.scope, - destination, - type: 'webtorrent' - }, [port2]) + resolve() } - }) - }) - .then(([data, messageChannel]) => { - if (data.body === 'STREAM' || data.body === 'DOWNLOAD') { - let timeOut = null - return new Response(new ReadableStream({ - pull (controller) { - return new Promise(resolve => { - messageChannel.port1.onmessage = event => { - if (event.data) { - controller.enqueue(event.data) // event.data is Uint8Array - } else { - clearTimeout(timeOut) - controller.close() // event.data is null, means the stream ended - messageChannel.port1.onmessage = null - } - resolve() - } - - // 'media player' does NOT signal a close on the stream and we cannot close it because it's locked to the reader, - // so we just empty it after 5s of inactivity, the browser will request another port anyways - clearTimeout(timeOut) - if (data.body === 'STREAM') { - timeOut = setTimeout(() => { - controller.close() - messageChannel.port1.postMessage(false) // send timeout - messageChannel.port1.onmessage = null - resolve() - }, portTimeoutDuration) - } - - messageChannel.port1.postMessage(true) // send a pull request - }) - }, - cancel () { - // This event is never executed - messageChannel.port1.postMessage(false) // send a cancel request + if (!cancellable) { + // firefox doesn't support cancelling of Readable Streams in service workers, + // so we just empty it after 5s of inactivity, the browser will request another port anyways + clearTimeout(timeOut) + if (data.body === 'STREAM') { + timeOut = setTimeout(() => { + controller.close() + port.postMessage(false) // send timeout + port.onmessage = null + resolve() + }, portTimeoutDuration) } - }), data) - } - - return new Response(data.body, data) - }) - .catch(console.error) + } + port.postMessage(true) // send a pull request + }) + }, + cancel () { + port.postMessage(false) // send a cancel request + clearTimeout(timeOut) + port.onmessage = null + } + }), data) } diff --git a/lib/worker.js b/lib/worker.js index 8913cf0..e080aa7 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -11,6 +11,6 @@ self.addEventListener('fetch', event => { if (res) event.respondWith(res) }) -self.addEventListener('activate', evt => { - evt.waitUntil(self.clients.claim()) +self.addEventListener('activate', () => { + self.clients.claim() }) -- cgit v1.2.3