diff options
author | Cas <6506529+ThaUnknown@users.noreply.github.com> | 2021-08-20 00:41:34 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-20 00:41:34 +0300 |
commit | 604943e325c68721251a71c29d94e6a07ce0b31c (patch) | |
tree | d6728521ed5d4f13e5a3484f7d670756e89a832b | |
parent | a46a7a51d9fd8efef86533c5868a2d23b1346b6e (diff) |
feat: add service worker server as an alternative to renderMedia (#2098)
* feat: add service worker server as an alternative to renderMedia
* code QL
* thanks gh auto merge
Co-authored-by: Diego RodrÃguez Baquero <github@diegorbaquero.com>
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | index.js | 69 | ||||
-rw-r--r-- | lib/file.js | 76 | ||||
-rw-r--r-- | lib/worker-server.js | 75 | ||||
-rw-r--r-- | lib/worker.js | 16 | ||||
-rw-r--r-- | package.json | 5 |
6 files changed, 240 insertions, 2 deletions
@@ -1,3 +1,4 @@ node_modules/ package-lock.json webtorrent.debug.js +sw.debug.js @@ -1,5 +1,6 @@ /*! webtorrent. MIT License. WebTorrent LLC <https://webtorrent.io/opensource> */ -/* global FileList */ +/* global FileList, ServiceWorker */ +/* eslint-env browser */ const { EventEmitter } = require('events') const concat = require('simple-concat') @@ -81,6 +82,10 @@ class WebTorrent extends EventEmitter { this._downloadLimit = Math.max((typeof opts.downloadLimit === 'number') ? opts.downloadLimit : -1, -1) this._uploadLimit = Math.max((typeof opts.uploadLimit === 'number') ? opts.uploadLimit : -1, -1) + this.serviceWorker = null + this.workerKeepAliveInterval = null + this.workerPortCount = 0 + if (opts.secure === true) { require('./lib/peer').enableSecure() } @@ -157,6 +162,68 @@ class WebTorrent extends EventEmitter { } } + /** + * Accepts an existing service worker registration [navigator.serviceWorker.controller] + * which must be activated, "creates" a file server for streamed file rendering to use. + * + * @param {ServiceWorker} controller + * @param {function=} cb + * @return {null} + */ + loadWorker (controller, cb = () => {}) { + if (!(controller instanceof ServiceWorker)) throw new Error('Invalid worker registration') + if (controller.state !== 'activated') throw new Error('Worker isn\'t activated') + const keepAliveTime = 20000 + + this.serviceWorker = controller + + navigator.serviceWorker.addEventListener('message', event => { + const { data } = event + if (!data.type || !data.type === 'webtorrent' || !data.url) return null + let [infoHash, ...filePath] = data.url.slice(data.url.indexOf(data.scope + 'webtorrent/') + 11 + data.scope.length).split('/') + filePath = decodeURI(filePath.join('/')) + if (!infoHash || !filePath) return null + + const [port] = event.ports + + const file = this.get(infoHash) && this.get(infoHash).files.find(file => file.path === filePath) + if (!file) return null + + const [response, stream, raw] = file._serve(data) + const asyncIterator = stream && stream[Symbol.asyncIterator]() + + const cleanup = () => { + port.onmessage = null + if (stream) stream.destroy() + if (raw) raw.destroy() + this.workerPortCount-- + if (!this.workerPortCount) { + clearInterval(this.workerKeepAliveInterval) + this.workerKeepAliveInterval = null + } + } + + port.onmessage = async msg => { + if (msg.data) { + let chunk + try { + chunk = (await asyncIterator.next()).value + } catch (e) { + // chunk is yet to be downloaded or it somehow failed, should this be logged? + } + port.postMessage(chunk) + if (!chunk) cleanup() + if (!this.workerKeepAliveInterval) this.workerKeepAliveInterval = setInterval(() => fetch(`${this.serviceWorker.scriptURL.substr(0, this.serviceWorker.scriptURL.lastIndexOf('/') + 1).slice(window.location.origin.length)}webtorrent/keepalive/`), keepAliveTime) + } else { + cleanup() + } + } + this.workerPortCount++ + port.postMessage(response) + }) + cb(this.serviceWorker) + } + get downloadSpeed () { return this._downloadSpeed() } get uploadSpeed () { return this._uploadSpeed() } diff --git a/lib/file.js b/lib/file.js index fd9ac7c..f3de744 100644 --- a/lib/file.js +++ b/lib/file.js @@ -7,6 +7,9 @@ const streamToBlobURL = require('stream-to-blob-url') const streamToBuffer = require('stream-with-known-length-to-buffer') const FileStream = require('./file-stream') const queueMicrotask = require('queue-microtask') +const rangeParser = require('range-parser') +const mime = require('mime') +const eos = require('end-of-stream') class File extends EventEmitter { constructor (torrent, file) { @@ -33,6 +36,8 @@ class File extends EventEmitter { this.done = true this.emit('done') } + + this._serviceWorker = torrent.client.serviceWorker } get downloaded () { @@ -144,6 +149,77 @@ class File extends EventEmitter { render.render(this, elem, opts, cb) } + _serve (req) { + const res = { + status: 200, + headers: { + // Support range-requests + 'Accept-Ranges': 'bytes', + 'Content-Type': mime.getType(this.name), + 'Cache-Control': 'no-cache, no-store, must-revalidate, max-age=0', + Expires: '0' + }, + body: req.method === 'HEAD' ? '' : 'STREAM' + } + // force the browser to download the file if if it's opened in a new tab + if (req.destination === 'document') { + res.headers['Content-Type'] = 'application/octet-stream' + res.headers['Content-Disposition'] = 'attachment' + res.body = 'DOWNLOAD' + } + + // `rangeParser` returns an array of ranges, or an error code (number) if + // there was an error parsing the range. + let range = rangeParser(this.length, req.headers.range || '') + + if (range.constructor === Array) { + res.status = 206 // indicates that range-request was understood + + // no support for multi-range request, just use the first range + range = range[0] + + res.headers['Content-Range'] = `bytes ${range.start}-${range.end}/${this.length}` + res.headers['Content-Length'] = `${range.end - range.start + 1}` + } else { + res.headers['Content-Length'] = this.length + } + + const stream = req.method === 'GET' && this.createReadStream(range) + + let pipe = null + if (stream) { + this.emit('stream', { stream, req, file: this }, piped => { + pipe = piped + + // piped stream might not close the original filestream on close/error, this is agressive but necessary + eos(piped, () => { + if (piped) piped.destroy() + stream.destroy() + }) + }) + } + + return [res, pipe || stream, pipe && stream] + } + + getStreamURL (cb = () => {}) { + if (typeof window === 'undefined') throw new Error('browser-only method') + if (!this._serviceWorker) throw new Error('No worker registered') + if (this._serviceWorker.state !== 'activated') throw new Error('Worker isn\'t activated') + const workerPath = this._serviceWorker.scriptURL.substr(0, this._serviceWorker.scriptURL.lastIndexOf('/') + 1).slice(window.location.origin.length) + const url = `${workerPath}webtorrent/${this._torrent.infoHash}/${encodeURI(this.path)}` + cb(null, url) + } + + streamTo (elem, cb = () => {}) { + if (typeof window === 'undefined') throw new Error('browser-only method') + if (!this._serviceWorker) throw new Error('No worker registered') + if (this._serviceWorker.state !== 'activated') throw new Error('Worker isn\'t activated') + const workerPath = this._serviceWorker.scriptURL.substr(0, this._serviceWorker.scriptURL.lastIndexOf('/') + 1).slice(window.location.origin.length) + elem.src = `${workerPath}webtorrent/${this._torrent.infoHash}/${encodeURI(this.path)}` + cb(null, elem) + } + _getMimeType () { return render.mime[path.extname(this.name).toLowerCase()] } diff --git a/lib/worker-server.js b/lib/worker-server.js new file mode 100644 index 0000000..5e6e56a --- /dev/null +++ b/lib/worker-server.js @@ -0,0 +1,75 @@ +/* global clients, MessageChannel, ReadableStream, Response */ +/* eslint-env serviceworker */ + +const portTimeoutDuration = 5000 + +module.exports = event => { + const { request } = event + const { url, method, headers, destination } = request + if (!url.includes(self.registration.scope + 'webtorrent/')) return null + if (url.includes(self.registration.scope + 'webtorrent/keepalive/')) return new Response() + + return clients.matchAll({ type: 'window', includeUncontrolled: true }) + .then(clients => { + return new Promise(resolve => { + // Use race condition for whoever controls the response stream + for (const client of clients) { + const messageChannel = new MessageChannel() + const { port1, port2 } = messageChannel + port1.onmessage = event => { + resolve([event.data, messageChannel]) + } + client.postMessage({ + url, + method, + headers: Object.fromEntries(headers.entries()), + scope: self.registration.scope, + destination, + type: 'webtorrent' + }, [port2]) + } + }) + }) + .then(([data, messageChannel]) => { + if (data.body === 'STREAM' || data.body === 'DOWNLOAD') { + let timeOut = null + return new Response(new ReadableStream({ + pull (controller) { + return new Promise(resolve => { + messageChannel.port1.onmessage = event => { + if (event.data) { + controller.enqueue(event.data) // event.data is Uint8Array + } else { + clearTimeout(timeOut) + controller.close() // event.data is null, means the stream ended + messageChannel.port1.onmessage = null + } + resolve() + } + + // 'media player' does NOT signal a close on the stream and we cannot close it because it's locked to the reader, + // so we just empty it after 5s of inactivity, the browser will request another port anyways + clearTimeout(timeOut) + if (data.body === 'STREAM') { + timeOut = setTimeout(() => { + controller.close() + messageChannel.port1.postMessage(false) // send timeout + messageChannel.port1.onmessage = null + resolve() + }, portTimeoutDuration) + } + + messageChannel.port1.postMessage(true) // send a pull request + }) + }, + cancel () { + // This event is never executed + messageChannel.port1.postMessage(false) // send a cancel request + } + }), data) + } + + return new Response(data.body, data) + }) + .catch(console.error) +} diff --git a/lib/worker.js b/lib/worker.js new file mode 100644 index 0000000..47c1127 --- /dev/null +++ b/lib/worker.js @@ -0,0 +1,16 @@ +/* eslint-env serviceworker */ + +const fileResponse = require('./worker-server') + +self.addEventListener('install', () => { + self.skipWaiting() +}) + +self.addEventListener('fetch', event => { + const res = fileResponse(event) + if (res) event.respondWith(res) +}) + +self.addEventListener('activate', evt => { + evt.waitUntil(self.clients.claim()) +}) diff --git a/package.json b/package.json index 072f4a0..92fa2de 100644 --- a/package.json +++ b/package.json @@ -144,11 +144,13 @@ "url": "git://github.com/webtorrent/webtorrent.git" }, "scripts": { - "build": "npm run build-js && npm run build-chromeapp", + "build": "npm run build-js && npm run build-js-worker && npm run build-chromeapp", "build-chromeapp": "browserify --browser-field=chromeapp --standalone WebTorrent . | minify --mangle=false > webtorrent.chromeapp.js", "build-chromeapp-debug": "browserify --browser-field=chromeapp --standalone WebTorrent . > webtorrent.chromeapp.js", "build-js": "browserify --standalone WebTorrent . | minify --mangle=false > webtorrent.min.js", + "build-js-worker": "browserify ./lib/worker.js | minify --mangle=false > sw.min.js", "build-js-debug": "browserify --standalone WebTorrent . > webtorrent.debug.js", + "build-js-worker-debug": "browserify ./lib/worker.js > sw.debug.js", "prepublishOnly": "npm run build && npm run update-authors", "preversion": "npm run build && npm run update-authors", "size": "npm run size-js && npm run size-disc", @@ -163,6 +165,7 @@ "standard": { "ignore": [ "webtorrent.min.js", + "sw.min.js", "webtorrent.chromeapp.js" ] }, |