Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEugene Ostroukhov <eostroukhov@chromium.org>2017-11-11 03:01:00 +0300
committerEugene Ostroukhov <eostroukhov@google.com>2017-12-12 02:53:21 +0300
commit73ad3f9bea3993b486621aaf9e61484dc37741d4 (patch)
tree259b21142e6c4ceedde66deb1eb37a110e317d9e /src/inspector_socket.cc
parente51fb90a6db53588ab2b884e4309d4eea9e37bbd (diff)
inspector: Fix crash for WS connection
Attaching WS session will now include a roundtrip onto the main thread to make sure there is no other session (e.g. JS bindings) This change also required refactoring WS socket implementation to better support scenarios like this. Fixes: https://github.com/nodejs/node/issues/16852 PR-URL: https://github.com/nodejs/node/pull/17085 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Timothy Gu <timothygu99@gmail.com>
Diffstat (limited to 'src/inspector_socket.cc')
-rw-r--r--src/inspector_socket.cc825
1 files changed, 464 insertions, 361 deletions
diff --git a/src/inspector_socket.cc b/src/inspector_socket.cc
index 49d337b70b1..23b77f6aa56 100644
--- a/src/inspector_socket.cc
+++ b/src/inspector_socket.cc
@@ -1,4 +1,6 @@
#include "inspector_socket.h"
+
+#include "http_parser.h"
#include "util-inl.h"
#define NODE_WANT_INTERNALS 1
@@ -18,12 +20,71 @@
namespace node {
namespace inspector {
-static const char CLOSE_FRAME[] = {'\x88', '\x00'};
+class TcpHolder {
+ public:
+ using Pointer = std::unique_ptr<TcpHolder, void(*)(TcpHolder*)>;
-enum ws_decode_result {
- FRAME_OK, FRAME_INCOMPLETE, FRAME_CLOSE, FRAME_ERROR
+ static Pointer Accept(uv_stream_t* server,
+ InspectorSocket::DelegatePointer delegate);
+ void SetHandler(ProtocolHandler* handler);
+ int WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb);
+ uv_tcp_t* tcp() {
+ return &tcp_;
+ }
+ InspectorSocket::Delegate* delegate();
+
+ private:
+ static TcpHolder* From(void* handle) {
+ return node::ContainerOf(&TcpHolder::tcp_,
+ reinterpret_cast<uv_tcp_t*>(handle));
+ }
+ static void OnClosed(uv_handle_t* handle);
+ static void OnDataReceivedCb(uv_stream_t* stream, ssize_t nread,
+ const uv_buf_t* buf);
+ static void DisconnectAndDispose(TcpHolder* holder);
+ explicit TcpHolder(InspectorSocket::DelegatePointer delegate);
+ ~TcpHolder() = default;
+ void ReclaimUvBuf(const uv_buf_t* buf, ssize_t read);
+
+ uv_tcp_t tcp_;
+ const InspectorSocket::DelegatePointer delegate_;
+ ProtocolHandler* handler_;
+ std::vector<char> buffer;
+};
+
+
+class ProtocolHandler {
+ public:
+ ProtocolHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp);
+
+ virtual void AcceptUpgrade(const std::string& accept_key) = 0;
+ virtual void OnData(std::vector<char>* data) = 0;
+ virtual void OnEof() = 0;
+ virtual void Write(const std::vector<char> data) = 0;
+ virtual void CancelHandshake() = 0;
+
+ std::string GetHost();
+
+ InspectorSocket* inspector() {
+ return inspector_;
+ }
+
+ static void Shutdown(ProtocolHandler* handler) {
+ handler->Shutdown();
+ }
+
+ protected:
+ virtual ~ProtocolHandler() = default;
+ virtual void Shutdown() = 0;
+ int WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb);
+ InspectorSocket::Delegate* delegate();
+
+ InspectorSocket* const inspector_;
+ TcpHolder::Pointer tcp_;
};
+namespace {
+
#if DUMP_READS || DUMP_WRITES
static void dump_hex(const char* buf, size_t len) {
const char* ptr = buf;
@@ -50,64 +111,52 @@ static void dump_hex(const char* buf, size_t len) {
}
#endif
-static void remove_from_beginning(std::vector<char>* buffer, size_t count) {
- buffer->erase(buffer->begin(), buffer->begin() + count);
-}
-
-static void dispose_inspector(uv_handle_t* handle) {
- InspectorSocket* inspector = inspector_from_stream(handle);
- inspector_cb close =
- inspector->ws_mode ? inspector->ws_state->close_cb : nullptr;
- inspector->buffer.clear();
- delete inspector->ws_state;
- inspector->ws_state = nullptr;
- if (close) {
- close(inspector, 0);
- }
-}
-
-static void close_connection(InspectorSocket* inspector) {
- uv_handle_t* socket = reinterpret_cast<uv_handle_t*>(&inspector->tcp);
- if (!uv_is_closing(socket)) {
- uv_read_stop(reinterpret_cast<uv_stream_t*>(socket));
- uv_close(socket, dispose_inspector);
- }
-}
-
-struct WriteRequest {
- WriteRequest(InspectorSocket* inspector, const char* data, size_t size)
- : inspector(inspector)
- , storage(data, data + size)
- , buf(uv_buf_init(&storage[0], storage.size())) {}
+class WriteRequest {
+ public:
+ WriteRequest(ProtocolHandler* handler, const std::vector<char>& buffer)
+ : handler(handler)
+ , storage(buffer)
+ , buf(uv_buf_init(storage.data(), storage.size())) {}
static WriteRequest* from_write_req(uv_write_t* req) {
return node::ContainerOf(&WriteRequest::req, req);
}
- InspectorSocket* const inspector;
+ static void Cleanup(uv_write_t* req, int status) {
+ delete WriteRequest::from_write_req(req);
+ }
+
+ ProtocolHandler* const handler;
std::vector<char> storage;
uv_write_t req;
uv_buf_t buf;
};
-// Cleanup
-static void write_request_cleanup(uv_write_t* req, int status) {
- delete WriteRequest::from_write_req(req);
+void allocate_buffer(uv_handle_t* stream, size_t len, uv_buf_t* buf) {
+ *buf = uv_buf_init(new char[len], len);
}
-static int write_to_client(InspectorSocket* inspector,
- const char* msg,
- size_t len,
- uv_write_cb write_cb = write_request_cleanup) {
-#if DUMP_WRITES
- printf("%s (%ld bytes):\n", __FUNCTION__, len);
- dump_hex(msg, len);
-#endif
+static void remove_from_beginning(std::vector<char>* buffer, size_t count) {
+ buffer->erase(buffer->begin(), buffer->begin() + count);
+}
- // Freed in write_request_cleanup
- WriteRequest* wr = new WriteRequest(inspector, msg, len);
- uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&inspector->tcp);
- return uv_write(&wr->req, stream, &wr->buf, 1, write_cb) < 0;
+// Cleanup
+
+static const char CLOSE_FRAME[] = {'\x88', '\x00'};
+
+enum ws_decode_result {
+ FRAME_OK, FRAME_INCOMPLETE, FRAME_CLOSE, FRAME_ERROR
+};
+
+static void generate_accept_string(const std::string& client_key,
+ char (*buffer)[ACCEPT_KEY_LENGTH]) {
+ // Magic string from websockets spec.
+ static const char ws_magic[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+ std::string input(client_key + ws_magic);
+ char hash[SHA_DIGEST_LENGTH];
+ SHA1(reinterpret_cast<const unsigned char*>(&input[0]), input.size(),
+ reinterpret_cast<unsigned char*>(hash));
+ node::base64_encode(hash, sizeof(hash), *buffer, sizeof(*buffer));
}
// Constants for hybi-10 frame format.
@@ -134,11 +183,11 @@ const size_t kTwoBytePayloadLengthField = 126;
const size_t kEightBytePayloadLengthField = 127;
const size_t kMaskingKeyWidthInBytes = 4;
-static std::vector<char> encode_frame_hybi17(const char* message,
- size_t data_length) {
+static std::vector<char> encode_frame_hybi17(const std::vector<char>& message) {
std::vector<char> frame;
OpCode op_code = kOpCodeText;
frame.push_back(kFinalBit | op_code);
+ const size_t data_length = message.size();
if (data_length <= kMaxSingleBytePayloadLength) {
frame.push_back(static_cast<char>(data_length));
} else if (data_length <= 0xFFFF) {
@@ -158,7 +207,7 @@ static std::vector<char> encode_frame_hybi17(const char* message,
extended_payload_length + 8);
CHECK_EQ(0, remaining);
}
- frame.insert(frame.end(), message, message + data_length);
+ frame.insert(frame.end(), message.begin(), message.end());
return frame;
}
@@ -248,272 +297,368 @@ static ws_decode_result decode_frame_hybi17(const std::vector<char>& buffer,
return closed ? FRAME_CLOSE : FRAME_OK;
}
-static void invoke_read_callback(InspectorSocket* inspector,
- int status, const uv_buf_t* buf) {
- if (inspector->ws_state->read_cb) {
- inspector->ws_state->read_cb(
- reinterpret_cast<uv_stream_t*>(&inspector->tcp), status, buf);
+
+// WS protocol
+class WsHandler : public ProtocolHandler {
+ public:
+ WsHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp)
+ : ProtocolHandler(inspector, std::move(tcp)),
+ OnCloseSent(&WsHandler::WaitForCloseReply),
+ OnCloseRecieved(&WsHandler::CloseFrameReceived),
+ dispose_(false) { }
+
+ void AcceptUpgrade(const std::string& accept_key) override { }
+ void CancelHandshake() override {}
+
+ void OnEof() override {
+ tcp_.reset();
+ if (dispose_)
+ delete this;
}
-}
-static void shutdown_complete(InspectorSocket* inspector) {
- close_connection(inspector);
-}
+ void OnData(std::vector<char>* data) override {
+ // 1. Parse.
+ int processed = 0;
+ do {
+ processed = ParseWsFrames(*data);
+ // 2. Fix the data size & length
+ if (processed > 0) {
+ remove_from_beginning(data, processed);
+ }
+ } while (processed > 0 && !data->empty());
+ }
-static void on_close_frame_written(uv_write_t* req, int status) {
- WriteRequest* wr = WriteRequest::from_write_req(req);
- InspectorSocket* inspector = wr->inspector;
- delete wr;
- inspector->ws_state->close_sent = true;
- if (inspector->ws_state->received_close) {
- shutdown_complete(inspector);
+ void Write(const std::vector<char> data) {
+ std::vector<char> output = encode_frame_hybi17(data);
+ WriteRaw(output, WriteRequest::Cleanup);
}
-}
-static void close_frame_received(InspectorSocket* inspector) {
- inspector->ws_state->received_close = true;
- if (!inspector->ws_state->close_sent) {
- invoke_read_callback(inspector, 0, 0);
- write_to_client(inspector, CLOSE_FRAME, sizeof(CLOSE_FRAME),
- on_close_frame_written);
- } else {
- shutdown_complete(inspector);
+ protected:
+ void Shutdown() override {
+ if (tcp_) {
+ dispose_ = true;
+ SendClose();
+ } else {
+ delete this;
+ }
}
-}
-static int parse_ws_frames(InspectorSocket* inspector) {
- int bytes_consumed = 0;
- std::vector<char> output;
- bool compressed = false;
-
- ws_decode_result r = decode_frame_hybi17(inspector->buffer,
- true /* client_frame */,
- &bytes_consumed, &output,
- &compressed);
- // Compressed frame means client is ignoring the headers and misbehaves
- if (compressed || r == FRAME_ERROR) {
- invoke_read_callback(inspector, UV_EPROTO, nullptr);
- close_connection(inspector);
- bytes_consumed = 0;
- } else if (r == FRAME_CLOSE) {
- close_frame_received(inspector);
- bytes_consumed = 0;
- } else if (r == FRAME_OK && inspector->ws_state->alloc_cb
- && inspector->ws_state->read_cb) {
- uv_buf_t buffer;
- size_t len = output.size();
- inspector->ws_state->alloc_cb(
- reinterpret_cast<uv_handle_t*>(&inspector->tcp),
- len, &buffer);
- CHECK_GE(buffer.len, len);
- memcpy(buffer.base, &output[0], len);
- invoke_read_callback(inspector, len, &buffer);
- }
- return bytes_consumed;
-}
+ private:
+ using Callback = void (WsHandler::*)(void);
-static void prepare_buffer(uv_handle_t* stream, size_t len, uv_buf_t* buf) {
- *buf = uv_buf_init(new char[len], len);
-}
+ static void OnCloseFrameWritten(uv_write_t* req, int status) {
+ WriteRequest* wr = WriteRequest::from_write_req(req);
+ WsHandler* handler = static_cast<WsHandler*>(wr->handler);
+ delete wr;
+ Callback cb = handler->OnCloseSent;
+ (handler->*cb)();
+ }
-static void reclaim_uv_buf(InspectorSocket* inspector, const uv_buf_t* buf,
- ssize_t read) {
- if (read > 0) {
- std::vector<char>& buffer = inspector->buffer;
- buffer.insert(buffer.end(), buf->base, buf->base + read);
+ void WaitForCloseReply() {
+ OnCloseRecieved = &WsHandler::OnEof;
}
- delete[] buf->base;
-}
-static void websockets_data_cb(uv_stream_t* stream, ssize_t nread,
- const uv_buf_t* buf) {
- InspectorSocket* inspector = inspector_from_stream(stream);
- reclaim_uv_buf(inspector, buf, nread);
- if (nread < 0 || nread == UV_EOF) {
- inspector->connection_eof = true;
- if (!inspector->shutting_down && inspector->ws_state->read_cb) {
- inspector->ws_state->read_cb(stream, nread, nullptr);
+ void SendClose() {
+ WriteRaw(std::vector<char>(CLOSE_FRAME, CLOSE_FRAME + sizeof(CLOSE_FRAME)),
+ OnCloseFrameWritten);
+ }
+
+ void CloseFrameReceived() {
+ OnCloseSent = &WsHandler::OnEof;
+ SendClose();
+ }
+
+ int ParseWsFrames(const std::vector<char>& buffer) {
+ int bytes_consumed = 0;
+ std::vector<char> output;
+ bool compressed = false;
+
+ ws_decode_result r = decode_frame_hybi17(buffer,
+ true /* client_frame */,
+ &bytes_consumed, &output,
+ &compressed);
+ // Compressed frame means client is ignoring the headers and misbehaves
+ if (compressed || r == FRAME_ERROR) {
+ OnEof();
+ bytes_consumed = 0;
+ } else if (r == FRAME_CLOSE) {
+ (this->*OnCloseRecieved)();
+ bytes_consumed = 0;
+ } else if (r == FRAME_OK) {
+ delegate()->OnWsFrame(output);
}
- if (inspector->ws_state->close_sent &&
- !inspector->ws_state->received_close) {
- shutdown_complete(inspector); // invoke callback
+ return bytes_consumed;
+ }
+
+
+ Callback OnCloseSent;
+ Callback OnCloseRecieved;
+ bool dispose_;
+};
+
+// HTTP protocol
+class HttpEvent {
+ public:
+ HttpEvent(const std::string& path, bool upgrade,
+ bool isGET, const std::string& ws_key) : path(path),
+ upgrade(upgrade),
+ isGET(isGET),
+ ws_key(ws_key) { }
+
+ std::string path;
+ bool upgrade;
+ bool isGET;
+ std::string ws_key;
+ std::string current_header_;
+};
+
+class HttpHandler : public ProtocolHandler {
+ public:
+ explicit HttpHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp)
+ : ProtocolHandler(inspector, std::move(tcp)),
+ parsing_value_(false) {
+ http_parser_init(&parser_, HTTP_REQUEST);
+ http_parser_settings_init(&parser_settings);
+ parser_settings.on_header_field = OnHeaderField;
+ parser_settings.on_header_value = OnHeaderValue;
+ parser_settings.on_message_complete = OnMessageComplete;
+ parser_settings.on_url = OnPath;
+ }
+
+ void AcceptUpgrade(const std::string& accept_key) override {
+ char accept_string[ACCEPT_KEY_LENGTH];
+ generate_accept_string(accept_key, &accept_string);
+ const char accept_ws_prefix[] = "HTTP/1.1 101 Switching Protocols\r\n"
+ "Upgrade: websocket\r\n"
+ "Connection: Upgrade\r\n"
+ "Sec-WebSocket-Accept: ";
+ const char accept_ws_suffix[] = "\r\n\r\n";
+ std::vector<char> reply(accept_ws_prefix,
+ accept_ws_prefix + sizeof(accept_ws_prefix) - 1);
+ reply.insert(reply.end(), accept_string,
+ accept_string + sizeof(accept_string));
+ reply.insert(reply.end(), accept_ws_suffix,
+ accept_ws_suffix + sizeof(accept_ws_suffix) - 1);
+ if (WriteRaw(reply, WriteRequest::Cleanup) >= 0) {
+ inspector_->SwitchProtocol(new WsHandler(inspector_, std::move(tcp_)));
+ } else {
+ tcp_.reset();
}
- } else {
- #if DUMP_READS
- printf("%s read %ld bytes\n", __FUNCTION__, nread);
- if (nread > 0) {
- dump_hex(inspector->buffer.data() + inspector->buffer.size() - nread,
- nread);
- }
- #endif
- // 2. Parse.
- int processed = 0;
- do {
- processed = parse_ws_frames(inspector);
- // 3. Fix the buffer size & length
- if (processed > 0) {
- remove_from_beginning(&inspector->buffer, processed);
+ }
+
+ void CancelHandshake() {
+ const char HANDSHAKE_FAILED_RESPONSE[] =
+ "HTTP/1.0 400 Bad Request\r\n"
+ "Content-Type: text/html; charset=UTF-8\r\n\r\n"
+ "WebSockets request was expected\r\n";
+ WriteRaw(std::vector<char>(HANDSHAKE_FAILED_RESPONSE,
+ HANDSHAKE_FAILED_RESPONSE + sizeof(HANDSHAKE_FAILED_RESPONSE) - 1),
+ ThenCloseAndReportFailure);
+ }
+
+
+ void OnEof() override {
+ tcp_.reset();
+ }
+
+ void OnData(std::vector<char>* data) override {
+ http_parser_execute(&parser_, &parser_settings, data->data(), data->size());
+ data->clear();
+ if (parser_.http_errno != HPE_OK) {
+ CancelHandshake();
+ }
+ // Event handling may delete *this
+ std::vector<HttpEvent> events;
+ std::swap(events, events_);
+ for (const HttpEvent& event : events) {
+ bool shouldContinue = event.isGET && !event.upgrade;
+ if (!event.isGET) {
+ CancelHandshake();
+ } else if (!event.upgrade) {
+ delegate()->OnHttpGet(event.path);
+ } else if (event.ws_key.empty()) {
+ CancelHandshake();
+ } else {
+ delegate()->OnSocketUpgrade(event.path, event.ws_key);
}
- } while (processed > 0 && !inspector->buffer.empty());
+ if (!shouldContinue)
+ return;
+ }
}
-}
-int inspector_read_start(InspectorSocket* inspector,
- uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
- CHECK(inspector->ws_mode);
- CHECK(!inspector->shutting_down || read_cb == nullptr);
- inspector->ws_state->close_sent = false;
- inspector->ws_state->alloc_cb = alloc_cb;
- inspector->ws_state->read_cb = read_cb;
- int err =
- uv_read_start(reinterpret_cast<uv_stream_t*>(&inspector->tcp),
- prepare_buffer,
- websockets_data_cb);
- if (err < 0) {
- close_connection(inspector);
- }
- return err;
-}
+ void Write(const std::vector<char> data) override {
+ WriteRaw(data, WriteRequest::Cleanup);
+ }
-void inspector_read_stop(InspectorSocket* inspector) {
- uv_read_stop(reinterpret_cast<uv_stream_t*>(&inspector->tcp));
- inspector->ws_state->alloc_cb = nullptr;
- inspector->ws_state->read_cb = nullptr;
-}
+ protected:
+ void Shutdown() override {
+ delete this;
+ }
-static void generate_accept_string(const std::string& client_key,
- char (*buffer)[ACCEPT_KEY_LENGTH]) {
- // Magic string from websockets spec.
- static const char ws_magic[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
- std::string input(client_key + ws_magic);
- char hash[SHA_DIGEST_LENGTH];
- SHA1(reinterpret_cast<const unsigned char*>(&input[0]), input.size(),
- reinterpret_cast<unsigned char*>(hash));
- node::base64_encode(hash, sizeof(hash), *buffer, sizeof(*buffer));
-}
+ private:
+ static void ThenCloseAndReportFailure(uv_write_t* req, int status) {
+ ProtocolHandler* handler = WriteRequest::from_write_req(req)->handler;
+ WriteRequest::Cleanup(req, status);
+ handler->inspector()->SwitchProtocol(nullptr);
+ }
-static int header_value_cb(http_parser* parser, const char* at, size_t length) {
- static const char SEC_WEBSOCKET_KEY_HEADER[] = "Sec-WebSocket-Key";
- auto inspector = static_cast<InspectorSocket*>(parser->data);
- auto state = inspector->http_parsing_state;
- state->parsing_value = true;
- if (state->current_header.size() == sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1 &&
- node::StringEqualNoCaseN(state->current_header.data(),
- SEC_WEBSOCKET_KEY_HEADER,
- sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1)) {
- state->ws_key.append(at, length);
- }
- return 0;
-}
+ static int OnHeaderValue(http_parser* parser, const char* at, size_t length) {
+ static const char SEC_WEBSOCKET_KEY_HEADER[] = "Sec-WebSocket-Key";
+ HttpHandler* handler = From(parser);
+ handler->parsing_value_ = true;
+ if (handler->current_header_.size() ==
+ sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1 &&
+ node::StringEqualNoCaseN(handler->current_header_.data(),
+ SEC_WEBSOCKET_KEY_HEADER,
+ sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1)) {
+ handler->ws_key_.append(at, length);
+ }
+ return 0;
+ }
-static int header_field_cb(http_parser* parser, const char* at, size_t length) {
- auto inspector = static_cast<InspectorSocket*>(parser->data);
- auto state = inspector->http_parsing_state;
- if (state->parsing_value) {
- state->parsing_value = false;
- state->current_header.clear();
+ static int OnHeaderField(http_parser* parser, const char* at, size_t length) {
+ HttpHandler* handler = From(parser);
+ if (handler->parsing_value_) {
+ handler->parsing_value_ = false;
+ handler->current_header_.clear();
+ }
+ handler->current_header_.append(at, length);
+ return 0;
}
- state->current_header.append(at, length);
- return 0;
-}
-static int path_cb(http_parser* parser, const char* at, size_t length) {
- auto inspector = static_cast<InspectorSocket*>(parser->data);
- auto state = inspector->http_parsing_state;
- state->path.append(at, length);
- return 0;
-}
+ static int OnPath(http_parser* parser, const char* at, size_t length) {
+ HttpHandler* handler = From(parser);
+ handler->path_.append(at, length);
+ return 0;
+ }
+
+ static HttpHandler* From(http_parser* parser) {
+ return node::ContainerOf(&HttpHandler::parser_, parser);
+ }
+
+ static int OnMessageComplete(http_parser* parser) {
+ // Event needs to be fired after the parser is done.
+ HttpHandler* handler = From(parser);
+ handler->events_.push_back(HttpEvent(handler->path_, parser->upgrade,
+ parser->method == HTTP_GET,
+ handler->ws_key_));
+ handler->path_ = "";
+ handler->ws_key_ = "";
+ handler->parsing_value_ = false;
+ handler->current_header_ = "";
+
+ return 0;
+ }
+
+ bool parsing_value_;
+ http_parser parser_;
+ http_parser_settings parser_settings;
+ std::vector<HttpEvent> events_;
+ std::string current_header_;
+ std::string ws_key_;
+ std::string path_;
+};
+
+} // namespace
-static void handshake_complete(InspectorSocket* inspector) {
- uv_read_stop(reinterpret_cast<uv_stream_t*>(&inspector->tcp));
- handshake_cb callback = inspector->http_parsing_state->callback;
- inspector->ws_state = new ws_state_s();
- inspector->ws_mode = true;
- callback(inspector, kInspectorHandshakeUpgraded,
- inspector->http_parsing_state->path);
+// Any protocol
+ProtocolHandler::ProtocolHandler(InspectorSocket* inspector,
+ TcpHolder::Pointer tcp)
+ : inspector_(inspector), tcp_(std::move(tcp)) {
+ CHECK_NE(nullptr, tcp_);
+ tcp_->SetHandler(this);
}
-static void cleanup_http_parsing_state(InspectorSocket* inspector) {
- delete inspector->http_parsing_state;
- inspector->http_parsing_state = nullptr;
+int ProtocolHandler::WriteRaw(const std::vector<char>& buffer,
+ uv_write_cb write_cb) {
+ return tcp_->WriteRaw(buffer, write_cb);
}
-static void report_handshake_failure_cb(uv_handle_t* handle) {
- dispose_inspector(handle);
- InspectorSocket* inspector = inspector_from_stream(handle);
- handshake_cb cb = inspector->http_parsing_state->callback;
- cleanup_http_parsing_state(inspector);
- cb(inspector, kInspectorHandshakeFailed, std::string());
+InspectorSocket::Delegate* ProtocolHandler::delegate() {
+ return tcp_->delegate();
}
-static void close_and_report_handshake_failure(InspectorSocket* inspector) {
- uv_handle_t* socket = reinterpret_cast<uv_handle_t*>(&inspector->tcp);
- if (uv_is_closing(socket)) {
- report_handshake_failure_cb(socket);
+std::string ProtocolHandler::GetHost() {
+ char ip[INET6_ADDRSTRLEN];
+ sockaddr_storage addr;
+ int len = sizeof(addr);
+ int err = uv_tcp_getsockname(tcp_->tcp(),
+ reinterpret_cast<struct sockaddr*>(&addr),
+ &len);
+ if (err != 0)
+ return "";
+ if (addr.ss_family == AF_INET6) {
+ const sockaddr_in6* v6 = reinterpret_cast<const sockaddr_in6*>(&addr);
+ err = uv_ip6_name(v6, ip, sizeof(ip));
} else {
- uv_read_stop(reinterpret_cast<uv_stream_t*>(socket));
- uv_close(socket, report_handshake_failure_cb);
+ const sockaddr_in* v4 = reinterpret_cast<const sockaddr_in*>(&addr);
+ err = uv_ip4_name(v4, ip, sizeof(ip));
+ }
+ if (err != 0)
+ return "";
+ return ip;
+}
+
+// RAII uv_tcp_t wrapper
+TcpHolder::TcpHolder(InspectorSocket::DelegatePointer delegate)
+ : tcp_(),
+ delegate_(std::move(delegate)),
+ handler_(nullptr) { }
+
+// static
+TcpHolder::Pointer TcpHolder::Accept(
+ uv_stream_t* server,
+ InspectorSocket::DelegatePointer delegate) {
+ TcpHolder* result = new TcpHolder(std::move(delegate));
+ uv_stream_t* tcp = reinterpret_cast<uv_stream_t*>(&result->tcp_);
+ int err = uv_tcp_init(server->loop, &result->tcp_);
+ if (err == 0) {
+ err = uv_accept(server, tcp);
+ }
+ if (err == 0) {
+ err = uv_read_start(tcp, allocate_buffer, OnDataReceivedCb);
+ }
+ if (err == 0) {
+ return { result, DisconnectAndDispose };
+ } else {
+ fprintf(stderr, "[%s:%d@%s]\n", __FILE__, __LINE__, __FUNCTION__);
+
+ delete result;
+ return { nullptr, nullptr };
}
}
-static void then_close_and_report_failure(uv_write_t* req, int status) {
- InspectorSocket* inspector = WriteRequest::from_write_req(req)->inspector;
- write_request_cleanup(req, status);
- close_and_report_handshake_failure(inspector);
+void TcpHolder::SetHandler(ProtocolHandler* handler) {
+ handler_ = handler;
}
-static void handshake_failed(InspectorSocket* inspector) {
- const char HANDSHAKE_FAILED_RESPONSE[] =
- "HTTP/1.0 400 Bad Request\r\n"
- "Content-Type: text/html; charset=UTF-8\r\n\r\n"
- "WebSockets request was expected\r\n";
- write_to_client(inspector, HANDSHAKE_FAILED_RESPONSE,
- sizeof(HANDSHAKE_FAILED_RESPONSE) - 1,
- then_close_and_report_failure);
+int TcpHolder::WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb) {
+#if DUMP_WRITES
+ printf("%s (%ld bytes):\n", __FUNCTION__, buffer.size());
+ dump_hex(buffer.data(), buffer.size());
+ printf("\n");
+#endif
+
+ // Freed in write_request_cleanup
+ WriteRequest* wr = new WriteRequest(handler_, buffer);
+ uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&tcp_);
+ int err = uv_write(&wr->req, stream, &wr->buf, 1, write_cb);
+ if (err < 0)
+ delete wr;
+ return err < 0;
}
-// init_handshake references message_complete_cb
-static void init_handshake(InspectorSocket* socket);
-
-static int message_complete_cb(http_parser* parser) {
- InspectorSocket* inspector = static_cast<InspectorSocket*>(parser->data);
- struct http_parsing_state_s* state = inspector->http_parsing_state;
- if (parser->method != HTTP_GET) {
- handshake_failed(inspector);
- } else if (!parser->upgrade) {
- if (state->callback(inspector, kInspectorHandshakeHttpGet, state->path)) {
- init_handshake(inspector);
- } else {
- handshake_failed(inspector);
- }
- } else if (state->ws_key.empty()) {
- handshake_failed(inspector);
- } else if (state->callback(inspector, kInspectorHandshakeUpgrading,
- state->path)) {
- char accept_string[ACCEPT_KEY_LENGTH];
- generate_accept_string(state->ws_key, &accept_string);
- const char accept_ws_prefix[] = "HTTP/1.1 101 Switching Protocols\r\n"
- "Upgrade: websocket\r\n"
- "Connection: Upgrade\r\n"
- "Sec-WebSocket-Accept: ";
- const char accept_ws_suffix[] = "\r\n\r\n";
- std::string reply(accept_ws_prefix, sizeof(accept_ws_prefix) - 1);
- reply.append(accept_string, sizeof(accept_string));
- reply.append(accept_ws_suffix, sizeof(accept_ws_suffix) - 1);
- if (write_to_client(inspector, &reply[0], reply.size()) >= 0) {
- handshake_complete(inspector);
- inspector->http_parsing_state->done = true;
- } else {
- close_and_report_handshake_failure(inspector);
- }
- } else {
- handshake_failed(inspector);
- }
- return 0;
+InspectorSocket::Delegate* TcpHolder::delegate() {
+ return delegate_.get();
+}
+
+// static
+void TcpHolder::OnClosed(uv_handle_t* handle) {
+ delete From(handle);
}
-static void data_received_cb(uv_stream_t* tcp, ssize_t nread,
- const uv_buf_t* buf) {
+void TcpHolder::OnDataReceivedCb(uv_stream_t* tcp, ssize_t nread,
+ const uv_buf_t* buf) {
#if DUMP_READS
if (nread >= 0) {
printf("%s (%ld bytes)\n", __FUNCTION__, nread);
@@ -522,107 +667,65 @@ static void data_received_cb(uv_stream_t* tcp, ssize_t nread,
printf("[%s:%d] %s\n", __FUNCTION__, __LINE__, uv_err_name(nread));
}
#endif
- InspectorSocket* inspector = inspector_from_stream(tcp);
- reclaim_uv_buf(inspector, buf, nread);
+ TcpHolder* holder = From(tcp);
+ holder->ReclaimUvBuf(buf, nread);
if (nread < 0 || nread == UV_EOF) {
- close_and_report_handshake_failure(inspector);
+ holder->handler_->OnEof();
} else {
- http_parsing_state_s* state = inspector->http_parsing_state;
- http_parser* parser = &state->parser;
- http_parser_execute(parser, &state->parser_settings,
- inspector->buffer.data(), nread);
- remove_from_beginning(&inspector->buffer, nread);
- if (parser->http_errno != HPE_OK) {
- handshake_failed(inspector);
- }
- if (inspector->http_parsing_state->done) {
- cleanup_http_parsing_state(inspector);
- }
+ holder->handler_->OnData(&holder->buffer);
}
}
-static void init_handshake(InspectorSocket* socket) {
- http_parsing_state_s* state = socket->http_parsing_state;
- CHECK_NE(state, nullptr);
- state->current_header.clear();
- state->ws_key.clear();
- state->path.clear();
- state->done = false;
- http_parser_init(&state->parser, HTTP_REQUEST);
- state->parser.data = socket;
- http_parser_settings* settings = &state->parser_settings;
- http_parser_settings_init(settings);
- settings->on_header_field = header_field_cb;
- settings->on_header_value = header_value_cb;
- settings->on_message_complete = message_complete_cb;
- settings->on_url = path_cb;
+// static
+void TcpHolder::DisconnectAndDispose(TcpHolder* holder) {
+ uv_handle_t* handle = reinterpret_cast<uv_handle_t*>(&holder->tcp_);
+ uv_close(handle, OnClosed);
}
-int inspector_accept(uv_stream_t* server, InspectorSocket* socket,
- handshake_cb callback) {
- CHECK_NE(callback, nullptr);
- CHECK_EQ(socket->http_parsing_state, nullptr);
-
- socket->http_parsing_state = new http_parsing_state_s();
- uv_stream_t* tcp = reinterpret_cast<uv_stream_t*>(&socket->tcp);
- int err = uv_tcp_init(server->loop, &socket->tcp);
-
- if (err == 0) {
- err = uv_accept(server, tcp);
- }
- if (err == 0) {
- init_handshake(socket);
- socket->http_parsing_state->callback = callback;
- err = uv_read_start(tcp, prepare_buffer,
- data_received_cb);
- }
- if (err != 0) {
- uv_close(reinterpret_cast<uv_handle_t*>(tcp), nullptr);
+void TcpHolder::ReclaimUvBuf(const uv_buf_t* buf, ssize_t read) {
+ if (read > 0) {
+ buffer.insert(buffer.end(), buf->base, buf->base + read);
}
- return err;
+ delete[] buf->base;
}
-void inspector_write(InspectorSocket* inspector, const char* data,
- size_t len) {
- if (inspector->ws_mode) {
- std::vector<char> output = encode_frame_hybi17(data, len);
- write_to_client(inspector, &output[0], output.size());
+// Public interface
+InspectorSocket::InspectorSocket()
+ : protocol_handler_(nullptr, ProtocolHandler::Shutdown) { }
+
+InspectorSocket::~InspectorSocket() = default;
+
+// static
+InspectorSocket::Pointer InspectorSocket::Accept(uv_stream_t* server,
+ DelegatePointer delegate) {
+ auto tcp = TcpHolder::Accept(server, std::move(delegate));
+ if (tcp) {
+ InspectorSocket* inspector = new InspectorSocket();
+ inspector->SwitchProtocol(new HttpHandler(inspector, std::move(tcp)));
+ return InspectorSocket::Pointer(inspector);
} else {
- write_to_client(inspector, data, len);
+ return InspectorSocket::Pointer(nullptr);
}
}
-void inspector_close(InspectorSocket* inspector,
- inspector_cb callback) {
- // libuv throws assertions when closing stream that's already closed - we
- // need to do the same.
- CHECK(!uv_is_closing(reinterpret_cast<uv_handle_t*>(&inspector->tcp)));
- CHECK(!inspector->shutting_down);
- inspector->shutting_down = true;
- inspector->ws_state->close_cb = callback;
- if (inspector->connection_eof) {
- close_connection(inspector);
- } else {
- inspector_read_stop(inspector);
- write_to_client(inspector, CLOSE_FRAME, sizeof(CLOSE_FRAME),
- on_close_frame_written);
- inspector_read_start(inspector, nullptr, nullptr);
- }
+void InspectorSocket::AcceptUpgrade(const std::string& ws_key) {
+ protocol_handler_->AcceptUpgrade(ws_key);
+}
+
+void InspectorSocket::CancelHandshake() {
+ protocol_handler_->CancelHandshake();
+}
+
+std::string InspectorSocket::GetHost() {
+ return protocol_handler_->GetHost();
}
-bool inspector_is_active(const InspectorSocket* inspector) {
- const uv_handle_t* tcp =
- reinterpret_cast<const uv_handle_t*>(&inspector->tcp);
- return !inspector->shutting_down && !uv_is_closing(tcp);
+void InspectorSocket::SwitchProtocol(ProtocolHandler* handler) {
+ protocol_handler_.reset(std::move(handler));
}
-void InspectorSocket::reinit() {
- http_parsing_state = nullptr;
- ws_state = nullptr;
- buffer.clear();
- ws_mode = false;
- shutting_down = false;
- connection_eof = false;
+void InspectorSocket::Write(const char* data, size_t len) {
+ protocol_handler_->Write(std::vector<char>(data, data + len));
}
} // namespace inspector