diff options
author | Anatoli Papirovski <apapirovski@mac.com> | 2017-10-03 04:56:53 +0300 |
---|---|---|
committer | Myles Borins <mylesborins@google.com> | 2017-10-11 09:05:44 +0300 |
commit | 10622c63311700719ac778867b62c9fc07ae2c38 (patch) | |
tree | c570c9f655b2c58c7c4d15bdf194fb9ecf414e2a /lib | |
parent | 824b8dfe9e523848b8c70085289d2b65fc1eb79e (diff) |
http2: near full http1 compatibility, add tests
Extensive re-work of http1 compatibility layer based on tests in
express, on-finished and finalhandler. Fix handling of HEAD
method to match http1. Adjust write, end, etc. to call writeHead
as in http1 and as expected by user-land modules. Add socket
proxy that instead uses the Http2Stream for the vast majority of
socket interactions. Add and change tests to closer represent
http1 behaviour.
Refs: https://github.com/nodejs/node/pull/15633
Refs: https://github.com/expressjs/express/tree/master/test
Refs: https://github.com/jshttp/on-finished/blob/master/test/test.js
Refs: https://github.com/pillarjs/finalhandler/blob/master/test/test.js
PR-URL: https://github.com/nodejs/node/pull/15702
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'lib')
-rwxr-xr-x | lib/internal/errors.js | 3 | ||||
-rw-r--r-- | lib/internal/http2/compat.js | 335 | ||||
-rw-r--r-- | lib/internal/http2/core.js | 16 |
3 files changed, 242 insertions, 112 deletions
diff --git a/lib/internal/errors.js b/lib/internal/errors.js index addc856aeaf..496f00e4ceb 100755 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -173,6 +173,9 @@ E('ERR_HTTP2_INVALID_SETTING_VALUE', E('ERR_HTTP2_INVALID_STREAM', 'The stream has been destroyed'); E('ERR_HTTP2_MAX_PENDING_SETTINGS_ACK', (max) => `Maximum number of pending settings acknowledgements (${max})`); +E('ERR_HTTP2_NO_SOCKET_MANIPULATION', + 'HTTP/2 sockets should not be directly read from, written to, ' + + 'paused and/or resumed.'); E('ERR_HTTP2_OUT_OF_STREAMS', 'No stream ID is available because maximum stream ID has been reached'); E('ERR_HTTP2_PAYLOAD_FORBIDDEN', diff --git a/lib/internal/http2/compat.js b/lib/internal/http2/compat.js index 462b0c51b88..84f15b2ed8a 100644 --- a/lib/internal/http2/compat.js +++ b/lib/internal/http2/compat.js @@ -16,10 +16,10 @@ const kHeaders = Symbol('headers'); const kRawHeaders = Symbol('rawHeaders'); const kTrailers = Symbol('trailers'); const kRawTrailers = Symbol('rawTrailers'); +const kProxySocket = Symbol('proxySocket'); +const kSetHeader = Symbol('setHeader'); const { - NGHTTP2_NO_ERROR, - HTTP2_HEADER_AUTHORITY, HTTP2_HEADER_METHOD, HTTP2_HEADER_PATH, @@ -39,7 +39,7 @@ let statusMessageWarned = false; // close as possible to the current require('http') API function assertValidHeader(name, value) { - if (name === '') + if (name === '' || typeof name !== 'string') throw new errors.TypeError('ERR_INVALID_HTTP_TOKEN', 'Header name', name); if (isPseudoHeader(name)) throw new errors.Error('ERR_HTTP2_PSEUDOHEADER_NOT_ALLOWED'); @@ -71,19 +71,24 @@ function statusMessageWarn() { } function onStreamData(chunk) { - if (!this[kRequest].push(chunk)) + const request = this[kRequest]; + if (request !== undefined && !request.push(chunk)) this.pause(); } function onStreamTrailers(trailers, flags, rawTrailers) { const request = this[kRequest]; - Object.assign(request[kTrailers], trailers); - request[kRawTrailers].push(...rawTrailers); + if (request !== undefined) { + Object.assign(request[kTrailers], trailers); + request[kRawTrailers].push(...rawTrailers); + } } function onStreamEnd() { // Cause the request stream to end as well. - this[kRequest].push(null); + const request = this[kRequest]; + if (request !== undefined) + this[kRequest].push(null); } function onStreamError(error) { @@ -97,62 +102,136 @@ function onStreamError(error) { } function onRequestPause() { - const stream = this[kStream]; - if (stream) - stream.pause(); + this[kStream].pause(); } function onRequestResume() { - const stream = this[kStream]; - if (stream) - stream.resume(); + this[kStream].resume(); } function onStreamDrain() { - this[kResponse].emit('drain'); + const response = this[kResponse]; + if (response !== undefined) + response.emit('drain'); } // TODO Http2Stream does not emit 'close' function onStreamClosedRequest() { - this[kRequest].push(null); + const request = this[kRequest]; + if (request !== undefined) + request.push(null); } // TODO Http2Stream does not emit 'close' function onStreamClosedResponse() { - this[kResponse].emit('finish'); + const response = this[kResponse]; + if (response !== undefined) + response.emit('finish'); } -function onStreamAbortedRequest(hadError, code) { +function onStreamAbortedRequest() { const request = this[kRequest]; - if (request[kState].closed === false) { - request.emit('aborted', hadError, code); + if (request !== undefined && request[kState].closed === false) { + request.emit('aborted'); request.emit('close'); } } function onStreamAbortedResponse() { const response = this[kResponse]; - if (response[kState].closed === false) { + if (response !== undefined && response[kState].closed === false) response.emit('close'); - } } function resumeStream(stream) { stream.resume(); } +const proxySocketHandler = { + get(stream, prop) { + switch (prop) { + case 'on': + case 'once': + case 'end': + case 'emit': + case 'destroy': + return stream[prop].bind(stream); + case 'writable': + case 'destroyed': + return stream[prop]; + case 'readable': + if (stream.destroyed) + return false; + const request = stream[kRequest]; + return request ? request.readable : stream.readable; + case 'setTimeout': + const session = stream.session; + if (session !== undefined) + return session.setTimeout.bind(session); + return stream.setTimeout.bind(stream); + case 'write': + case 'read': + case 'pause': + case 'resume': + throw new errors.Error('ERR_HTTP2_NO_SOCKET_MANIPULATION'); + default: + const ref = stream.session !== undefined ? + stream.session.socket : stream; + const value = ref[prop]; + return typeof value === 'function' ? value.bind(ref) : value; + } + }, + getPrototypeOf(stream) { + if (stream.session !== undefined) + return stream.session.socket.constructor.prototype; + return stream.prototype; + }, + set(stream, prop, value) { + switch (prop) { + case 'writable': + case 'readable': + case 'destroyed': + case 'on': + case 'once': + case 'end': + case 'emit': + case 'destroy': + stream[prop] = value; + return true; + case 'setTimeout': + const session = stream.session; + if (session !== undefined) + session[prop] = value; + else + stream[prop] = value; + return true; + case 'write': + case 'read': + case 'pause': + case 'resume': + throw new errors.Error('ERR_HTTP2_NO_SOCKET_MANIPULATION'); + default: + const ref = stream.session !== undefined ? + stream.session.socket : stream; + ref[prop] = value; + return true; + } + } +}; + class Http2ServerRequest extends Readable { constructor(stream, headers, options, rawHeaders) { super(options); this[kState] = { closed: false, - closedCode: NGHTTP2_NO_ERROR + didRead: false, }; this[kHeaders] = headers; this[kRawHeaders] = rawHeaders; this[kTrailers] = {}; this[kRawTrailers] = []; this[kStream] = stream; + stream[kProxySocket] = null; stream[kRequest] = this; // Pause the stream.. @@ -170,12 +249,10 @@ class Http2ServerRequest extends Readable { this.on('resume', onRequestResume); } - get closed() { - return this[kState].closed; - } - - get code() { - return this[kState].closedCode; + get complete() { + return this._readableState.ended || + this[kState].closed || + this[kStream].destroyed; } get stream() { @@ -212,9 +289,10 @@ class Http2ServerRequest extends Readable { get socket() { const stream = this[kStream]; - if (stream === undefined) - return; - return stream.session.socket; + const proxySocket = stream[kProxySocket]; + if (proxySocket === null) + return stream[kProxySocket] = new Proxy(stream, proxySocketHandler); + return proxySocket; } get connection() { @@ -222,9 +300,10 @@ class Http2ServerRequest extends Readable { } _read(nread) { - const stream = this[kStream]; - if (stream !== undefined) { - process.nextTick(resumeStream, stream); + const state = this[kState]; + if (!state.closed) { + state.didRead = true; + process.nextTick(resumeStream, this[kStream]); } else { this.emit('error', new errors.Error('ERR_HTTP2_STREAM_CLOSED')); } @@ -234,6 +313,13 @@ class Http2ServerRequest extends Readable { return this[kHeaders][HTTP2_HEADER_METHOD]; } + set method(method) { + if (typeof method !== 'string' || method.trim() === '') + throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'method', 'string'); + + this[kHeaders][HTTP2_HEADER_METHOD] = method; + } + get authority() { return this[kHeaders][HTTP2_HEADER_AUTHORITY]; } @@ -256,15 +342,17 @@ class Http2ServerRequest extends Readable { this[kStream].setTimeout(msecs, callback); } - [kFinish](code) { + [kFinish]() { const state = this[kState]; if (state.closed) return; - if (code !== undefined) - state.closedCode = Number(code); state.closed = true; this.push(null); - process.nextTick(() => (this[kStream] = undefined)); + this[kStream][kRequest] = undefined; + // if the user didn't interact with incoming data and didn't pipe it, + // dump it for compatibility with http1 + if (!state.didRead && !this._readableState.resumeScheduled) + this.resume(); } } @@ -272,14 +360,16 @@ class Http2ServerResponse extends Stream { constructor(stream, options) { super(options); this[kState] = { + closed: false, + ending: false, + headRequest: false, sendDate: true, statusCode: HTTP_STATUS_OK, - closed: false, - closedCode: NGHTTP2_NO_ERROR }; this[kHeaders] = Object.create(null); this[kTrailers] = Object.create(null); this[kStream] = stream; + stream[kProxySocket] = null; stream[kResponse] = this; this.writable = true; stream.on('drain', onStreamDrain); @@ -290,17 +380,35 @@ class Http2ServerResponse extends Stream { stream.on('finish', onfinish); } + // User land modules such as finalhandler just check truthiness of this + // but if someone is actually trying to use this for more than that + // then we simply can't support such use cases + get _header() { + return this.headersSent; + } + get finished() { const stream = this[kStream]; - return stream === undefined || stream._writableState.ended; + return stream.destroyed || + stream._writableState.ended || + this[kState].closed; } - get closed() { - return this[kState].closed; + get socket() { + // this is compatible with http1 which removes socket reference + // only from ServerResponse but not IncomingMessage + if (this[kState].closed) + return; + + const stream = this[kStream]; + const proxySocket = stream[kProxySocket]; + if (proxySocket === null) + return stream[kProxySocket] = new Proxy(stream, proxySocketHandler); + return proxySocket; } - get code() { - return this[kState].closedCode; + get connection() { + return this.socket; } get stream() { @@ -308,8 +416,7 @@ class Http2ServerResponse extends Stream { } get headersSent() { - const stream = this[kStream]; - return stream !== undefined ? stream.headersSent : this[kState].headersSent; + return this[kStream].headersSent; } get sendDate() { @@ -339,7 +446,7 @@ class Http2ServerResponse extends Stream { name = name.trim().toLowerCase(); assertValidHeader(name, value); - this[kTrailers][name] = String(value); + this[kTrailers][name] = value; } addTrailers(headers) { @@ -379,6 +486,9 @@ class Http2ServerResponse extends Stream { if (typeof name !== 'string') throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'name', 'string'); + if (this[kStream].headersSent) + throw new errors.Error('ERR_HTTP2_HEADERS_SENT'); + name = name.trim().toLowerCase(); delete this[kHeaders][name]; } @@ -387,9 +497,16 @@ class Http2ServerResponse extends Stream { if (typeof name !== 'string') throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'name', 'string'); + if (this[kStream].headersSent) + throw new errors.Error('ERR_HTTP2_HEADERS_SENT'); + + this[kSetHeader](name, value); + } + + [kSetHeader](name, value) { name = name.trim().toLowerCase(); assertValidHeader(name, value); - this[kHeaders][name] = String(value); + this[kHeaders][name] = value; } get statusMessage() { @@ -403,50 +520,45 @@ class Http2ServerResponse extends Stream { } flushHeaders() { - const stream = this[kStream]; - if (stream !== undefined && stream.headersSent === false) - this[kBeginSend](); + const state = this[kState]; + if (!state.closed && !this[kStream].headersSent) + this.writeHead(state.statusCode); } writeHead(statusCode, statusMessage, headers) { - if (typeof statusMessage === 'string') { + const state = this[kState]; + + if (state.closed) + throw new errors.Error('ERR_HTTP2_STREAM_CLOSED'); + if (this[kStream].headersSent) + throw new errors.Error('ERR_HTTP2_HEADERS_SENT'); + + if (typeof statusMessage === 'string') statusMessageWarn(); - } - if (headers === undefined && typeof statusMessage === 'object') { + if (headers === undefined && typeof statusMessage === 'object') headers = statusMessage; - } - - const stream = this[kStream]; - if (stream === undefined) { - throw new errors.Error('ERR_HTTP2_STREAM_CLOSED'); - } - if (stream.headersSent === true) { - throw new errors.Error('ERR_HTTP2_INFO_HEADERS_AFTER_RESPOND'); - } if (typeof headers === 'object') { const keys = Object.keys(headers); let key = ''; for (var i = 0; i < keys.length; i++) { key = keys[i]; - this.setHeader(key, headers[key]); + this[kSetHeader](key, headers[key]); } } - this.statusCode = statusCode; + state.statusCode = statusCode; this[kBeginSend](); } write(chunk, encoding, cb) { - const stream = this[kStream]; - if (typeof encoding === 'function') { cb = encoding; encoding = 'utf8'; } - if (stream === undefined) { + if (this[kState].closed) { const err = new errors.Error('ERR_HTTP2_STREAM_CLOSED'); if (typeof cb === 'function') process.nextTick(cb, err); @@ -454,12 +566,21 @@ class Http2ServerResponse extends Stream { throw err; return; } - this[kBeginSend](); + + const stream = this[kStream]; + if (!stream.headersSent) + this.writeHead(this[kState].statusCode); return stream.write(chunk, encoding, cb); } end(chunk, encoding, cb) { const stream = this[kStream]; + const state = this[kState]; + + if ((state.closed || state.ending) && + state.headRequest === stream.headRequest) { + return false; + } if (typeof chunk === 'function') { cb = chunk; @@ -468,19 +589,28 @@ class Http2ServerResponse extends Stream { cb = encoding; encoding = 'utf8'; } - if (this.finished === true) { - return false; - } - if (chunk !== null && chunk !== undefined) { + + if (chunk !== null && chunk !== undefined) this.write(chunk, encoding); - } + + const isFinished = this.finished; + state.headRequest = stream.headRequest; + state.ending = true; if (typeof cb === 'function') { - stream.once('finish', cb); + if (isFinished) + this.once('finish', cb); + else + stream.once('finish', cb); } - this[kBeginSend]({ endStream: true }); - stream.end(); + if (!stream.headersSent) + this.writeHead(this[kState].statusCode); + + if (isFinished) + this[kFinish](); + else + stream.end(); } destroy(err) { @@ -490,63 +620,52 @@ class Http2ServerResponse extends Stream { } setTimeout(msecs, callback) { - const stream = this[kStream]; if (this[kState].closed) return; - stream.setTimeout(msecs, callback); + this[kStream].setTimeout(msecs, callback); } createPushResponse(headers, callback) { if (typeof callback !== 'function') throw new errors.TypeError('ERR_INVALID_CALLBACK'); - const stream = this[kStream]; - if (stream === undefined) { + if (this[kState].closed) { process.nextTick(callback, new errors.Error('ERR_HTTP2_STREAM_CLOSED')); return; } - stream.pushStream(headers, {}, function(stream, headers, options) { + this[kStream].pushStream(headers, {}, function(stream, headers, options) { const response = new Http2ServerResponse(stream); callback(null, response); }); } - [kBeginSend](options) { - const stream = this[kStream]; - if (stream !== undefined && - stream.destroyed === false && - stream.headersSent === false) { - const headers = this[kHeaders]; - headers[HTTP2_HEADER_STATUS] = this[kState].statusCode; - options = options || Object.create(null); - options.getTrailers = (trailers) => { - Object.assign(trailers, this[kTrailers]); - }; - stream.respond(headers, options); - } + [kBeginSend]() { + const state = this[kState]; + const headers = this[kHeaders]; + headers[HTTP2_HEADER_STATUS] = state.statusCode; + const options = { + endStream: state.ending, + getTrailers: (trailers) => Object.assign(trailers, this[kTrailers]) + }; + this[kStream].respond(headers, options); } - [kFinish](code) { + [kFinish]() { + const stream = this[kStream]; const state = this[kState]; - if (state.closed) + if (state.closed || stream.headRequest !== state.headRequest) return; - if (code !== undefined) - state.closedCode = Number(code); state.closed = true; - state.headersSent = this[kStream].headersSent; - this.end(); - process.nextTick(() => (this[kStream] = undefined)); + this[kProxySocket] = null; + stream[kResponse] = undefined; this.emit('finish'); } // TODO doesn't support callbacks writeContinue() { const stream = this[kStream]; - if (stream === undefined || - stream.headersSent === true || - stream.destroyed === true) { + if (stream.headersSent || this[kState].closed) return false; - } - this[kStream].additionalHeaders({ + stream.additionalHeaders({ [HTTP2_HEADER_STATUS]: HTTP_STATUS_CONTINUE }); return true; diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 0dd8d5e100c..1a01b3fb75d 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -165,8 +165,7 @@ function onSessionHeaders(id, cat, flags, headers) { // For head requests, there must not be a body... // end the writable side immediately. stream.end(); - const state = stream[kState]; - state.headRequest = true; + stream[kState].headRequest = true; } } else { stream = new ClientHttp2Stream(owner, id, { readable: !endOfStream }); @@ -1276,6 +1275,7 @@ class Http2Stream extends Duplex { rst: false, rstCode: NGHTTP2_NO_ERROR, headersSent: false, + headRequest: false, aborted: false, closeHandler: onSessionClose.bind(this) }; @@ -1332,6 +1332,11 @@ class Http2Stream extends Duplex { return this[kState].aborted; } + // true if dealing with a HEAD request + get headRequest() { + return this[kState].headRequest; + } + // The error code reported when this Http2Stream was closed. get rstCode() { return this[kState].rst ? this[kState].rstCode : undefined; @@ -1532,12 +1537,15 @@ function continueStreamDestroy(self, err, callback) { // All done const rst = state.rst; const code = rst ? state.rstCode : NGHTTP2_NO_ERROR; - if (!err && code !== NGHTTP2_NO_ERROR) { + // RST code 8 not emitted as an error as its used by clients to signify + // abort and is already covered by aborted event, also allows more + // seamless compatibility with http1 + if (!err && code !== NGHTTP2_NO_ERROR && code !== NGHTTP2_CANCEL) { err = new errors.Error('ERR_HTTP2_STREAM_ERROR', code); } + callback(err); process.nextTick(emit, self, 'streamClosed', code); debug(`[${sessionName(session[kType])}] stream ${self[kID]} destroyed`); - callback(err); } function finishStreamDestroy(self, handle) { |