diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/inspector_agent.cc | 4 | ||||
-rw-r--r-- | src/inspector_agent.h | 1 | ||||
-rw-r--r-- | src/inspector_io.cc | 73 | ||||
-rw-r--r-- | src/inspector_io.h | 8 | ||||
-rw-r--r-- | src/inspector_socket.cc | 825 | ||||
-rw-r--r-- | src/inspector_socket.h | 96 | ||||
-rw-r--r-- | src/inspector_socket_server.cc | 306 | ||||
-rw-r--r-- | src/inspector_socket_server.h | 28 | ||||
-rw-r--r-- | src/node.cc | 2 |
9 files changed, 699 insertions, 644 deletions
diff --git a/src/inspector_agent.cc b/src/inspector_agent.cc index 9677dca37b5..216b43ca6c3 100644 --- a/src/inspector_agent.cc +++ b/src/inspector_agent.cc @@ -548,10 +548,6 @@ void Agent::Connect(InspectorSessionDelegate* delegate) { client_->connectFrontend(delegate); } -bool Agent::IsConnected() { - return io_ && io_->IsConnected(); -} - void Agent::WaitForDisconnect() { CHECK_NE(client_, nullptr); client_->contextDestroyed(parent_env_->context()); diff --git a/src/inspector_agent.h b/src/inspector_agent.h index 29b9546b514..a2d61d0c8db 100644 --- a/src/inspector_agent.h +++ b/src/inspector_agent.h @@ -48,7 +48,6 @@ class Agent { bool IsStarted() { return !!client_; } // IO thread started, and client connected - bool IsConnected(); bool IsWaitingForConnect(); void WaitForDisconnect(); diff --git a/src/inspector_io.cc b/src/inspector_io.cc index 538cbab3c9f..9af4458c6b2 100644 --- a/src/inspector_io.cc +++ b/src/inspector_io.cc @@ -136,7 +136,7 @@ class InspectorIoDelegate: public node::inspector::SocketServerDelegate { const std::string& script_name, bool wait); // Calls PostIncomingMessage() with appropriate InspectorAction: // kStartSession - bool StartSession(int session_id, const std::string& target_id) override; + void StartSession(int session_id, const std::string& target_id) override; // kSendMessage void MessageReceived(int session_id, const std::string& message) override; // kEndSession @@ -145,19 +145,22 @@ class InspectorIoDelegate: public node::inspector::SocketServerDelegate { std::vector<std::string> GetTargetIds() override; std::string GetTargetTitle(const std::string& id) override; std::string GetTargetUrl(const std::string& id) override; - bool IsConnected() { return connected_; } void ServerDone() override { io_->ServerDone(); } + void AssignTransport(InspectorSocketServer* server) { + server_ = server; + } + private: InspectorIo* io_; - bool connected_; int session_id_; const std::string script_name_; const std::string script_path_; const std::string target_id_; bool waiting_; + InspectorSocketServer* server_; }; void InterruptCallback(v8::Isolate*, void* agent) { @@ -226,10 +229,6 @@ void InspectorIo::Stop() { DispatchMessages(); } -bool InspectorIo::IsConnected() { - return delegate_ != nullptr && delegate_->IsConnected(); -} - bool InspectorIo::IsStarted() { return platform_ != nullptr; } @@ -264,6 +263,7 @@ void InspectorIo::IoThreadAsyncCb(uv_async_t* async) { MessageQueue<TransportAction> outgoing_message_queue; io->SwapBehindLock(&io->outgoing_message_queue_, &outgoing_message_queue); for (const auto& outgoing : outgoing_message_queue) { + int session_id = std::get<1>(outgoing); switch (std::get<0>(outgoing)) { case TransportAction::kKill: transport->TerminateConnections(); @@ -272,8 +272,14 @@ void InspectorIo::IoThreadAsyncCb(uv_async_t* async) { transport->Stop(nullptr); break; case TransportAction::kSendMessage: - std::string message = StringViewToUtf8(std::get<2>(outgoing)->string()); - transport->Send(std::get<1>(outgoing), message); + transport->Send(session_id, + StringViewToUtf8(std::get<2>(outgoing)->string())); + break; + case TransportAction::kAcceptSession: + transport->AcceptSession(session_id); + break; + case TransportAction::kDeclineSession: + transport->DeclineSession(session_id); break; } } @@ -293,6 +299,7 @@ void InspectorIo::ThreadMain() { wait_for_connect_); delegate_ = &delegate; Transport server(&delegate, &loop, options_.host_name(), options_.port()); + delegate.AssignTransport(&server); TransportAndIo<Transport> queue_transport(&server, this); thread_req_.data = &queue_transport; if (!server.Start()) { @@ -308,6 +315,7 @@ void InspectorIo::ThreadMain() { uv_run(&loop, UV_RUN_DEFAULT); thread_req_.data = nullptr; CHECK_EQ(uv_loop_close(&loop), 0); + delegate.AssignTransport(nullptr); delegate_ = nullptr; } @@ -358,6 +366,21 @@ void InspectorIo::NotifyMessageReceived() { incoming_message_cond_.Broadcast(scoped_lock); } +TransportAction InspectorIo::Attach(int session_id) { + Agent* agent = parent_env_->inspector_agent(); + if (agent->delegate() != nullptr) + return TransportAction::kDeclineSession; + + CHECK_EQ(session_delegate_, nullptr); + session_id_ = session_id; + state_ = State::kConnected; + fprintf(stderr, "Debugger attached.\n"); + session_delegate_ = std::unique_ptr<InspectorSessionDelegate>( + new IoSessionDelegate(this)); + agent->Connect(session_delegate_.get()); + return TransportAction::kAcceptSession; +} + void InspectorIo::DispatchMessages() { // This function can be reentered if there was an incoming message while // V8 was processing another inspector request (e.g. if the user is @@ -375,16 +398,14 @@ void InspectorIo::DispatchMessages() { MessageQueue<InspectorAction>::value_type task; std::swap(dispatching_message_queue_.front(), task); dispatching_message_queue_.pop_front(); + int id = std::get<1>(task); StringView message = std::get<2>(task)->string(); switch (std::get<0>(task)) { case InspectorAction::kStartSession: - CHECK_EQ(session_delegate_, nullptr); - session_id_ = std::get<1>(task); - state_ = State::kConnected; - fprintf(stderr, "Debugger attached.\n"); - session_delegate_ = std::unique_ptr<InspectorSessionDelegate>( - new IoSessionDelegate(this)); - parent_env_->inspector_agent()->Connect(session_delegate_.get()); + Write(Attach(id), id, StringView()); + break; + case InspectorAction::kStartSessionUnconditionally: + Attach(id); break; case InspectorAction::kEndSession: CHECK_NE(session_delegate_, nullptr); @@ -428,22 +449,23 @@ InspectorIoDelegate::InspectorIoDelegate(InspectorIo* io, const std::string& script_name, bool wait) : io_(io), - connected_(false), session_id_(0), script_name_(script_name), script_path_(script_path), target_id_(GenerateID()), - waiting_(wait) { } + waiting_(wait), + server_(nullptr) { } -bool InspectorIoDelegate::StartSession(int session_id, +void InspectorIoDelegate::StartSession(int session_id, const std::string& target_id) { - if (connected_) - return false; - connected_ = true; - session_id_++; - io_->PostIncomingMessage(InspectorAction::kStartSession, session_id, ""); - return true; + session_id_ = session_id; + InspectorAction action = InspectorAction::kStartSession; + if (waiting_) { + action = InspectorAction::kStartSessionUnconditionally; + server_->AcceptSession(session_id); + } + io_->PostIncomingMessage(action, session_id, ""); } void InspectorIoDelegate::MessageReceived(int session_id, @@ -464,7 +486,6 @@ void InspectorIoDelegate::MessageReceived(int session_id, } void InspectorIoDelegate::EndSession(int session_id) { - connected_ = false; io_->PostIncomingMessage(InspectorAction::kEndSession, session_id, ""); } diff --git a/src/inspector_io.h b/src/inspector_io.h index 7c15466eed9..79ccc6095ff 100644 --- a/src/inspector_io.h +++ b/src/inspector_io.h @@ -36,6 +36,7 @@ class InspectorIoDelegate; enum class InspectorAction { kStartSession, + kStartSessionUnconditionally, // First attach with --inspect-brk kEndSession, kSendMessage }; @@ -44,7 +45,9 @@ enum class InspectorAction { enum class TransportAction { kKill, kSendMessage, - kStop + kStop, + kAcceptSession, + kDeclineSession }; class InspectorIo { @@ -61,7 +64,6 @@ class InspectorIo { void Stop(); bool IsStarted(); - bool IsConnected(); void WaitForDisconnect(); // Called from thread to queue an incoming message and trigger @@ -124,6 +126,8 @@ class InspectorIo { void WaitForFrontendMessageWhilePaused(); // Broadcast incoming_message_cond_ void NotifyMessageReceived(); + // Attach session to an inspector. Either kAcceptSession or kDeclineSession + TransportAction Attach(int session_id); const DebugOptions options_; diff --git a/src/inspector_socket.cc b/src/inspector_socket.cc index 49d337b70b1..23b77f6aa56 100644 --- a/src/inspector_socket.cc +++ b/src/inspector_socket.cc @@ -1,4 +1,6 @@ #include "inspector_socket.h" + +#include "http_parser.h" #include "util-inl.h" #define NODE_WANT_INTERNALS 1 @@ -18,12 +20,71 @@ namespace node { namespace inspector { -static const char CLOSE_FRAME[] = {'\x88', '\x00'}; +class TcpHolder { + public: + using Pointer = std::unique_ptr<TcpHolder, void(*)(TcpHolder*)>; -enum ws_decode_result { - FRAME_OK, FRAME_INCOMPLETE, FRAME_CLOSE, FRAME_ERROR + static Pointer Accept(uv_stream_t* server, + InspectorSocket::DelegatePointer delegate); + void SetHandler(ProtocolHandler* handler); + int WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb); + uv_tcp_t* tcp() { + return &tcp_; + } + InspectorSocket::Delegate* delegate(); + + private: + static TcpHolder* From(void* handle) { + return node::ContainerOf(&TcpHolder::tcp_, + reinterpret_cast<uv_tcp_t*>(handle)); + } + static void OnClosed(uv_handle_t* handle); + static void OnDataReceivedCb(uv_stream_t* stream, ssize_t nread, + const uv_buf_t* buf); + static void DisconnectAndDispose(TcpHolder* holder); + explicit TcpHolder(InspectorSocket::DelegatePointer delegate); + ~TcpHolder() = default; + void ReclaimUvBuf(const uv_buf_t* buf, ssize_t read); + + uv_tcp_t tcp_; + const InspectorSocket::DelegatePointer delegate_; + ProtocolHandler* handler_; + std::vector<char> buffer; +}; + + +class ProtocolHandler { + public: + ProtocolHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp); + + virtual void AcceptUpgrade(const std::string& accept_key) = 0; + virtual void OnData(std::vector<char>* data) = 0; + virtual void OnEof() = 0; + virtual void Write(const std::vector<char> data) = 0; + virtual void CancelHandshake() = 0; + + std::string GetHost(); + + InspectorSocket* inspector() { + return inspector_; + } + + static void Shutdown(ProtocolHandler* handler) { + handler->Shutdown(); + } + + protected: + virtual ~ProtocolHandler() = default; + virtual void Shutdown() = 0; + int WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb); + InspectorSocket::Delegate* delegate(); + + InspectorSocket* const inspector_; + TcpHolder::Pointer tcp_; }; +namespace { + #if DUMP_READS || DUMP_WRITES static void dump_hex(const char* buf, size_t len) { const char* ptr = buf; @@ -50,64 +111,52 @@ static void dump_hex(const char* buf, size_t len) { } #endif -static void remove_from_beginning(std::vector<char>* buffer, size_t count) { - buffer->erase(buffer->begin(), buffer->begin() + count); -} - -static void dispose_inspector(uv_handle_t* handle) { - InspectorSocket* inspector = inspector_from_stream(handle); - inspector_cb close = - inspector->ws_mode ? inspector->ws_state->close_cb : nullptr; - inspector->buffer.clear(); - delete inspector->ws_state; - inspector->ws_state = nullptr; - if (close) { - close(inspector, 0); - } -} - -static void close_connection(InspectorSocket* inspector) { - uv_handle_t* socket = reinterpret_cast<uv_handle_t*>(&inspector->tcp); - if (!uv_is_closing(socket)) { - uv_read_stop(reinterpret_cast<uv_stream_t*>(socket)); - uv_close(socket, dispose_inspector); - } -} - -struct WriteRequest { - WriteRequest(InspectorSocket* inspector, const char* data, size_t size) - : inspector(inspector) - , storage(data, data + size) - , buf(uv_buf_init(&storage[0], storage.size())) {} +class WriteRequest { + public: + WriteRequest(ProtocolHandler* handler, const std::vector<char>& buffer) + : handler(handler) + , storage(buffer) + , buf(uv_buf_init(storage.data(), storage.size())) {} static WriteRequest* from_write_req(uv_write_t* req) { return node::ContainerOf(&WriteRequest::req, req); } - InspectorSocket* const inspector; + static void Cleanup(uv_write_t* req, int status) { + delete WriteRequest::from_write_req(req); + } + + ProtocolHandler* const handler; std::vector<char> storage; uv_write_t req; uv_buf_t buf; }; -// Cleanup -static void write_request_cleanup(uv_write_t* req, int status) { - delete WriteRequest::from_write_req(req); +void allocate_buffer(uv_handle_t* stream, size_t len, uv_buf_t* buf) { + *buf = uv_buf_init(new char[len], len); } -static int write_to_client(InspectorSocket* inspector, - const char* msg, - size_t len, - uv_write_cb write_cb = write_request_cleanup) { -#if DUMP_WRITES - printf("%s (%ld bytes):\n", __FUNCTION__, len); - dump_hex(msg, len); -#endif +static void remove_from_beginning(std::vector<char>* buffer, size_t count) { + buffer->erase(buffer->begin(), buffer->begin() + count); +} - // Freed in write_request_cleanup - WriteRequest* wr = new WriteRequest(inspector, msg, len); - uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&inspector->tcp); - return uv_write(&wr->req, stream, &wr->buf, 1, write_cb) < 0; +// Cleanup + +static const char CLOSE_FRAME[] = {'\x88', '\x00'}; + +enum ws_decode_result { + FRAME_OK, FRAME_INCOMPLETE, FRAME_CLOSE, FRAME_ERROR +}; + +static void generate_accept_string(const std::string& client_key, + char (*buffer)[ACCEPT_KEY_LENGTH]) { + // Magic string from websockets spec. + static const char ws_magic[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + std::string input(client_key + ws_magic); + char hash[SHA_DIGEST_LENGTH]; + SHA1(reinterpret_cast<const unsigned char*>(&input[0]), input.size(), + reinterpret_cast<unsigned char*>(hash)); + node::base64_encode(hash, sizeof(hash), *buffer, sizeof(*buffer)); } // Constants for hybi-10 frame format. @@ -134,11 +183,11 @@ const size_t kTwoBytePayloadLengthField = 126; const size_t kEightBytePayloadLengthField = 127; const size_t kMaskingKeyWidthInBytes = 4; -static std::vector<char> encode_frame_hybi17(const char* message, - size_t data_length) { +static std::vector<char> encode_frame_hybi17(const std::vector<char>& message) { std::vector<char> frame; OpCode op_code = kOpCodeText; frame.push_back(kFinalBit | op_code); + const size_t data_length = message.size(); if (data_length <= kMaxSingleBytePayloadLength) { frame.push_back(static_cast<char>(data_length)); } else if (data_length <= 0xFFFF) { @@ -158,7 +207,7 @@ static std::vector<char> encode_frame_hybi17(const char* message, extended_payload_length + 8); CHECK_EQ(0, remaining); } - frame.insert(frame.end(), message, message + data_length); + frame.insert(frame.end(), message.begin(), message.end()); return frame; } @@ -248,272 +297,368 @@ static ws_decode_result decode_frame_hybi17(const std::vector<char>& buffer, return closed ? FRAME_CLOSE : FRAME_OK; } -static void invoke_read_callback(InspectorSocket* inspector, - int status, const uv_buf_t* buf) { - if (inspector->ws_state->read_cb) { - inspector->ws_state->read_cb( - reinterpret_cast<uv_stream_t*>(&inspector->tcp), status, buf); + +// WS protocol +class WsHandler : public ProtocolHandler { + public: + WsHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp) + : ProtocolHandler(inspector, std::move(tcp)), + OnCloseSent(&WsHandler::WaitForCloseReply), + OnCloseRecieved(&WsHandler::CloseFrameReceived), + dispose_(false) { } + + void AcceptUpgrade(const std::string& accept_key) override { } + void CancelHandshake() override {} + + void OnEof() override { + tcp_.reset(); + if (dispose_) + delete this; } -} -static void shutdown_complete(InspectorSocket* inspector) { - close_connection(inspector); -} + void OnData(std::vector<char>* data) override { + // 1. Parse. + int processed = 0; + do { + processed = ParseWsFrames(*data); + // 2. Fix the data size & length + if (processed > 0) { + remove_from_beginning(data, processed); + } + } while (processed > 0 && !data->empty()); + } -static void on_close_frame_written(uv_write_t* req, int status) { - WriteRequest* wr = WriteRequest::from_write_req(req); - InspectorSocket* inspector = wr->inspector; - delete wr; - inspector->ws_state->close_sent = true; - if (inspector->ws_state->received_close) { - shutdown_complete(inspector); + void Write(const std::vector<char> data) { + std::vector<char> output = encode_frame_hybi17(data); + WriteRaw(output, WriteRequest::Cleanup); } -} -static void close_frame_received(InspectorSocket* inspector) { - inspector->ws_state->received_close = true; - if (!inspector->ws_state->close_sent) { - invoke_read_callback(inspector, 0, 0); - write_to_client(inspector, CLOSE_FRAME, sizeof(CLOSE_FRAME), - on_close_frame_written); - } else { - shutdown_complete(inspector); + protected: + void Shutdown() override { + if (tcp_) { + dispose_ = true; + SendClose(); + } else { + delete this; + } } -} -static int parse_ws_frames(InspectorSocket* inspector) { - int bytes_consumed = 0; - std::vector<char> output; - bool compressed = false; - - ws_decode_result r = decode_frame_hybi17(inspector->buffer, - true /* client_frame */, - &bytes_consumed, &output, - &compressed); - // Compressed frame means client is ignoring the headers and misbehaves - if (compressed || r == FRAME_ERROR) { - invoke_read_callback(inspector, UV_EPROTO, nullptr); - close_connection(inspector); - bytes_consumed = 0; - } else if (r == FRAME_CLOSE) { - close_frame_received(inspector); - bytes_consumed = 0; - } else if (r == FRAME_OK && inspector->ws_state->alloc_cb - && inspector->ws_state->read_cb) { - uv_buf_t buffer; - size_t len = output.size(); - inspector->ws_state->alloc_cb( - reinterpret_cast<uv_handle_t*>(&inspector->tcp), - len, &buffer); - CHECK_GE(buffer.len, len); - memcpy(buffer.base, &output[0], len); - invoke_read_callback(inspector, len, &buffer); - } - return bytes_consumed; -} + private: + using Callback = void (WsHandler::*)(void); -static void prepare_buffer(uv_handle_t* stream, size_t len, uv_buf_t* buf) { - *buf = uv_buf_init(new char[len], len); -} + static void OnCloseFrameWritten(uv_write_t* req, int status) { + WriteRequest* wr = WriteRequest::from_write_req(req); + WsHandler* handler = static_cast<WsHandler*>(wr->handler); + delete wr; + Callback cb = handler->OnCloseSent; + (handler->*cb)(); + } -static void reclaim_uv_buf(InspectorSocket* inspector, const uv_buf_t* buf, - ssize_t read) { - if (read > 0) { - std::vector<char>& buffer = inspector->buffer; - buffer.insert(buffer.end(), buf->base, buf->base + read); + void WaitForCloseReply() { + OnCloseRecieved = &WsHandler::OnEof; } - delete[] buf->base; -} -static void websockets_data_cb(uv_stream_t* stream, ssize_t nread, - const uv_buf_t* buf) { - InspectorSocket* inspector = inspector_from_stream(stream); - reclaim_uv_buf(inspector, buf, nread); - if (nread < 0 || nread == UV_EOF) { - inspector->connection_eof = true; - if (!inspector->shutting_down && inspector->ws_state->read_cb) { - inspector->ws_state->read_cb(stream, nread, nullptr); + void SendClose() { + WriteRaw(std::vector<char>(CLOSE_FRAME, CLOSE_FRAME + sizeof(CLOSE_FRAME)), + OnCloseFrameWritten); + } + + void CloseFrameReceived() { + OnCloseSent = &WsHandler::OnEof; + SendClose(); + } + + int ParseWsFrames(const std::vector<char>& buffer) { + int bytes_consumed = 0; + std::vector<char> output; + bool compressed = false; + + ws_decode_result r = decode_frame_hybi17(buffer, + true /* client_frame */, + &bytes_consumed, &output, + &compressed); + // Compressed frame means client is ignoring the headers and misbehaves + if (compressed || r == FRAME_ERROR) { + OnEof(); + bytes_consumed = 0; + } else if (r == FRAME_CLOSE) { + (this->*OnCloseRecieved)(); + bytes_consumed = 0; + } else if (r == FRAME_OK) { + delegate()->OnWsFrame(output); } - if (inspector->ws_state->close_sent && - !inspector->ws_state->received_close) { - shutdown_complete(inspector); // invoke callback + return bytes_consumed; + } + + + Callback OnCloseSent; + Callback OnCloseRecieved; + bool dispose_; +}; + +// HTTP protocol +class HttpEvent { + public: + HttpEvent(const std::string& path, bool upgrade, + bool isGET, const std::string& ws_key) : path(path), + upgrade(upgrade), + isGET(isGET), + ws_key(ws_key) { } + + std::string path; + bool upgrade; + bool isGET; + std::string ws_key; + std::string current_header_; +}; + +class HttpHandler : public ProtocolHandler { + public: + explicit HttpHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp) + : ProtocolHandler(inspector, std::move(tcp)), + parsing_value_(false) { + http_parser_init(&parser_, HTTP_REQUEST); + http_parser_settings_init(&parser_settings); + parser_settings.on_header_field = OnHeaderField; + parser_settings.on_header_value = OnHeaderValue; + parser_settings.on_message_complete = OnMessageComplete; + parser_settings.on_url = OnPath; + } + + void AcceptUpgrade(const std::string& accept_key) override { + char accept_string[ACCEPT_KEY_LENGTH]; + generate_accept_string(accept_key, &accept_string); + const char accept_ws_prefix[] = "HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: "; + const char accept_ws_suffix[] = "\r\n\r\n"; + std::vector<char> reply(accept_ws_prefix, + accept_ws_prefix + sizeof(accept_ws_prefix) - 1); + reply.insert(reply.end(), accept_string, + accept_string + sizeof(accept_string)); + reply.insert(reply.end(), accept_ws_suffix, + accept_ws_suffix + sizeof(accept_ws_suffix) - 1); + if (WriteRaw(reply, WriteRequest::Cleanup) >= 0) { + inspector_->SwitchProtocol(new WsHandler(inspector_, std::move(tcp_))); + } else { + tcp_.reset(); } - } else { - #if DUMP_READS - printf("%s read %ld bytes\n", __FUNCTION__, nread); - if (nread > 0) { - dump_hex(inspector->buffer.data() + inspector->buffer.size() - nread, - nread); - } - #endif - // 2. Parse. - int processed = 0; - do { - processed = parse_ws_frames(inspector); - // 3. Fix the buffer size & length - if (processed > 0) { - remove_from_beginning(&inspector->buffer, processed); + } + + void CancelHandshake() { + const char HANDSHAKE_FAILED_RESPONSE[] = + "HTTP/1.0 400 Bad Request\r\n" + "Content-Type: text/html; charset=UTF-8\r\n\r\n" + "WebSockets request was expected\r\n"; + WriteRaw(std::vector<char>(HANDSHAKE_FAILED_RESPONSE, + HANDSHAKE_FAILED_RESPONSE + sizeof(HANDSHAKE_FAILED_RESPONSE) - 1), + ThenCloseAndReportFailure); + } + + + void OnEof() override { + tcp_.reset(); + } + + void OnData(std::vector<char>* data) override { + http_parser_execute(&parser_, &parser_settings, data->data(), data->size()); + data->clear(); + if (parser_.http_errno != HPE_OK) { + CancelHandshake(); + } + // Event handling may delete *this + std::vector<HttpEvent> events; + std::swap(events, events_); + for (const HttpEvent& event : events) { + bool shouldContinue = event.isGET && !event.upgrade; + if (!event.isGET) { + CancelHandshake(); + } else if (!event.upgrade) { + delegate()->OnHttpGet(event.path); + } else if (event.ws_key.empty()) { + CancelHandshake(); + } else { + delegate()->OnSocketUpgrade(event.path, event.ws_key); } - } while (processed > 0 && !inspector->buffer.empty()); + if (!shouldContinue) + return; + } } -} -int inspector_read_start(InspectorSocket* inspector, - uv_alloc_cb alloc_cb, uv_read_cb read_cb) { - CHECK(inspector->ws_mode); - CHECK(!inspector->shutting_down || read_cb == nullptr); - inspector->ws_state->close_sent = false; - inspector->ws_state->alloc_cb = alloc_cb; - inspector->ws_state->read_cb = read_cb; - int err = - uv_read_start(reinterpret_cast<uv_stream_t*>(&inspector->tcp), - prepare_buffer, - websockets_data_cb); - if (err < 0) { - close_connection(inspector); - } - return err; -} + void Write(const std::vector<char> data) override { + WriteRaw(data, WriteRequest::Cleanup); + } -void inspector_read_stop(InspectorSocket* inspector) { - uv_read_stop(reinterpret_cast<uv_stream_t*>(&inspector->tcp)); - inspector->ws_state->alloc_cb = nullptr; - inspector->ws_state->read_cb = nullptr; -} + protected: + void Shutdown() override { + delete this; + } -static void generate_accept_string(const std::string& client_key, - char (*buffer)[ACCEPT_KEY_LENGTH]) { - // Magic string from websockets spec. - static const char ws_magic[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; - std::string input(client_key + ws_magic); - char hash[SHA_DIGEST_LENGTH]; - SHA1(reinterpret_cast<const unsigned char*>(&input[0]), input.size(), - reinterpret_cast<unsigned char*>(hash)); - node::base64_encode(hash, sizeof(hash), *buffer, sizeof(*buffer)); -} + private: + static void ThenCloseAndReportFailure(uv_write_t* req, int status) { + ProtocolHandler* handler = WriteRequest::from_write_req(req)->handler; + WriteRequest::Cleanup(req, status); + handler->inspector()->SwitchProtocol(nullptr); + } -static int header_value_cb(http_parser* parser, const char* at, size_t length) { - static const char SEC_WEBSOCKET_KEY_HEADER[] = "Sec-WebSocket-Key"; - auto inspector = static_cast<InspectorSocket*>(parser->data); - auto state = inspector->http_parsing_state; - state->parsing_value = true; - if (state->current_header.size() == sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1 && - node::StringEqualNoCaseN(state->current_header.data(), - SEC_WEBSOCKET_KEY_HEADER, - sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1)) { - state->ws_key.append(at, length); - } - return 0; -} + static int OnHeaderValue(http_parser* parser, const char* at, size_t length) { + static const char SEC_WEBSOCKET_KEY_HEADER[] = "Sec-WebSocket-Key"; + HttpHandler* handler = From(parser); + handler->parsing_value_ = true; + if (handler->current_header_.size() == + sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1 && + node::StringEqualNoCaseN(handler->current_header_.data(), + SEC_WEBSOCKET_KEY_HEADER, + sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1)) { + handler->ws_key_.append(at, length); + } + return 0; + } -static int header_field_cb(http_parser* parser, const char* at, size_t length) { - auto inspector = static_cast<InspectorSocket*>(parser->data); - auto state = inspector->http_parsing_state; - if (state->parsing_value) { - state->parsing_value = false; - state->current_header.clear(); + static int OnHeaderField(http_parser* parser, const char* at, size_t length) { + HttpHandler* handler = From(parser); + if (handler->parsing_value_) { + handler->parsing_value_ = false; + handler->current_header_.clear(); + } + handler->current_header_.append(at, length); + return 0; } - state->current_header.append(at, length); - return 0; -} -static int path_cb(http_parser* parser, const char* at, size_t length) { - auto inspector = static_cast<InspectorSocket*>(parser->data); - auto state = inspector->http_parsing_state; - state->path.append(at, length); - return 0; -} + static int OnPath(http_parser* parser, const char* at, size_t length) { + HttpHandler* handler = From(parser); + handler->path_.append(at, length); + return 0; + } + + static HttpHandler* From(http_parser* parser) { + return node::ContainerOf(&HttpHandler::parser_, parser); + } + + static int OnMessageComplete(http_parser* parser) { + // Event needs to be fired after the parser is done. + HttpHandler* handler = From(parser); + handler->events_.push_back(HttpEvent(handler->path_, parser->upgrade, + parser->method == HTTP_GET, + handler->ws_key_)); + handler->path_ = ""; + handler->ws_key_ = ""; + handler->parsing_value_ = false; + handler->current_header_ = ""; + + return 0; + } + + bool parsing_value_; + http_parser parser_; + http_parser_settings parser_settings; + std::vector<HttpEvent> events_; + std::string current_header_; + std::string ws_key_; + std::string path_; +}; + +} // namespace -static void handshake_complete(InspectorSocket* inspector) { - uv_read_stop(reinterpret_cast<uv_stream_t*>(&inspector->tcp)); - handshake_cb callback = inspector->http_parsing_state->callback; - inspector->ws_state = new ws_state_s(); - inspector->ws_mode = true; - callback(inspector, kInspectorHandshakeUpgraded, - inspector->http_parsing_state->path); +// Any protocol +ProtocolHandler::ProtocolHandler(InspectorSocket* inspector, + TcpHolder::Pointer tcp) + : inspector_(inspector), tcp_(std::move(tcp)) { + CHECK_NE(nullptr, tcp_); + tcp_->SetHandler(this); } -static void cleanup_http_parsing_state(InspectorSocket* inspector) { - delete inspector->http_parsing_state; - inspector->http_parsing_state = nullptr; +int ProtocolHandler::WriteRaw(const std::vector<char>& buffer, + uv_write_cb write_cb) { + return tcp_->WriteRaw(buffer, write_cb); } -static void report_handshake_failure_cb(uv_handle_t* handle) { - dispose_inspector(handle); - InspectorSocket* inspector = inspector_from_stream(handle); - handshake_cb cb = inspector->http_parsing_state->callback; - cleanup_http_parsing_state(inspector); - cb(inspector, kInspectorHandshakeFailed, std::string()); +InspectorSocket::Delegate* ProtocolHandler::delegate() { + return tcp_->delegate(); } -static void close_and_report_handshake_failure(InspectorSocket* inspector) { - uv_handle_t* socket = reinterpret_cast<uv_handle_t*>(&inspector->tcp); - if (uv_is_closing(socket)) { - report_handshake_failure_cb(socket); +std::string ProtocolHandler::GetHost() { + char ip[INET6_ADDRSTRLEN]; + sockaddr_storage addr; + int len = sizeof(addr); + int err = uv_tcp_getsockname(tcp_->tcp(), + reinterpret_cast<struct sockaddr*>(&addr), + &len); + if (err != 0) + return ""; + if (addr.ss_family == AF_INET6) { + const sockaddr_in6* v6 = reinterpret_cast<const sockaddr_in6*>(&addr); + err = uv_ip6_name(v6, ip, sizeof(ip)); } else { - uv_read_stop(reinterpret_cast<uv_stream_t*>(socket)); - uv_close(socket, report_handshake_failure_cb); + const sockaddr_in* v4 = reinterpret_cast<const sockaddr_in*>(&addr); + err = uv_ip4_name(v4, ip, sizeof(ip)); + } + if (err != 0) + return ""; + return ip; +} + +// RAII uv_tcp_t wrapper +TcpHolder::TcpHolder(InspectorSocket::DelegatePointer delegate) + : tcp_(), + delegate_(std::move(delegate)), + handler_(nullptr) { } + +// static +TcpHolder::Pointer TcpHolder::Accept( + uv_stream_t* server, + InspectorSocket::DelegatePointer delegate) { + TcpHolder* result = new TcpHolder(std::move(delegate)); + uv_stream_t* tcp = reinterpret_cast<uv_stream_t*>(&result->tcp_); + int err = uv_tcp_init(server->loop, &result->tcp_); + if (err == 0) { + err = uv_accept(server, tcp); + } + if (err == 0) { + err = uv_read_start(tcp, allocate_buffer, OnDataReceivedCb); + } + if (err == 0) { + return { result, DisconnectAndDispose }; + } else { + fprintf(stderr, "[%s:%d@%s]\n", __FILE__, __LINE__, __FUNCTION__); + + delete result; + return { nullptr, nullptr }; } } -static void then_close_and_report_failure(uv_write_t* req, int status) { - InspectorSocket* inspector = WriteRequest::from_write_req(req)->inspector; - write_request_cleanup(req, status); - close_and_report_handshake_failure(inspector); +void TcpHolder::SetHandler(ProtocolHandler* handler) { + handler_ = handler; } -static void handshake_failed(InspectorSocket* inspector) { - const char HANDSHAKE_FAILED_RESPONSE[] = - "HTTP/1.0 400 Bad Request\r\n" - "Content-Type: text/html; charset=UTF-8\r\n\r\n" - "WebSockets request was expected\r\n"; - write_to_client(inspector, HANDSHAKE_FAILED_RESPONSE, - sizeof(HANDSHAKE_FAILED_RESPONSE) - 1, - then_close_and_report_failure); +int TcpHolder::WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb) { +#if DUMP_WRITES + printf("%s (%ld bytes):\n", __FUNCTION__, buffer.size()); + dump_hex(buffer.data(), buffer.size()); + printf("\n"); +#endif + + // Freed in write_request_cleanup + WriteRequest* wr = new WriteRequest(handler_, buffer); + uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&tcp_); + int err = uv_write(&wr->req, stream, &wr->buf, 1, write_cb); + if (err < 0) + delete wr; + return err < 0; } -// init_handshake references message_complete_cb -static void init_handshake(InspectorSocket* socket); - -static int message_complete_cb(http_parser* parser) { - InspectorSocket* inspector = static_cast<InspectorSocket*>(parser->data); - struct http_parsing_state_s* state = inspector->http_parsing_state; - if (parser->method != HTTP_GET) { - handshake_failed(inspector); - } else if (!parser->upgrade) { - if (state->callback(inspector, kInspectorHandshakeHttpGet, state->path)) { - init_handshake(inspector); - } else { - handshake_failed(inspector); - } - } else if (state->ws_key.empty()) { - handshake_failed(inspector); - } else if (state->callback(inspector, kInspectorHandshakeUpgrading, - state->path)) { - char accept_string[ACCEPT_KEY_LENGTH]; - generate_accept_string(state->ws_key, &accept_string); - const char accept_ws_prefix[] = "HTTP/1.1 101 Switching Protocols\r\n" - "Upgrade: websocket\r\n" - "Connection: Upgrade\r\n" - "Sec-WebSocket-Accept: "; - const char accept_ws_suffix[] = "\r\n\r\n"; - std::string reply(accept_ws_prefix, sizeof(accept_ws_prefix) - 1); - reply.append(accept_string, sizeof(accept_string)); - reply.append(accept_ws_suffix, sizeof(accept_ws_suffix) - 1); - if (write_to_client(inspector, &reply[0], reply.size()) >= 0) { - handshake_complete(inspector); - inspector->http_parsing_state->done = true; - } else { - close_and_report_handshake_failure(inspector); - } - } else { - handshake_failed(inspector); - } - return 0; +InspectorSocket::Delegate* TcpHolder::delegate() { + return delegate_.get(); +} + +// static +void TcpHolder::OnClosed(uv_handle_t* handle) { + delete From(handle); } -static void data_received_cb(uv_stream_t* tcp, ssize_t nread, - const uv_buf_t* buf) { +void TcpHolder::OnDataReceivedCb(uv_stream_t* tcp, ssize_t nread, + const uv_buf_t* buf) { #if DUMP_READS if (nread >= 0) { printf("%s (%ld bytes)\n", __FUNCTION__, nread); @@ -522,107 +667,65 @@ static void data_received_cb(uv_stream_t* tcp, ssize_t nread, printf("[%s:%d] %s\n", __FUNCTION__, __LINE__, uv_err_name(nread)); } #endif - InspectorSocket* inspector = inspector_from_stream(tcp); - reclaim_uv_buf(inspector, buf, nread); + TcpHolder* holder = From(tcp); + holder->ReclaimUvBuf(buf, nread); if (nread < 0 || nread == UV_EOF) { - close_and_report_handshake_failure(inspector); + holder->handler_->OnEof(); } else { - http_parsing_state_s* state = inspector->http_parsing_state; - http_parser* parser = &state->parser; - http_parser_execute(parser, &state->parser_settings, - inspector->buffer.data(), nread); - remove_from_beginning(&inspector->buffer, nread); - if (parser->http_errno != HPE_OK) { - handshake_failed(inspector); - } - if (inspector->http_parsing_state->done) { - cleanup_http_parsing_state(inspector); - } + holder->handler_->OnData(&holder->buffer); } } -static void init_handshake(InspectorSocket* socket) { - http_parsing_state_s* state = socket->http_parsing_state; - CHECK_NE(state, nullptr); - state->current_header.clear(); - state->ws_key.clear(); - state->path.clear(); - state->done = false; - http_parser_init(&state->parser, HTTP_REQUEST); - state->parser.data = socket; - http_parser_settings* settings = &state->parser_settings; - http_parser_settings_init(settings); - settings->on_header_field = header_field_cb; - settings->on_header_value = header_value_cb; - settings->on_message_complete = message_complete_cb; - settings->on_url = path_cb; +// static +void TcpHolder::DisconnectAndDispose(TcpHolder* holder) { + uv_handle_t* handle = reinterpret_cast<uv_handle_t*>(&holder->tcp_); + uv_close(handle, OnClosed); } -int inspector_accept(uv_stream_t* server, InspectorSocket* socket, - handshake_cb callback) { - CHECK_NE(callback, nullptr); - CHECK_EQ(socket->http_parsing_state, nullptr); - - socket->http_parsing_state = new http_parsing_state_s(); - uv_stream_t* tcp = reinterpret_cast<uv_stream_t*>(&socket->tcp); - int err = uv_tcp_init(server->loop, &socket->tcp); - - if (err == 0) { - err = uv_accept(server, tcp); - } - if (err == 0) { - init_handshake(socket); - socket->http_parsing_state->callback = callback; - err = uv_read_start(tcp, prepare_buffer, - data_received_cb); - } - if (err != 0) { - uv_close(reinterpret_cast<uv_handle_t*>(tcp), nullptr); +void TcpHolder::ReclaimUvBuf(const uv_buf_t* buf, ssize_t read) { + if (read > 0) { + buffer.insert(buffer.end(), buf->base, buf->base + read); } - return err; + delete[] buf->base; } -void inspector_write(InspectorSocket* inspector, const char* data, - size_t len) { - if (inspector->ws_mode) { - std::vector<char> output = encode_frame_hybi17(data, len); - write_to_client(inspector, &output[0], output.size()); +// Public interface +InspectorSocket::InspectorSocket() + : protocol_handler_(nullptr, ProtocolHandler::Shutdown) { } + +InspectorSocket::~InspectorSocket() = default; + +// static +InspectorSocket::Pointer InspectorSocket::Accept(uv_stream_t* server, + DelegatePointer delegate) { + auto tcp = TcpHolder::Accept(server, std::move(delegate)); + if (tcp) { + InspectorSocket* inspector = new InspectorSocket(); + inspector->SwitchProtocol(new HttpHandler(inspector, std::move(tcp))); + return InspectorSocket::Pointer(inspector); } else { - write_to_client(inspector, data, len); + return InspectorSocket::Pointer(nullptr); } } -void inspector_close(InspectorSocket* inspector, - inspector_cb callback) { - // libuv throws assertions when closing stream that's already closed - we - // need to do the same. - CHECK(!uv_is_closing(reinterpret_cast<uv_handle_t*>(&inspector->tcp))); - CHECK(!inspector->shutting_down); - inspector->shutting_down = true; - inspector->ws_state->close_cb = callback; - if (inspector->connection_eof) { - close_connection(inspector); - } else { - inspector_read_stop(inspector); - write_to_client(inspector, CLOSE_FRAME, sizeof(CLOSE_FRAME), - on_close_frame_written); - inspector_read_start(inspector, nullptr, nullptr); - } +void InspectorSocket::AcceptUpgrade(const std::string& ws_key) { + protocol_handler_->AcceptUpgrade(ws_key); +} + +void InspectorSocket::CancelHandshake() { + protocol_handler_->CancelHandshake(); +} + +std::string InspectorSocket::GetHost() { + return protocol_handler_->GetHost(); } -bool inspector_is_active(const InspectorSocket* inspector) { - const uv_handle_t* tcp = - reinterpret_cast<const uv_handle_t*>(&inspector->tcp); - return !inspector->shutting_down && !uv_is_closing(tcp); +void InspectorSocket::SwitchProtocol(ProtocolHandler* handler) { + protocol_handler_.reset(std::move(handler)); } -void InspectorSocket::reinit() { - http_parsing_state = nullptr; - ws_state = nullptr; - buffer.clear(); - ws_mode = false; - shutting_down = false; - connection_eof = false; +void InspectorSocket::Write(const char* data, size_t len) { + protocol_handler_->Write(std::vector<char>(data, data + len)); } } // namespace inspector diff --git a/src/inspector_socket.h b/src/inspector_socket.h index f93150d6f9a..1a3411435ee 100644 --- a/src/inspector_socket.h +++ b/src/inspector_socket.h @@ -1,7 +1,6 @@ #ifndef SRC_INSPECTOR_SOCKET_H_ #define SRC_INSPECTOR_SOCKET_H_ -#include "http_parser.h" #include "util-inl.h" #include "uv.h" @@ -11,88 +10,41 @@ namespace node { namespace inspector { -enum inspector_handshake_event { - kInspectorHandshakeUpgrading, - kInspectorHandshakeUpgraded, - kInspectorHandshakeHttpGet, - kInspectorHandshakeFailed -}; - -class InspectorSocket; - -typedef void (*inspector_cb)(InspectorSocket*, int); -// Notifies as handshake is progressing. Returning false as a response to -// kInspectorHandshakeUpgrading or kInspectorHandshakeHttpGet event will abort -// the connection. inspector_write can be used from the callback. -typedef bool (*handshake_cb)(InspectorSocket*, - enum inspector_handshake_event state, - const std::string& path); - -struct http_parsing_state_s { - http_parser parser; - http_parser_settings parser_settings; - handshake_cb callback; - bool done; - bool parsing_value; - std::string ws_key; - std::string path; - std::string current_header; -}; - -struct ws_state_s { - uv_alloc_cb alloc_cb; - uv_read_cb read_cb; - inspector_cb close_cb; - bool close_sent; - bool received_close; -}; +class ProtocolHandler; // HTTP Wrapper around a uv_tcp_t class InspectorSocket { public: - InspectorSocket() : data(nullptr), http_parsing_state(nullptr), - ws_state(nullptr), buffer(0), ws_mode(false), - shutting_down(false), connection_eof(false) { } - void reinit(); - void* data; - struct http_parsing_state_s* http_parsing_state; - struct ws_state_s* ws_state; - std::vector<char> buffer; - uv_tcp_t tcp; - bool ws_mode; - bool shutting_down; - bool connection_eof; + class Delegate { + public: + virtual void OnHttpGet(const std::string& path) = 0; + virtual void OnSocketUpgrade(const std::string& path, + const std::string& accept_key) = 0; + virtual void OnWsFrame(const std::vector<char>& frame) = 0; + virtual ~Delegate() {} + }; - private: - DISALLOW_COPY_AND_ASSIGN(InspectorSocket); -}; + using DelegatePointer = std::unique_ptr<Delegate>; + using Pointer = std::unique_ptr<InspectorSocket>; + + static Pointer Accept(uv_stream_t* server, DelegatePointer delegate); -int inspector_accept(uv_stream_t* server, InspectorSocket* inspector, - handshake_cb callback); + ~InspectorSocket(); -void inspector_close(InspectorSocket* inspector, - inspector_cb callback); + void AcceptUpgrade(const std::string& accept_key); + void CancelHandshake(); + void Write(const char* data, size_t len); + void SwitchProtocol(ProtocolHandler* handler); + std::string GetHost(); -// Callbacks will receive stream handles. Use inspector_from_stream to get -// InspectorSocket* from the stream handle. -int inspector_read_start(InspectorSocket* inspector, uv_alloc_cb, - uv_read_cb); -void inspector_read_stop(InspectorSocket* inspector); -void inspector_write(InspectorSocket* inspector, - const char* data, size_t len); -bool inspector_is_active(const InspectorSocket* inspector); + private: + InspectorSocket(); -inline InspectorSocket* inspector_from_stream(uv_tcp_t* stream) { - return node::ContainerOf(&InspectorSocket::tcp, stream); -} + std::unique_ptr<ProtocolHandler, void(*)(ProtocolHandler*)> protocol_handler_; -inline InspectorSocket* inspector_from_stream(uv_stream_t* stream) { - return inspector_from_stream(reinterpret_cast<uv_tcp_t*>(stream)); -} + DISALLOW_COPY_AND_ASSIGN(InspectorSocket); +}; -inline InspectorSocket* inspector_from_stream(uv_handle_t* stream) { - return inspector_from_stream(reinterpret_cast<uv_tcp_t*>(stream)); -} } // namespace inspector } // namespace node diff --git a/src/inspector_socket_server.cc b/src/inspector_socket_server.cc index 958c41a654a..bf114251cb1 100644 --- a/src/inspector_socket_server.cc +++ b/src/inspector_socket_server.cc @@ -33,7 +33,6 @@ std::string FormatWsAddress(const std::string& host, int port, return url.str(); } - namespace { static const uint8_t PROTOCOL_JSON[] = { @@ -114,8 +113,8 @@ void SendHttpResponse(InspectorSocket* socket, const std::string& response) { "\r\n"; char header[sizeof(HEADERS) + 20]; int header_len = snprintf(header, sizeof(header), HEADERS, response.size()); - inspector_write(socket, header, header_len); - inspector_write(socket, response.data(), response.size()); + socket->Write(header, header_len); + socket->Write(response.data(), response.size()); } void SendVersionResponse(InspectorSocket* socket) { @@ -145,28 +144,6 @@ void SendProtocolJson(InspectorSocket* socket) { CHECK_EQ(Z_OK, inflateEnd(&strm)); SendHttpResponse(socket, data); } - -int GetSocketHost(uv_tcp_t* socket, std::string* out_host) { - char ip[INET6_ADDRSTRLEN]; - sockaddr_storage addr; - int len = sizeof(addr); - int err = uv_tcp_getsockname(socket, - reinterpret_cast<struct sockaddr*>(&addr), - &len); - if (err != 0) - return err; - if (addr.ss_family == AF_INET6) { - const sockaddr_in6* v6 = reinterpret_cast<const sockaddr_in6*>(&addr); - err = uv_ip6_name(v6, ip, sizeof(ip)); - } else { - const sockaddr_in* v4 = reinterpret_cast<const sockaddr_in*>(&addr); - err = uv_ip4_name(v4, ip, sizeof(ip)); - } - if (err != 0) - return err; - *out_host = ip; - return err; -} } // namespace @@ -209,46 +186,58 @@ class Closer { class SocketSession { public: - static int Accept(InspectorSocketServer* server, int server_port, - uv_stream_t* server_socket); + SocketSession(InspectorSocketServer* server, int id, int server_port); + void Close() { + ws_socket_.reset(); + } void Send(const std::string& message); - void Close(); - + void Own(InspectorSocket::Pointer ws_socket) { + ws_socket_ = std::move(ws_socket); + } int id() const { return id_; } - bool IsForTarget(const std::string& target_id) const { - return target_id_ == target_id; + int server_port() { + return server_port_; } - static int ServerPortForClient(InspectorSocket* client) { - return From(client)->server_port_; + InspectorSocket* ws_socket() { + return ws_socket_.get(); } - - private: - SocketSession(InspectorSocketServer* server, int server_port); - static SocketSession* From(InspectorSocket* socket) { - return node::ContainerOf(&SocketSession::socket_, socket); + void set_ws_key(const std::string& ws_key) { + ws_key_ = ws_key; + } + void Accept() { + ws_socket_->AcceptUpgrade(ws_key_); + } + void Decline() { + ws_socket_->CancelHandshake(); } - enum class State { kHttp, kWebSocket, kClosing, kEOF, kDeclined }; - static bool HandshakeCallback(InspectorSocket* socket, - enum inspector_handshake_event state, - const std::string& path); - static void ReadCallback(uv_stream_t* stream, ssize_t read, - const uv_buf_t* buf); - static void CloseCallback(InspectorSocket* socket, int code); + class Delegate : public InspectorSocket::Delegate { + public: + Delegate(InspectorSocketServer* server, int session_id) + : server_(server), session_id_(session_id) { } + ~Delegate() { + server_->SessionTerminated(session_id_); + } + void OnHttpGet(const std::string& path) override; + void OnSocketUpgrade(const std::string& path, + const std::string& ws_key) override; + void OnWsFrame(const std::vector<char>& data) override; + + private: + SocketSession* Session() { + return server_->Session(session_id_); + } - void FrontendConnected(); - void SetDeclined() { state_ = State::kDeclined; } - void SetTargetId(const std::string& target_id) { - CHECK(target_id_.empty()); - target_id_ = target_id; - } + InspectorSocketServer* server_; + int session_id_; + }; + private: const int id_; - InspectorSocket socket_; + InspectorSocket::Pointer ws_socket_; InspectorSocketServer* server_; - std::string target_id_; - State state_; const int server_port_; + std::string ws_key_; }; class ServerSocket { @@ -269,7 +258,6 @@ class ServerSocket { return node::ContainerOf(&ServerSocket::tcp_socket_, reinterpret_cast<uv_tcp_t*>(socket)); } - static void SocketConnectedCallback(uv_stream_t* tcp_socket, int status); static void SocketClosedCallback(uv_handle_t* tcp_socket); static void FreeOnCloseCallback(uv_handle_t* tcp_socket_) { @@ -296,41 +284,57 @@ InspectorSocketServer::InspectorSocketServer(SocketServerDelegate* delegate, state_ = ServerState::kNew; } -bool InspectorSocketServer::SessionStarted(SocketSession* session, - const std::string& id) { - if (TargetExists(id) && delegate_->StartSession(session->id(), id)) { - connected_sessions_[session->id()] = session; - return true; - } else { - return false; +InspectorSocketServer::~InspectorSocketServer() = default; + +SocketSession* InspectorSocketServer::Session(int session_id) { + auto it = connected_sessions_.find(session_id); + return it == connected_sessions_.end() ? nullptr : it->second.second.get(); +} + +void InspectorSocketServer::SessionStarted(int session_id, + const std::string& id, + const std::string& ws_key) { + SocketSession* session = Session(session_id); + if (!TargetExists(id)) { + Session(session_id)->Decline(); + return; } + connected_sessions_[session_id].first = id; + session->set_ws_key(ws_key); + delegate_->StartSession(session_id, id); } -void InspectorSocketServer::SessionTerminated(SocketSession* session) { - int id = session->id(); - if (connected_sessions_.erase(id) != 0) { - delegate_->EndSession(id); - if (connected_sessions_.empty()) { - if (state_ == ServerState::kRunning && !server_sockets_.empty()) { - PrintDebuggerReadyMessage(host_, server_sockets_[0]->port(), - delegate_->GetTargetIds(), out_); - } - if (state_ == ServerState::kStopped) { - delegate_->ServerDone(); - } +void InspectorSocketServer::SessionTerminated(int session_id) { + if (Session(session_id) == nullptr) { + return; + } + bool was_attached = connected_sessions_[session_id].first != ""; + if (was_attached) { + delegate_->EndSession(session_id); + } + connected_sessions_.erase(session_id); + if (connected_sessions_.empty()) { + if (was_attached && state_ == ServerState::kRunning + && !server_sockets_.empty()) { + PrintDebuggerReadyMessage(host_, server_sockets_[0]->port(), + delegate_->GetTargetIds(), out_); + } + if (state_ == ServerState::kStopped) { + delegate_->ServerDone(); } } - delete session; } -bool InspectorSocketServer::HandleGetRequest(InspectorSocket* socket, +bool InspectorSocketServer::HandleGetRequest(int session_id, const std::string& path) { + SocketSession* session = Session(session_id); + InspectorSocket* socket = session->ws_socket(); const char* command = MatchPathSegment(path.c_str(), "/json"); if (command == nullptr) return false; if (MatchPathSegment(command, "list") || command[0] == '\0') { - SendListResponse(socket); + SendListResponse(socket, session); return true; } else if (MatchPathSegment(command, "protocol")) { SendProtocolJson(socket); @@ -348,7 +352,8 @@ bool InspectorSocketServer::HandleGetRequest(InspectorSocket* socket, return false; } -void InspectorSocketServer::SendListResponse(InspectorSocket* socket) { +void InspectorSocketServer::SendListResponse(InspectorSocket* socket, + SocketSession* session) { std::vector<std::map<std::string, std::string>> response; for (const std::string& id : delegate_->GetTargetIds()) { response.push_back(std::map<std::string, std::string>()); @@ -366,15 +371,14 @@ void InspectorSocketServer::SendListResponse(InspectorSocket* socket) { bool connected = false; for (const auto& session : connected_sessions_) { - if (session.second->IsForTarget(id)) { + if (session.second.first == id) { connected = true; break; } } if (!connected) { - std::string host; - int port = SocketSession::ServerPortForClient(socket); - GetSocketHost(&socket->tcp, &host); + std::string host = socket->GetHost(); + int port = session->server_port(); std::ostringstream frontend_url; frontend_url << "chrome-devtools://devtools/bundled"; frontend_url << "/inspector.html?experiments=true&v8only=true&ws="; @@ -444,9 +448,8 @@ void InspectorSocketServer::Stop(ServerCallback cb) { } void InspectorSocketServer::TerminateConnections() { - for (const auto& session : connected_sessions_) { - session.second->Close(); - } + for (const auto& key_value : connected_sessions_) + key_value.second.second->Close(); } bool InspectorSocketServer::TargetExists(const std::string& id) { @@ -455,13 +458,6 @@ bool InspectorSocketServer::TargetExists(const std::string& id) { return found != target_ids.end(); } -void InspectorSocketServer::Send(int session_id, const std::string& message) { - auto session_iterator = connected_sessions_.find(session_id); - if (session_iterator != connected_sessions_.end()) { - session_iterator->second->Send(message); - } -} - void InspectorSocketServer::ServerSocketListening(ServerSocket* server_socket) { server_sockets_.push_back(server_socket); } @@ -491,92 +487,73 @@ int InspectorSocketServer::Port() const { return port_; } -// InspectorSession tracking -SocketSession::SocketSession(InspectorSocketServer* server, int server_port) - : id_(server->GenerateSessionId()), - server_(server), - state_(State::kHttp), - server_port_(server_port) { } +void InspectorSocketServer::Accept(int server_port, + uv_stream_t* server_socket) { + std::unique_ptr<SocketSession> session( + new SocketSession(this, next_session_id_++, server_port)); + + InspectorSocket::DelegatePointer delegate = + InspectorSocket::DelegatePointer( + new SocketSession::Delegate(this, session->id())); -void SocketSession::Close() { - CHECK_NE(state_, State::kClosing); - state_ = State::kClosing; - inspector_close(&socket_, CloseCallback); + InspectorSocket::Pointer inspector = + InspectorSocket::Accept(server_socket, std::move(delegate)); + if (inspector) { + session->Own(std::move(inspector)); + connected_sessions_[session->id()].second = std::move(session); + } } -// static -int SocketSession::Accept(InspectorSocketServer* server, int server_port, - uv_stream_t* server_socket) { - // Memory is freed when the socket closes. - SocketSession* session = new SocketSession(server, server_port); - int err = inspector_accept(server_socket, &session->socket_, - HandshakeCallback); - if (err != 0) { - delete session; +void InspectorSocketServer::AcceptSession(int session_id) { + SocketSession* session = Session(session_id); + if (session == nullptr) { + delegate_->EndSession(session_id); + } else { + session->Accept(); } - return err; } -// static -bool SocketSession::HandshakeCallback(InspectorSocket* socket, - inspector_handshake_event event, - const std::string& path) { - SocketSession* session = SocketSession::From(socket); - InspectorSocketServer* server = session->server_; - const std::string& id = path.empty() ? path : path.substr(1); - switch (event) { - case kInspectorHandshakeHttpGet: - return server->HandleGetRequest(socket, path); - case kInspectorHandshakeUpgrading: - if (server->SessionStarted(session, id)) { - session->SetTargetId(id); - return true; - } else { - session->SetDeclined(); - return false; - } - case kInspectorHandshakeUpgraded: - session->FrontendConnected(); - return true; - case kInspectorHandshakeFailed: - server->SessionTerminated(session); - return false; - default: - UNREACHABLE(); - return false; +void InspectorSocketServer::DeclineSession(int session_id) { + auto it = connected_sessions_.find(session_id); + if (it != connected_sessions_.end()) { + it->second.first.clear(); + it->second.second->Decline(); } } -// static -void SocketSession::CloseCallback(InspectorSocket* socket, int code) { - SocketSession* session = SocketSession::From(socket); - CHECK_EQ(State::kClosing, session->state_); - session->server_->SessionTerminated(session); +void InspectorSocketServer::Send(int session_id, const std::string& message) { + SocketSession* session = Session(session_id); + if (session != nullptr) { + session->Send(message); + } +} + +// InspectorSession tracking +SocketSession::SocketSession(InspectorSocketServer* server, int id, + int server_port) + : id_(id), + server_(server), + server_port_(server_port) { } + + +void SocketSession::Send(const std::string& message) { + ws_socket_->Write(message.data(), message.length()); } -void SocketSession::FrontendConnected() { - CHECK_EQ(State::kHttp, state_); - state_ = State::kWebSocket; - inspector_read_start(&socket_, OnBufferAlloc, ReadCallback); +void SocketSession::Delegate::OnHttpGet(const std::string& path) { + if (!server_->HandleGetRequest(session_id_, path)) + Session()->ws_socket()->CancelHandshake(); } -// static -void SocketSession::ReadCallback(uv_stream_t* stream, ssize_t read, - const uv_buf_t* buf) { - InspectorSocket* socket = inspector_from_stream(stream); - SocketSession* session = SocketSession::From(socket); - if (read > 0) { - session->server_->MessageReceived(session->id_, - std::string(buf->base, read)); - } else { - session->Close(); - } - if (buf != nullptr && buf->base != nullptr) - delete[] buf->base; +void SocketSession::Delegate::OnSocketUpgrade(const std::string& path, + const std::string& ws_key) { + std::string id = path.empty() ? path : path.substr(1); + server_->SessionStarted(session_id_, id, ws_key); } -void SocketSession::Send(const std::string& message) { - inspector_write(&socket_, message.data(), message.length()); +void SocketSession::Delegate::OnWsFrame(const std::vector<char>& data) { + server_->MessageReceived(session_id_, + std::string(data.data(), data.size())); } // ServerSocket implementation @@ -624,8 +601,7 @@ void ServerSocket::SocketConnectedCallback(uv_stream_t* tcp_socket, if (status == 0) { ServerSocket* server_socket = ServerSocket::FromTcpSocket(tcp_socket); // Memory is freed when the socket closes. - SocketSession::Accept(server_socket->server_, server_socket->port_, - tcp_socket); + server_socket->server_->Accept(server_socket->port_, tcp_socket); } } diff --git a/src/inspector_socket_server.h b/src/inspector_socket_server.h index 16b047da333..b193e33a46d 100644 --- a/src/inspector_socket_server.h +++ b/src/inspector_socket_server.h @@ -22,7 +22,7 @@ class ServerSocket; class SocketServerDelegate { public: - virtual bool StartSession(int session_id, const std::string& target_id) = 0; + virtual void StartSession(int session_id, const std::string& target_id) = 0; virtual void EndSession(int session_id) = 0; virtual void MessageReceived(int session_id, const std::string& message) = 0; virtual std::vector<std::string> GetTargetIds() = 0; @@ -34,8 +34,6 @@ class SocketServerDelegate { // HTTP Server, writes messages requested as TransportActions, and responds // to HTTP requests and WS upgrades. - - class InspectorSocketServer { public: using ServerCallback = void (*)(InspectorSocketServer*); @@ -44,6 +42,8 @@ class InspectorSocketServer { const std::string& host, int port, FILE* out = stderr); + ~InspectorSocketServer(); + // Start listening on host/port bool Start(); @@ -54,6 +54,10 @@ class InspectorSocketServer { void Send(int session_id, const std::string& message); // kKill void TerminateConnections(); + // kAcceptSession + void AcceptSession(int session_id); + // kDeclineSession + void DeclineSession(int session_id); int Port() const; @@ -62,19 +66,18 @@ class InspectorSocketServer { void ServerSocketClosed(ServerSocket* server_socket); // Session connection lifecycle - bool HandleGetRequest(InspectorSocket* socket, const std::string& path); - bool SessionStarted(SocketSession* session, const std::string& id); - void SessionTerminated(SocketSession* session); + void Accept(int server_port, uv_stream_t* server_socket); + bool HandleGetRequest(int session_id, const std::string& path); + void SessionStarted(int session_id, const std::string& target_id, + const std::string& ws_id); + void SessionTerminated(int session_id); void MessageReceived(int session_id, const std::string& message) { delegate_->MessageReceived(session_id, message); } - - int GenerateSessionId() { - return next_session_id_++; - } + SocketSession* Session(int session_id); private: - void SendListResponse(InspectorSocket* socket); + void SendListResponse(InspectorSocket* socket, SocketSession* session); bool TargetExists(const std::string& id); enum class ServerState {kNew, kRunning, kStopping, kStopped}; @@ -85,7 +88,8 @@ class InspectorSocketServer { std::string path_; std::vector<ServerSocket*> server_sockets_; Closer* closer_; - std::map<int, SocketSession*> connected_sessions_; + std::map<int, std::pair<std::string, std::unique_ptr<SocketSession>>> + connected_sessions_; int next_session_id_; FILE* out_; ServerState state_; diff --git a/src/node.cc b/src/node.cc index ed2f0c14ff3..3af764d1fba 100644 --- a/src/node.cc +++ b/src/node.cc @@ -2006,7 +2006,7 @@ static void InitGroups(const FunctionCallbackInfo<Value>& args) { static void WaitForInspectorDisconnect(Environment* env) { #if HAVE_INSPECTOR - if (env->inspector_agent()->IsConnected()) { + if (env->inspector_agent()->delegate() != nullptr) { // Restore signal dispositions, the app is done and is no longer // capable of handling signals. #if defined(__POSIX__) && !defined(NODE_SHARED_MODE) |