Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/inspector_agent.cc4
-rw-r--r--src/inspector_agent.h1
-rw-r--r--src/inspector_io.cc73
-rw-r--r--src/inspector_io.h8
-rw-r--r--src/inspector_socket.cc825
-rw-r--r--src/inspector_socket.h96
-rw-r--r--src/inspector_socket_server.cc306
-rw-r--r--src/inspector_socket_server.h28
-rw-r--r--src/node.cc2
9 files changed, 699 insertions, 644 deletions
diff --git a/src/inspector_agent.cc b/src/inspector_agent.cc
index 9677dca37b5..216b43ca6c3 100644
--- a/src/inspector_agent.cc
+++ b/src/inspector_agent.cc
@@ -548,10 +548,6 @@ void Agent::Connect(InspectorSessionDelegate* delegate) {
client_->connectFrontend(delegate);
}
-bool Agent::IsConnected() {
- return io_ && io_->IsConnected();
-}
-
void Agent::WaitForDisconnect() {
CHECK_NE(client_, nullptr);
client_->contextDestroyed(parent_env_->context());
diff --git a/src/inspector_agent.h b/src/inspector_agent.h
index 29b9546b514..a2d61d0c8db 100644
--- a/src/inspector_agent.h
+++ b/src/inspector_agent.h
@@ -48,7 +48,6 @@ class Agent {
bool IsStarted() { return !!client_; }
// IO thread started, and client connected
- bool IsConnected();
bool IsWaitingForConnect();
void WaitForDisconnect();
diff --git a/src/inspector_io.cc b/src/inspector_io.cc
index 538cbab3c9f..9af4458c6b2 100644
--- a/src/inspector_io.cc
+++ b/src/inspector_io.cc
@@ -136,7 +136,7 @@ class InspectorIoDelegate: public node::inspector::SocketServerDelegate {
const std::string& script_name, bool wait);
// Calls PostIncomingMessage() with appropriate InspectorAction:
// kStartSession
- bool StartSession(int session_id, const std::string& target_id) override;
+ void StartSession(int session_id, const std::string& target_id) override;
// kSendMessage
void MessageReceived(int session_id, const std::string& message) override;
// kEndSession
@@ -145,19 +145,22 @@ class InspectorIoDelegate: public node::inspector::SocketServerDelegate {
std::vector<std::string> GetTargetIds() override;
std::string GetTargetTitle(const std::string& id) override;
std::string GetTargetUrl(const std::string& id) override;
- bool IsConnected() { return connected_; }
void ServerDone() override {
io_->ServerDone();
}
+ void AssignTransport(InspectorSocketServer* server) {
+ server_ = server;
+ }
+
private:
InspectorIo* io_;
- bool connected_;
int session_id_;
const std::string script_name_;
const std::string script_path_;
const std::string target_id_;
bool waiting_;
+ InspectorSocketServer* server_;
};
void InterruptCallback(v8::Isolate*, void* agent) {
@@ -226,10 +229,6 @@ void InspectorIo::Stop() {
DispatchMessages();
}
-bool InspectorIo::IsConnected() {
- return delegate_ != nullptr && delegate_->IsConnected();
-}
-
bool InspectorIo::IsStarted() {
return platform_ != nullptr;
}
@@ -264,6 +263,7 @@ void InspectorIo::IoThreadAsyncCb(uv_async_t* async) {
MessageQueue<TransportAction> outgoing_message_queue;
io->SwapBehindLock(&io->outgoing_message_queue_, &outgoing_message_queue);
for (const auto& outgoing : outgoing_message_queue) {
+ int session_id = std::get<1>(outgoing);
switch (std::get<0>(outgoing)) {
case TransportAction::kKill:
transport->TerminateConnections();
@@ -272,8 +272,14 @@ void InspectorIo::IoThreadAsyncCb(uv_async_t* async) {
transport->Stop(nullptr);
break;
case TransportAction::kSendMessage:
- std::string message = StringViewToUtf8(std::get<2>(outgoing)->string());
- transport->Send(std::get<1>(outgoing), message);
+ transport->Send(session_id,
+ StringViewToUtf8(std::get<2>(outgoing)->string()));
+ break;
+ case TransportAction::kAcceptSession:
+ transport->AcceptSession(session_id);
+ break;
+ case TransportAction::kDeclineSession:
+ transport->DeclineSession(session_id);
break;
}
}
@@ -293,6 +299,7 @@ void InspectorIo::ThreadMain() {
wait_for_connect_);
delegate_ = &delegate;
Transport server(&delegate, &loop, options_.host_name(), options_.port());
+ delegate.AssignTransport(&server);
TransportAndIo<Transport> queue_transport(&server, this);
thread_req_.data = &queue_transport;
if (!server.Start()) {
@@ -308,6 +315,7 @@ void InspectorIo::ThreadMain() {
uv_run(&loop, UV_RUN_DEFAULT);
thread_req_.data = nullptr;
CHECK_EQ(uv_loop_close(&loop), 0);
+ delegate.AssignTransport(nullptr);
delegate_ = nullptr;
}
@@ -358,6 +366,21 @@ void InspectorIo::NotifyMessageReceived() {
incoming_message_cond_.Broadcast(scoped_lock);
}
+TransportAction InspectorIo::Attach(int session_id) {
+ Agent* agent = parent_env_->inspector_agent();
+ if (agent->delegate() != nullptr)
+ return TransportAction::kDeclineSession;
+
+ CHECK_EQ(session_delegate_, nullptr);
+ session_id_ = session_id;
+ state_ = State::kConnected;
+ fprintf(stderr, "Debugger attached.\n");
+ session_delegate_ = std::unique_ptr<InspectorSessionDelegate>(
+ new IoSessionDelegate(this));
+ agent->Connect(session_delegate_.get());
+ return TransportAction::kAcceptSession;
+}
+
void InspectorIo::DispatchMessages() {
// This function can be reentered if there was an incoming message while
// V8 was processing another inspector request (e.g. if the user is
@@ -375,16 +398,14 @@ void InspectorIo::DispatchMessages() {
MessageQueue<InspectorAction>::value_type task;
std::swap(dispatching_message_queue_.front(), task);
dispatching_message_queue_.pop_front();
+ int id = std::get<1>(task);
StringView message = std::get<2>(task)->string();
switch (std::get<0>(task)) {
case InspectorAction::kStartSession:
- CHECK_EQ(session_delegate_, nullptr);
- session_id_ = std::get<1>(task);
- state_ = State::kConnected;
- fprintf(stderr, "Debugger attached.\n");
- session_delegate_ = std::unique_ptr<InspectorSessionDelegate>(
- new IoSessionDelegate(this));
- parent_env_->inspector_agent()->Connect(session_delegate_.get());
+ Write(Attach(id), id, StringView());
+ break;
+ case InspectorAction::kStartSessionUnconditionally:
+ Attach(id);
break;
case InspectorAction::kEndSession:
CHECK_NE(session_delegate_, nullptr);
@@ -428,22 +449,23 @@ InspectorIoDelegate::InspectorIoDelegate(InspectorIo* io,
const std::string& script_name,
bool wait)
: io_(io),
- connected_(false),
session_id_(0),
script_name_(script_name),
script_path_(script_path),
target_id_(GenerateID()),
- waiting_(wait) { }
+ waiting_(wait),
+ server_(nullptr) { }
-bool InspectorIoDelegate::StartSession(int session_id,
+void InspectorIoDelegate::StartSession(int session_id,
const std::string& target_id) {
- if (connected_)
- return false;
- connected_ = true;
- session_id_++;
- io_->PostIncomingMessage(InspectorAction::kStartSession, session_id, "");
- return true;
+ session_id_ = session_id;
+ InspectorAction action = InspectorAction::kStartSession;
+ if (waiting_) {
+ action = InspectorAction::kStartSessionUnconditionally;
+ server_->AcceptSession(session_id);
+ }
+ io_->PostIncomingMessage(action, session_id, "");
}
void InspectorIoDelegate::MessageReceived(int session_id,
@@ -464,7 +486,6 @@ void InspectorIoDelegate::MessageReceived(int session_id,
}
void InspectorIoDelegate::EndSession(int session_id) {
- connected_ = false;
io_->PostIncomingMessage(InspectorAction::kEndSession, session_id, "");
}
diff --git a/src/inspector_io.h b/src/inspector_io.h
index 7c15466eed9..79ccc6095ff 100644
--- a/src/inspector_io.h
+++ b/src/inspector_io.h
@@ -36,6 +36,7 @@ class InspectorIoDelegate;
enum class InspectorAction {
kStartSession,
+ kStartSessionUnconditionally, // First attach with --inspect-brk
kEndSession,
kSendMessage
};
@@ -44,7 +45,9 @@ enum class InspectorAction {
enum class TransportAction {
kKill,
kSendMessage,
- kStop
+ kStop,
+ kAcceptSession,
+ kDeclineSession
};
class InspectorIo {
@@ -61,7 +64,6 @@ class InspectorIo {
void Stop();
bool IsStarted();
- bool IsConnected();
void WaitForDisconnect();
// Called from thread to queue an incoming message and trigger
@@ -124,6 +126,8 @@ class InspectorIo {
void WaitForFrontendMessageWhilePaused();
// Broadcast incoming_message_cond_
void NotifyMessageReceived();
+ // Attach session to an inspector. Either kAcceptSession or kDeclineSession
+ TransportAction Attach(int session_id);
const DebugOptions options_;
diff --git a/src/inspector_socket.cc b/src/inspector_socket.cc
index 49d337b70b1..23b77f6aa56 100644
--- a/src/inspector_socket.cc
+++ b/src/inspector_socket.cc
@@ -1,4 +1,6 @@
#include "inspector_socket.h"
+
+#include "http_parser.h"
#include "util-inl.h"
#define NODE_WANT_INTERNALS 1
@@ -18,12 +20,71 @@
namespace node {
namespace inspector {
-static const char CLOSE_FRAME[] = {'\x88', '\x00'};
+class TcpHolder {
+ public:
+ using Pointer = std::unique_ptr<TcpHolder, void(*)(TcpHolder*)>;
-enum ws_decode_result {
- FRAME_OK, FRAME_INCOMPLETE, FRAME_CLOSE, FRAME_ERROR
+ static Pointer Accept(uv_stream_t* server,
+ InspectorSocket::DelegatePointer delegate);
+ void SetHandler(ProtocolHandler* handler);
+ int WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb);
+ uv_tcp_t* tcp() {
+ return &tcp_;
+ }
+ InspectorSocket::Delegate* delegate();
+
+ private:
+ static TcpHolder* From(void* handle) {
+ return node::ContainerOf(&TcpHolder::tcp_,
+ reinterpret_cast<uv_tcp_t*>(handle));
+ }
+ static void OnClosed(uv_handle_t* handle);
+ static void OnDataReceivedCb(uv_stream_t* stream, ssize_t nread,
+ const uv_buf_t* buf);
+ static void DisconnectAndDispose(TcpHolder* holder);
+ explicit TcpHolder(InspectorSocket::DelegatePointer delegate);
+ ~TcpHolder() = default;
+ void ReclaimUvBuf(const uv_buf_t* buf, ssize_t read);
+
+ uv_tcp_t tcp_;
+ const InspectorSocket::DelegatePointer delegate_;
+ ProtocolHandler* handler_;
+ std::vector<char> buffer;
+};
+
+
+class ProtocolHandler {
+ public:
+ ProtocolHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp);
+
+ virtual void AcceptUpgrade(const std::string& accept_key) = 0;
+ virtual void OnData(std::vector<char>* data) = 0;
+ virtual void OnEof() = 0;
+ virtual void Write(const std::vector<char> data) = 0;
+ virtual void CancelHandshake() = 0;
+
+ std::string GetHost();
+
+ InspectorSocket* inspector() {
+ return inspector_;
+ }
+
+ static void Shutdown(ProtocolHandler* handler) {
+ handler->Shutdown();
+ }
+
+ protected:
+ virtual ~ProtocolHandler() = default;
+ virtual void Shutdown() = 0;
+ int WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb);
+ InspectorSocket::Delegate* delegate();
+
+ InspectorSocket* const inspector_;
+ TcpHolder::Pointer tcp_;
};
+namespace {
+
#if DUMP_READS || DUMP_WRITES
static void dump_hex(const char* buf, size_t len) {
const char* ptr = buf;
@@ -50,64 +111,52 @@ static void dump_hex(const char* buf, size_t len) {
}
#endif
-static void remove_from_beginning(std::vector<char>* buffer, size_t count) {
- buffer->erase(buffer->begin(), buffer->begin() + count);
-}
-
-static void dispose_inspector(uv_handle_t* handle) {
- InspectorSocket* inspector = inspector_from_stream(handle);
- inspector_cb close =
- inspector->ws_mode ? inspector->ws_state->close_cb : nullptr;
- inspector->buffer.clear();
- delete inspector->ws_state;
- inspector->ws_state = nullptr;
- if (close) {
- close(inspector, 0);
- }
-}
-
-static void close_connection(InspectorSocket* inspector) {
- uv_handle_t* socket = reinterpret_cast<uv_handle_t*>(&inspector->tcp);
- if (!uv_is_closing(socket)) {
- uv_read_stop(reinterpret_cast<uv_stream_t*>(socket));
- uv_close(socket, dispose_inspector);
- }
-}
-
-struct WriteRequest {
- WriteRequest(InspectorSocket* inspector, const char* data, size_t size)
- : inspector(inspector)
- , storage(data, data + size)
- , buf(uv_buf_init(&storage[0], storage.size())) {}
+class WriteRequest {
+ public:
+ WriteRequest(ProtocolHandler* handler, const std::vector<char>& buffer)
+ : handler(handler)
+ , storage(buffer)
+ , buf(uv_buf_init(storage.data(), storage.size())) {}
static WriteRequest* from_write_req(uv_write_t* req) {
return node::ContainerOf(&WriteRequest::req, req);
}
- InspectorSocket* const inspector;
+ static void Cleanup(uv_write_t* req, int status) {
+ delete WriteRequest::from_write_req(req);
+ }
+
+ ProtocolHandler* const handler;
std::vector<char> storage;
uv_write_t req;
uv_buf_t buf;
};
-// Cleanup
-static void write_request_cleanup(uv_write_t* req, int status) {
- delete WriteRequest::from_write_req(req);
+void allocate_buffer(uv_handle_t* stream, size_t len, uv_buf_t* buf) {
+ *buf = uv_buf_init(new char[len], len);
}
-static int write_to_client(InspectorSocket* inspector,
- const char* msg,
- size_t len,
- uv_write_cb write_cb = write_request_cleanup) {
-#if DUMP_WRITES
- printf("%s (%ld bytes):\n", __FUNCTION__, len);
- dump_hex(msg, len);
-#endif
+static void remove_from_beginning(std::vector<char>* buffer, size_t count) {
+ buffer->erase(buffer->begin(), buffer->begin() + count);
+}
- // Freed in write_request_cleanup
- WriteRequest* wr = new WriteRequest(inspector, msg, len);
- uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&inspector->tcp);
- return uv_write(&wr->req, stream, &wr->buf, 1, write_cb) < 0;
+// Cleanup
+
+static const char CLOSE_FRAME[] = {'\x88', '\x00'};
+
+enum ws_decode_result {
+ FRAME_OK, FRAME_INCOMPLETE, FRAME_CLOSE, FRAME_ERROR
+};
+
+static void generate_accept_string(const std::string& client_key,
+ char (*buffer)[ACCEPT_KEY_LENGTH]) {
+ // Magic string from websockets spec.
+ static const char ws_magic[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+ std::string input(client_key + ws_magic);
+ char hash[SHA_DIGEST_LENGTH];
+ SHA1(reinterpret_cast<const unsigned char*>(&input[0]), input.size(),
+ reinterpret_cast<unsigned char*>(hash));
+ node::base64_encode(hash, sizeof(hash), *buffer, sizeof(*buffer));
}
// Constants for hybi-10 frame format.
@@ -134,11 +183,11 @@ const size_t kTwoBytePayloadLengthField = 126;
const size_t kEightBytePayloadLengthField = 127;
const size_t kMaskingKeyWidthInBytes = 4;
-static std::vector<char> encode_frame_hybi17(const char* message,
- size_t data_length) {
+static std::vector<char> encode_frame_hybi17(const std::vector<char>& message) {
std::vector<char> frame;
OpCode op_code = kOpCodeText;
frame.push_back(kFinalBit | op_code);
+ const size_t data_length = message.size();
if (data_length <= kMaxSingleBytePayloadLength) {
frame.push_back(static_cast<char>(data_length));
} else if (data_length <= 0xFFFF) {
@@ -158,7 +207,7 @@ static std::vector<char> encode_frame_hybi17(const char* message,
extended_payload_length + 8);
CHECK_EQ(0, remaining);
}
- frame.insert(frame.end(), message, message + data_length);
+ frame.insert(frame.end(), message.begin(), message.end());
return frame;
}
@@ -248,272 +297,368 @@ static ws_decode_result decode_frame_hybi17(const std::vector<char>& buffer,
return closed ? FRAME_CLOSE : FRAME_OK;
}
-static void invoke_read_callback(InspectorSocket* inspector,
- int status, const uv_buf_t* buf) {
- if (inspector->ws_state->read_cb) {
- inspector->ws_state->read_cb(
- reinterpret_cast<uv_stream_t*>(&inspector->tcp), status, buf);
+
+// WS protocol
+class WsHandler : public ProtocolHandler {
+ public:
+ WsHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp)
+ : ProtocolHandler(inspector, std::move(tcp)),
+ OnCloseSent(&WsHandler::WaitForCloseReply),
+ OnCloseRecieved(&WsHandler::CloseFrameReceived),
+ dispose_(false) { }
+
+ void AcceptUpgrade(const std::string& accept_key) override { }
+ void CancelHandshake() override {}
+
+ void OnEof() override {
+ tcp_.reset();
+ if (dispose_)
+ delete this;
}
-}
-static void shutdown_complete(InspectorSocket* inspector) {
- close_connection(inspector);
-}
+ void OnData(std::vector<char>* data) override {
+ // 1. Parse.
+ int processed = 0;
+ do {
+ processed = ParseWsFrames(*data);
+ // 2. Fix the data size & length
+ if (processed > 0) {
+ remove_from_beginning(data, processed);
+ }
+ } while (processed > 0 && !data->empty());
+ }
-static void on_close_frame_written(uv_write_t* req, int status) {
- WriteRequest* wr = WriteRequest::from_write_req(req);
- InspectorSocket* inspector = wr->inspector;
- delete wr;
- inspector->ws_state->close_sent = true;
- if (inspector->ws_state->received_close) {
- shutdown_complete(inspector);
+ void Write(const std::vector<char> data) {
+ std::vector<char> output = encode_frame_hybi17(data);
+ WriteRaw(output, WriteRequest::Cleanup);
}
-}
-static void close_frame_received(InspectorSocket* inspector) {
- inspector->ws_state->received_close = true;
- if (!inspector->ws_state->close_sent) {
- invoke_read_callback(inspector, 0, 0);
- write_to_client(inspector, CLOSE_FRAME, sizeof(CLOSE_FRAME),
- on_close_frame_written);
- } else {
- shutdown_complete(inspector);
+ protected:
+ void Shutdown() override {
+ if (tcp_) {
+ dispose_ = true;
+ SendClose();
+ } else {
+ delete this;
+ }
}
-}
-static int parse_ws_frames(InspectorSocket* inspector) {
- int bytes_consumed = 0;
- std::vector<char> output;
- bool compressed = false;
-
- ws_decode_result r = decode_frame_hybi17(inspector->buffer,
- true /* client_frame */,
- &bytes_consumed, &output,
- &compressed);
- // Compressed frame means client is ignoring the headers and misbehaves
- if (compressed || r == FRAME_ERROR) {
- invoke_read_callback(inspector, UV_EPROTO, nullptr);
- close_connection(inspector);
- bytes_consumed = 0;
- } else if (r == FRAME_CLOSE) {
- close_frame_received(inspector);
- bytes_consumed = 0;
- } else if (r == FRAME_OK && inspector->ws_state->alloc_cb
- && inspector->ws_state->read_cb) {
- uv_buf_t buffer;
- size_t len = output.size();
- inspector->ws_state->alloc_cb(
- reinterpret_cast<uv_handle_t*>(&inspector->tcp),
- len, &buffer);
- CHECK_GE(buffer.len, len);
- memcpy(buffer.base, &output[0], len);
- invoke_read_callback(inspector, len, &buffer);
- }
- return bytes_consumed;
-}
+ private:
+ using Callback = void (WsHandler::*)(void);
-static void prepare_buffer(uv_handle_t* stream, size_t len, uv_buf_t* buf) {
- *buf = uv_buf_init(new char[len], len);
-}
+ static void OnCloseFrameWritten(uv_write_t* req, int status) {
+ WriteRequest* wr = WriteRequest::from_write_req(req);
+ WsHandler* handler = static_cast<WsHandler*>(wr->handler);
+ delete wr;
+ Callback cb = handler->OnCloseSent;
+ (handler->*cb)();
+ }
-static void reclaim_uv_buf(InspectorSocket* inspector, const uv_buf_t* buf,
- ssize_t read) {
- if (read > 0) {
- std::vector<char>& buffer = inspector->buffer;
- buffer.insert(buffer.end(), buf->base, buf->base + read);
+ void WaitForCloseReply() {
+ OnCloseRecieved = &WsHandler::OnEof;
}
- delete[] buf->base;
-}
-static void websockets_data_cb(uv_stream_t* stream, ssize_t nread,
- const uv_buf_t* buf) {
- InspectorSocket* inspector = inspector_from_stream(stream);
- reclaim_uv_buf(inspector, buf, nread);
- if (nread < 0 || nread == UV_EOF) {
- inspector->connection_eof = true;
- if (!inspector->shutting_down && inspector->ws_state->read_cb) {
- inspector->ws_state->read_cb(stream, nread, nullptr);
+ void SendClose() {
+ WriteRaw(std::vector<char>(CLOSE_FRAME, CLOSE_FRAME + sizeof(CLOSE_FRAME)),
+ OnCloseFrameWritten);
+ }
+
+ void CloseFrameReceived() {
+ OnCloseSent = &WsHandler::OnEof;
+ SendClose();
+ }
+
+ int ParseWsFrames(const std::vector<char>& buffer) {
+ int bytes_consumed = 0;
+ std::vector<char> output;
+ bool compressed = false;
+
+ ws_decode_result r = decode_frame_hybi17(buffer,
+ true /* client_frame */,
+ &bytes_consumed, &output,
+ &compressed);
+ // Compressed frame means client is ignoring the headers and misbehaves
+ if (compressed || r == FRAME_ERROR) {
+ OnEof();
+ bytes_consumed = 0;
+ } else if (r == FRAME_CLOSE) {
+ (this->*OnCloseRecieved)();
+ bytes_consumed = 0;
+ } else if (r == FRAME_OK) {
+ delegate()->OnWsFrame(output);
}
- if (inspector->ws_state->close_sent &&
- !inspector->ws_state->received_close) {
- shutdown_complete(inspector); // invoke callback
+ return bytes_consumed;
+ }
+
+
+ Callback OnCloseSent;
+ Callback OnCloseRecieved;
+ bool dispose_;
+};
+
+// HTTP protocol
+class HttpEvent {
+ public:
+ HttpEvent(const std::string& path, bool upgrade,
+ bool isGET, const std::string& ws_key) : path(path),
+ upgrade(upgrade),
+ isGET(isGET),
+ ws_key(ws_key) { }
+
+ std::string path;
+ bool upgrade;
+ bool isGET;
+ std::string ws_key;
+ std::string current_header_;
+};
+
+class HttpHandler : public ProtocolHandler {
+ public:
+ explicit HttpHandler(InspectorSocket* inspector, TcpHolder::Pointer tcp)
+ : ProtocolHandler(inspector, std::move(tcp)),
+ parsing_value_(false) {
+ http_parser_init(&parser_, HTTP_REQUEST);
+ http_parser_settings_init(&parser_settings);
+ parser_settings.on_header_field = OnHeaderField;
+ parser_settings.on_header_value = OnHeaderValue;
+ parser_settings.on_message_complete = OnMessageComplete;
+ parser_settings.on_url = OnPath;
+ }
+
+ void AcceptUpgrade(const std::string& accept_key) override {
+ char accept_string[ACCEPT_KEY_LENGTH];
+ generate_accept_string(accept_key, &accept_string);
+ const char accept_ws_prefix[] = "HTTP/1.1 101 Switching Protocols\r\n"
+ "Upgrade: websocket\r\n"
+ "Connection: Upgrade\r\n"
+ "Sec-WebSocket-Accept: ";
+ const char accept_ws_suffix[] = "\r\n\r\n";
+ std::vector<char> reply(accept_ws_prefix,
+ accept_ws_prefix + sizeof(accept_ws_prefix) - 1);
+ reply.insert(reply.end(), accept_string,
+ accept_string + sizeof(accept_string));
+ reply.insert(reply.end(), accept_ws_suffix,
+ accept_ws_suffix + sizeof(accept_ws_suffix) - 1);
+ if (WriteRaw(reply, WriteRequest::Cleanup) >= 0) {
+ inspector_->SwitchProtocol(new WsHandler(inspector_, std::move(tcp_)));
+ } else {
+ tcp_.reset();
}
- } else {
- #if DUMP_READS
- printf("%s read %ld bytes\n", __FUNCTION__, nread);
- if (nread > 0) {
- dump_hex(inspector->buffer.data() + inspector->buffer.size() - nread,
- nread);
- }
- #endif
- // 2. Parse.
- int processed = 0;
- do {
- processed = parse_ws_frames(inspector);
- // 3. Fix the buffer size & length
- if (processed > 0) {
- remove_from_beginning(&inspector->buffer, processed);
+ }
+
+ void CancelHandshake() {
+ const char HANDSHAKE_FAILED_RESPONSE[] =
+ "HTTP/1.0 400 Bad Request\r\n"
+ "Content-Type: text/html; charset=UTF-8\r\n\r\n"
+ "WebSockets request was expected\r\n";
+ WriteRaw(std::vector<char>(HANDSHAKE_FAILED_RESPONSE,
+ HANDSHAKE_FAILED_RESPONSE + sizeof(HANDSHAKE_FAILED_RESPONSE) - 1),
+ ThenCloseAndReportFailure);
+ }
+
+
+ void OnEof() override {
+ tcp_.reset();
+ }
+
+ void OnData(std::vector<char>* data) override {
+ http_parser_execute(&parser_, &parser_settings, data->data(), data->size());
+ data->clear();
+ if (parser_.http_errno != HPE_OK) {
+ CancelHandshake();
+ }
+ // Event handling may delete *this
+ std::vector<HttpEvent> events;
+ std::swap(events, events_);
+ for (const HttpEvent& event : events) {
+ bool shouldContinue = event.isGET && !event.upgrade;
+ if (!event.isGET) {
+ CancelHandshake();
+ } else if (!event.upgrade) {
+ delegate()->OnHttpGet(event.path);
+ } else if (event.ws_key.empty()) {
+ CancelHandshake();
+ } else {
+ delegate()->OnSocketUpgrade(event.path, event.ws_key);
}
- } while (processed > 0 && !inspector->buffer.empty());
+ if (!shouldContinue)
+ return;
+ }
}
-}
-int inspector_read_start(InspectorSocket* inspector,
- uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
- CHECK(inspector->ws_mode);
- CHECK(!inspector->shutting_down || read_cb == nullptr);
- inspector->ws_state->close_sent = false;
- inspector->ws_state->alloc_cb = alloc_cb;
- inspector->ws_state->read_cb = read_cb;
- int err =
- uv_read_start(reinterpret_cast<uv_stream_t*>(&inspector->tcp),
- prepare_buffer,
- websockets_data_cb);
- if (err < 0) {
- close_connection(inspector);
- }
- return err;
-}
+ void Write(const std::vector<char> data) override {
+ WriteRaw(data, WriteRequest::Cleanup);
+ }
-void inspector_read_stop(InspectorSocket* inspector) {
- uv_read_stop(reinterpret_cast<uv_stream_t*>(&inspector->tcp));
- inspector->ws_state->alloc_cb = nullptr;
- inspector->ws_state->read_cb = nullptr;
-}
+ protected:
+ void Shutdown() override {
+ delete this;
+ }
-static void generate_accept_string(const std::string& client_key,
- char (*buffer)[ACCEPT_KEY_LENGTH]) {
- // Magic string from websockets spec.
- static const char ws_magic[] = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
- std::string input(client_key + ws_magic);
- char hash[SHA_DIGEST_LENGTH];
- SHA1(reinterpret_cast<const unsigned char*>(&input[0]), input.size(),
- reinterpret_cast<unsigned char*>(hash));
- node::base64_encode(hash, sizeof(hash), *buffer, sizeof(*buffer));
-}
+ private:
+ static void ThenCloseAndReportFailure(uv_write_t* req, int status) {
+ ProtocolHandler* handler = WriteRequest::from_write_req(req)->handler;
+ WriteRequest::Cleanup(req, status);
+ handler->inspector()->SwitchProtocol(nullptr);
+ }
-static int header_value_cb(http_parser* parser, const char* at, size_t length) {
- static const char SEC_WEBSOCKET_KEY_HEADER[] = "Sec-WebSocket-Key";
- auto inspector = static_cast<InspectorSocket*>(parser->data);
- auto state = inspector->http_parsing_state;
- state->parsing_value = true;
- if (state->current_header.size() == sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1 &&
- node::StringEqualNoCaseN(state->current_header.data(),
- SEC_WEBSOCKET_KEY_HEADER,
- sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1)) {
- state->ws_key.append(at, length);
- }
- return 0;
-}
+ static int OnHeaderValue(http_parser* parser, const char* at, size_t length) {
+ static const char SEC_WEBSOCKET_KEY_HEADER[] = "Sec-WebSocket-Key";
+ HttpHandler* handler = From(parser);
+ handler->parsing_value_ = true;
+ if (handler->current_header_.size() ==
+ sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1 &&
+ node::StringEqualNoCaseN(handler->current_header_.data(),
+ SEC_WEBSOCKET_KEY_HEADER,
+ sizeof(SEC_WEBSOCKET_KEY_HEADER) - 1)) {
+ handler->ws_key_.append(at, length);
+ }
+ return 0;
+ }
-static int header_field_cb(http_parser* parser, const char* at, size_t length) {
- auto inspector = static_cast<InspectorSocket*>(parser->data);
- auto state = inspector->http_parsing_state;
- if (state->parsing_value) {
- state->parsing_value = false;
- state->current_header.clear();
+ static int OnHeaderField(http_parser* parser, const char* at, size_t length) {
+ HttpHandler* handler = From(parser);
+ if (handler->parsing_value_) {
+ handler->parsing_value_ = false;
+ handler->current_header_.clear();
+ }
+ handler->current_header_.append(at, length);
+ return 0;
}
- state->current_header.append(at, length);
- return 0;
-}
-static int path_cb(http_parser* parser, const char* at, size_t length) {
- auto inspector = static_cast<InspectorSocket*>(parser->data);
- auto state = inspector->http_parsing_state;
- state->path.append(at, length);
- return 0;
-}
+ static int OnPath(http_parser* parser, const char* at, size_t length) {
+ HttpHandler* handler = From(parser);
+ handler->path_.append(at, length);
+ return 0;
+ }
+
+ static HttpHandler* From(http_parser* parser) {
+ return node::ContainerOf(&HttpHandler::parser_, parser);
+ }
+
+ static int OnMessageComplete(http_parser* parser) {
+ // Event needs to be fired after the parser is done.
+ HttpHandler* handler = From(parser);
+ handler->events_.push_back(HttpEvent(handler->path_, parser->upgrade,
+ parser->method == HTTP_GET,
+ handler->ws_key_));
+ handler->path_ = "";
+ handler->ws_key_ = "";
+ handler->parsing_value_ = false;
+ handler->current_header_ = "";
+
+ return 0;
+ }
+
+ bool parsing_value_;
+ http_parser parser_;
+ http_parser_settings parser_settings;
+ std::vector<HttpEvent> events_;
+ std::string current_header_;
+ std::string ws_key_;
+ std::string path_;
+};
+
+} // namespace
-static void handshake_complete(InspectorSocket* inspector) {
- uv_read_stop(reinterpret_cast<uv_stream_t*>(&inspector->tcp));
- handshake_cb callback = inspector->http_parsing_state->callback;
- inspector->ws_state = new ws_state_s();
- inspector->ws_mode = true;
- callback(inspector, kInspectorHandshakeUpgraded,
- inspector->http_parsing_state->path);
+// Any protocol
+ProtocolHandler::ProtocolHandler(InspectorSocket* inspector,
+ TcpHolder::Pointer tcp)
+ : inspector_(inspector), tcp_(std::move(tcp)) {
+ CHECK_NE(nullptr, tcp_);
+ tcp_->SetHandler(this);
}
-static void cleanup_http_parsing_state(InspectorSocket* inspector) {
- delete inspector->http_parsing_state;
- inspector->http_parsing_state = nullptr;
+int ProtocolHandler::WriteRaw(const std::vector<char>& buffer,
+ uv_write_cb write_cb) {
+ return tcp_->WriteRaw(buffer, write_cb);
}
-static void report_handshake_failure_cb(uv_handle_t* handle) {
- dispose_inspector(handle);
- InspectorSocket* inspector = inspector_from_stream(handle);
- handshake_cb cb = inspector->http_parsing_state->callback;
- cleanup_http_parsing_state(inspector);
- cb(inspector, kInspectorHandshakeFailed, std::string());
+InspectorSocket::Delegate* ProtocolHandler::delegate() {
+ return tcp_->delegate();
}
-static void close_and_report_handshake_failure(InspectorSocket* inspector) {
- uv_handle_t* socket = reinterpret_cast<uv_handle_t*>(&inspector->tcp);
- if (uv_is_closing(socket)) {
- report_handshake_failure_cb(socket);
+std::string ProtocolHandler::GetHost() {
+ char ip[INET6_ADDRSTRLEN];
+ sockaddr_storage addr;
+ int len = sizeof(addr);
+ int err = uv_tcp_getsockname(tcp_->tcp(),
+ reinterpret_cast<struct sockaddr*>(&addr),
+ &len);
+ if (err != 0)
+ return "";
+ if (addr.ss_family == AF_INET6) {
+ const sockaddr_in6* v6 = reinterpret_cast<const sockaddr_in6*>(&addr);
+ err = uv_ip6_name(v6, ip, sizeof(ip));
} else {
- uv_read_stop(reinterpret_cast<uv_stream_t*>(socket));
- uv_close(socket, report_handshake_failure_cb);
+ const sockaddr_in* v4 = reinterpret_cast<const sockaddr_in*>(&addr);
+ err = uv_ip4_name(v4, ip, sizeof(ip));
+ }
+ if (err != 0)
+ return "";
+ return ip;
+}
+
+// RAII uv_tcp_t wrapper
+TcpHolder::TcpHolder(InspectorSocket::DelegatePointer delegate)
+ : tcp_(),
+ delegate_(std::move(delegate)),
+ handler_(nullptr) { }
+
+// static
+TcpHolder::Pointer TcpHolder::Accept(
+ uv_stream_t* server,
+ InspectorSocket::DelegatePointer delegate) {
+ TcpHolder* result = new TcpHolder(std::move(delegate));
+ uv_stream_t* tcp = reinterpret_cast<uv_stream_t*>(&result->tcp_);
+ int err = uv_tcp_init(server->loop, &result->tcp_);
+ if (err == 0) {
+ err = uv_accept(server, tcp);
+ }
+ if (err == 0) {
+ err = uv_read_start(tcp, allocate_buffer, OnDataReceivedCb);
+ }
+ if (err == 0) {
+ return { result, DisconnectAndDispose };
+ } else {
+ fprintf(stderr, "[%s:%d@%s]\n", __FILE__, __LINE__, __FUNCTION__);
+
+ delete result;
+ return { nullptr, nullptr };
}
}
-static void then_close_and_report_failure(uv_write_t* req, int status) {
- InspectorSocket* inspector = WriteRequest::from_write_req(req)->inspector;
- write_request_cleanup(req, status);
- close_and_report_handshake_failure(inspector);
+void TcpHolder::SetHandler(ProtocolHandler* handler) {
+ handler_ = handler;
}
-static void handshake_failed(InspectorSocket* inspector) {
- const char HANDSHAKE_FAILED_RESPONSE[] =
- "HTTP/1.0 400 Bad Request\r\n"
- "Content-Type: text/html; charset=UTF-8\r\n\r\n"
- "WebSockets request was expected\r\n";
- write_to_client(inspector, HANDSHAKE_FAILED_RESPONSE,
- sizeof(HANDSHAKE_FAILED_RESPONSE) - 1,
- then_close_and_report_failure);
+int TcpHolder::WriteRaw(const std::vector<char>& buffer, uv_write_cb write_cb) {
+#if DUMP_WRITES
+ printf("%s (%ld bytes):\n", __FUNCTION__, buffer.size());
+ dump_hex(buffer.data(), buffer.size());
+ printf("\n");
+#endif
+
+ // Freed in write_request_cleanup
+ WriteRequest* wr = new WriteRequest(handler_, buffer);
+ uv_stream_t* stream = reinterpret_cast<uv_stream_t*>(&tcp_);
+ int err = uv_write(&wr->req, stream, &wr->buf, 1, write_cb);
+ if (err < 0)
+ delete wr;
+ return err < 0;
}
-// init_handshake references message_complete_cb
-static void init_handshake(InspectorSocket* socket);
-
-static int message_complete_cb(http_parser* parser) {
- InspectorSocket* inspector = static_cast<InspectorSocket*>(parser->data);
- struct http_parsing_state_s* state = inspector->http_parsing_state;
- if (parser->method != HTTP_GET) {
- handshake_failed(inspector);
- } else if (!parser->upgrade) {
- if (state->callback(inspector, kInspectorHandshakeHttpGet, state->path)) {
- init_handshake(inspector);
- } else {
- handshake_failed(inspector);
- }
- } else if (state->ws_key.empty()) {
- handshake_failed(inspector);
- } else if (state->callback(inspector, kInspectorHandshakeUpgrading,
- state->path)) {
- char accept_string[ACCEPT_KEY_LENGTH];
- generate_accept_string(state->ws_key, &accept_string);
- const char accept_ws_prefix[] = "HTTP/1.1 101 Switching Protocols\r\n"
- "Upgrade: websocket\r\n"
- "Connection: Upgrade\r\n"
- "Sec-WebSocket-Accept: ";
- const char accept_ws_suffix[] = "\r\n\r\n";
- std::string reply(accept_ws_prefix, sizeof(accept_ws_prefix) - 1);
- reply.append(accept_string, sizeof(accept_string));
- reply.append(accept_ws_suffix, sizeof(accept_ws_suffix) - 1);
- if (write_to_client(inspector, &reply[0], reply.size()) >= 0) {
- handshake_complete(inspector);
- inspector->http_parsing_state->done = true;
- } else {
- close_and_report_handshake_failure(inspector);
- }
- } else {
- handshake_failed(inspector);
- }
- return 0;
+InspectorSocket::Delegate* TcpHolder::delegate() {
+ return delegate_.get();
+}
+
+// static
+void TcpHolder::OnClosed(uv_handle_t* handle) {
+ delete From(handle);
}
-static void data_received_cb(uv_stream_t* tcp, ssize_t nread,
- const uv_buf_t* buf) {
+void TcpHolder::OnDataReceivedCb(uv_stream_t* tcp, ssize_t nread,
+ const uv_buf_t* buf) {
#if DUMP_READS
if (nread >= 0) {
printf("%s (%ld bytes)\n", __FUNCTION__, nread);
@@ -522,107 +667,65 @@ static void data_received_cb(uv_stream_t* tcp, ssize_t nread,
printf("[%s:%d] %s\n", __FUNCTION__, __LINE__, uv_err_name(nread));
}
#endif
- InspectorSocket* inspector = inspector_from_stream(tcp);
- reclaim_uv_buf(inspector, buf, nread);
+ TcpHolder* holder = From(tcp);
+ holder->ReclaimUvBuf(buf, nread);
if (nread < 0 || nread == UV_EOF) {
- close_and_report_handshake_failure(inspector);
+ holder->handler_->OnEof();
} else {
- http_parsing_state_s* state = inspector->http_parsing_state;
- http_parser* parser = &state->parser;
- http_parser_execute(parser, &state->parser_settings,
- inspector->buffer.data(), nread);
- remove_from_beginning(&inspector->buffer, nread);
- if (parser->http_errno != HPE_OK) {
- handshake_failed(inspector);
- }
- if (inspector->http_parsing_state->done) {
- cleanup_http_parsing_state(inspector);
- }
+ holder->handler_->OnData(&holder->buffer);
}
}
-static void init_handshake(InspectorSocket* socket) {
- http_parsing_state_s* state = socket->http_parsing_state;
- CHECK_NE(state, nullptr);
- state->current_header.clear();
- state->ws_key.clear();
- state->path.clear();
- state->done = false;
- http_parser_init(&state->parser, HTTP_REQUEST);
- state->parser.data = socket;
- http_parser_settings* settings = &state->parser_settings;
- http_parser_settings_init(settings);
- settings->on_header_field = header_field_cb;
- settings->on_header_value = header_value_cb;
- settings->on_message_complete = message_complete_cb;
- settings->on_url = path_cb;
+// static
+void TcpHolder::DisconnectAndDispose(TcpHolder* holder) {
+ uv_handle_t* handle = reinterpret_cast<uv_handle_t*>(&holder->tcp_);
+ uv_close(handle, OnClosed);
}
-int inspector_accept(uv_stream_t* server, InspectorSocket* socket,
- handshake_cb callback) {
- CHECK_NE(callback, nullptr);
- CHECK_EQ(socket->http_parsing_state, nullptr);
-
- socket->http_parsing_state = new http_parsing_state_s();
- uv_stream_t* tcp = reinterpret_cast<uv_stream_t*>(&socket->tcp);
- int err = uv_tcp_init(server->loop, &socket->tcp);
-
- if (err == 0) {
- err = uv_accept(server, tcp);
- }
- if (err == 0) {
- init_handshake(socket);
- socket->http_parsing_state->callback = callback;
- err = uv_read_start(tcp, prepare_buffer,
- data_received_cb);
- }
- if (err != 0) {
- uv_close(reinterpret_cast<uv_handle_t*>(tcp), nullptr);
+void TcpHolder::ReclaimUvBuf(const uv_buf_t* buf, ssize_t read) {
+ if (read > 0) {
+ buffer.insert(buffer.end(), buf->base, buf->base + read);
}
- return err;
+ delete[] buf->base;
}
-void inspector_write(InspectorSocket* inspector, const char* data,
- size_t len) {
- if (inspector->ws_mode) {
- std::vector<char> output = encode_frame_hybi17(data, len);
- write_to_client(inspector, &output[0], output.size());
+// Public interface
+InspectorSocket::InspectorSocket()
+ : protocol_handler_(nullptr, ProtocolHandler::Shutdown) { }
+
+InspectorSocket::~InspectorSocket() = default;
+
+// static
+InspectorSocket::Pointer InspectorSocket::Accept(uv_stream_t* server,
+ DelegatePointer delegate) {
+ auto tcp = TcpHolder::Accept(server, std::move(delegate));
+ if (tcp) {
+ InspectorSocket* inspector = new InspectorSocket();
+ inspector->SwitchProtocol(new HttpHandler(inspector, std::move(tcp)));
+ return InspectorSocket::Pointer(inspector);
} else {
- write_to_client(inspector, data, len);
+ return InspectorSocket::Pointer(nullptr);
}
}
-void inspector_close(InspectorSocket* inspector,
- inspector_cb callback) {
- // libuv throws assertions when closing stream that's already closed - we
- // need to do the same.
- CHECK(!uv_is_closing(reinterpret_cast<uv_handle_t*>(&inspector->tcp)));
- CHECK(!inspector->shutting_down);
- inspector->shutting_down = true;
- inspector->ws_state->close_cb = callback;
- if (inspector->connection_eof) {
- close_connection(inspector);
- } else {
- inspector_read_stop(inspector);
- write_to_client(inspector, CLOSE_FRAME, sizeof(CLOSE_FRAME),
- on_close_frame_written);
- inspector_read_start(inspector, nullptr, nullptr);
- }
+void InspectorSocket::AcceptUpgrade(const std::string& ws_key) {
+ protocol_handler_->AcceptUpgrade(ws_key);
+}
+
+void InspectorSocket::CancelHandshake() {
+ protocol_handler_->CancelHandshake();
+}
+
+std::string InspectorSocket::GetHost() {
+ return protocol_handler_->GetHost();
}
-bool inspector_is_active(const InspectorSocket* inspector) {
- const uv_handle_t* tcp =
- reinterpret_cast<const uv_handle_t*>(&inspector->tcp);
- return !inspector->shutting_down && !uv_is_closing(tcp);
+void InspectorSocket::SwitchProtocol(ProtocolHandler* handler) {
+ protocol_handler_.reset(std::move(handler));
}
-void InspectorSocket::reinit() {
- http_parsing_state = nullptr;
- ws_state = nullptr;
- buffer.clear();
- ws_mode = false;
- shutting_down = false;
- connection_eof = false;
+void InspectorSocket::Write(const char* data, size_t len) {
+ protocol_handler_->Write(std::vector<char>(data, data + len));
}
} // namespace inspector
diff --git a/src/inspector_socket.h b/src/inspector_socket.h
index f93150d6f9a..1a3411435ee 100644
--- a/src/inspector_socket.h
+++ b/src/inspector_socket.h
@@ -1,7 +1,6 @@
#ifndef SRC_INSPECTOR_SOCKET_H_
#define SRC_INSPECTOR_SOCKET_H_
-#include "http_parser.h"
#include "util-inl.h"
#include "uv.h"
@@ -11,88 +10,41 @@
namespace node {
namespace inspector {
-enum inspector_handshake_event {
- kInspectorHandshakeUpgrading,
- kInspectorHandshakeUpgraded,
- kInspectorHandshakeHttpGet,
- kInspectorHandshakeFailed
-};
-
-class InspectorSocket;
-
-typedef void (*inspector_cb)(InspectorSocket*, int);
-// Notifies as handshake is progressing. Returning false as a response to
-// kInspectorHandshakeUpgrading or kInspectorHandshakeHttpGet event will abort
-// the connection. inspector_write can be used from the callback.
-typedef bool (*handshake_cb)(InspectorSocket*,
- enum inspector_handshake_event state,
- const std::string& path);
-
-struct http_parsing_state_s {
- http_parser parser;
- http_parser_settings parser_settings;
- handshake_cb callback;
- bool done;
- bool parsing_value;
- std::string ws_key;
- std::string path;
- std::string current_header;
-};
-
-struct ws_state_s {
- uv_alloc_cb alloc_cb;
- uv_read_cb read_cb;
- inspector_cb close_cb;
- bool close_sent;
- bool received_close;
-};
+class ProtocolHandler;
// HTTP Wrapper around a uv_tcp_t
class InspectorSocket {
public:
- InspectorSocket() : data(nullptr), http_parsing_state(nullptr),
- ws_state(nullptr), buffer(0), ws_mode(false),
- shutting_down(false), connection_eof(false) { }
- void reinit();
- void* data;
- struct http_parsing_state_s* http_parsing_state;
- struct ws_state_s* ws_state;
- std::vector<char> buffer;
- uv_tcp_t tcp;
- bool ws_mode;
- bool shutting_down;
- bool connection_eof;
+ class Delegate {
+ public:
+ virtual void OnHttpGet(const std::string& path) = 0;
+ virtual void OnSocketUpgrade(const std::string& path,
+ const std::string& accept_key) = 0;
+ virtual void OnWsFrame(const std::vector<char>& frame) = 0;
+ virtual ~Delegate() {}
+ };
- private:
- DISALLOW_COPY_AND_ASSIGN(InspectorSocket);
-};
+ using DelegatePointer = std::unique_ptr<Delegate>;
+ using Pointer = std::unique_ptr<InspectorSocket>;
+
+ static Pointer Accept(uv_stream_t* server, DelegatePointer delegate);
-int inspector_accept(uv_stream_t* server, InspectorSocket* inspector,
- handshake_cb callback);
+ ~InspectorSocket();
-void inspector_close(InspectorSocket* inspector,
- inspector_cb callback);
+ void AcceptUpgrade(const std::string& accept_key);
+ void CancelHandshake();
+ void Write(const char* data, size_t len);
+ void SwitchProtocol(ProtocolHandler* handler);
+ std::string GetHost();
-// Callbacks will receive stream handles. Use inspector_from_stream to get
-// InspectorSocket* from the stream handle.
-int inspector_read_start(InspectorSocket* inspector, uv_alloc_cb,
- uv_read_cb);
-void inspector_read_stop(InspectorSocket* inspector);
-void inspector_write(InspectorSocket* inspector,
- const char* data, size_t len);
-bool inspector_is_active(const InspectorSocket* inspector);
+ private:
+ InspectorSocket();
-inline InspectorSocket* inspector_from_stream(uv_tcp_t* stream) {
- return node::ContainerOf(&InspectorSocket::tcp, stream);
-}
+ std::unique_ptr<ProtocolHandler, void(*)(ProtocolHandler*)> protocol_handler_;
-inline InspectorSocket* inspector_from_stream(uv_stream_t* stream) {
- return inspector_from_stream(reinterpret_cast<uv_tcp_t*>(stream));
-}
+ DISALLOW_COPY_AND_ASSIGN(InspectorSocket);
+};
-inline InspectorSocket* inspector_from_stream(uv_handle_t* stream) {
- return inspector_from_stream(reinterpret_cast<uv_tcp_t*>(stream));
-}
} // namespace inspector
} // namespace node
diff --git a/src/inspector_socket_server.cc b/src/inspector_socket_server.cc
index 958c41a654a..bf114251cb1 100644
--- a/src/inspector_socket_server.cc
+++ b/src/inspector_socket_server.cc
@@ -33,7 +33,6 @@ std::string FormatWsAddress(const std::string& host, int port,
return url.str();
}
-
namespace {
static const uint8_t PROTOCOL_JSON[] = {
@@ -114,8 +113,8 @@ void SendHttpResponse(InspectorSocket* socket, const std::string& response) {
"\r\n";
char header[sizeof(HEADERS) + 20];
int header_len = snprintf(header, sizeof(header), HEADERS, response.size());
- inspector_write(socket, header, header_len);
- inspector_write(socket, response.data(), response.size());
+ socket->Write(header, header_len);
+ socket->Write(response.data(), response.size());
}
void SendVersionResponse(InspectorSocket* socket) {
@@ -145,28 +144,6 @@ void SendProtocolJson(InspectorSocket* socket) {
CHECK_EQ(Z_OK, inflateEnd(&strm));
SendHttpResponse(socket, data);
}
-
-int GetSocketHost(uv_tcp_t* socket, std::string* out_host) {
- char ip[INET6_ADDRSTRLEN];
- sockaddr_storage addr;
- int len = sizeof(addr);
- int err = uv_tcp_getsockname(socket,
- reinterpret_cast<struct sockaddr*>(&addr),
- &len);
- if (err != 0)
- return err;
- if (addr.ss_family == AF_INET6) {
- const sockaddr_in6* v6 = reinterpret_cast<const sockaddr_in6*>(&addr);
- err = uv_ip6_name(v6, ip, sizeof(ip));
- } else {
- const sockaddr_in* v4 = reinterpret_cast<const sockaddr_in*>(&addr);
- err = uv_ip4_name(v4, ip, sizeof(ip));
- }
- if (err != 0)
- return err;
- *out_host = ip;
- return err;
-}
} // namespace
@@ -209,46 +186,58 @@ class Closer {
class SocketSession {
public:
- static int Accept(InspectorSocketServer* server, int server_port,
- uv_stream_t* server_socket);
+ SocketSession(InspectorSocketServer* server, int id, int server_port);
+ void Close() {
+ ws_socket_.reset();
+ }
void Send(const std::string& message);
- void Close();
-
+ void Own(InspectorSocket::Pointer ws_socket) {
+ ws_socket_ = std::move(ws_socket);
+ }
int id() const { return id_; }
- bool IsForTarget(const std::string& target_id) const {
- return target_id_ == target_id;
+ int server_port() {
+ return server_port_;
}
- static int ServerPortForClient(InspectorSocket* client) {
- return From(client)->server_port_;
+ InspectorSocket* ws_socket() {
+ return ws_socket_.get();
}
-
- private:
- SocketSession(InspectorSocketServer* server, int server_port);
- static SocketSession* From(InspectorSocket* socket) {
- return node::ContainerOf(&SocketSession::socket_, socket);
+ void set_ws_key(const std::string& ws_key) {
+ ws_key_ = ws_key;
+ }
+ void Accept() {
+ ws_socket_->AcceptUpgrade(ws_key_);
+ }
+ void Decline() {
+ ws_socket_->CancelHandshake();
}
- enum class State { kHttp, kWebSocket, kClosing, kEOF, kDeclined };
- static bool HandshakeCallback(InspectorSocket* socket,
- enum inspector_handshake_event state,
- const std::string& path);
- static void ReadCallback(uv_stream_t* stream, ssize_t read,
- const uv_buf_t* buf);
- static void CloseCallback(InspectorSocket* socket, int code);
+ class Delegate : public InspectorSocket::Delegate {
+ public:
+ Delegate(InspectorSocketServer* server, int session_id)
+ : server_(server), session_id_(session_id) { }
+ ~Delegate() {
+ server_->SessionTerminated(session_id_);
+ }
+ void OnHttpGet(const std::string& path) override;
+ void OnSocketUpgrade(const std::string& path,
+ const std::string& ws_key) override;
+ void OnWsFrame(const std::vector<char>& data) override;
+
+ private:
+ SocketSession* Session() {
+ return server_->Session(session_id_);
+ }
- void FrontendConnected();
- void SetDeclined() { state_ = State::kDeclined; }
- void SetTargetId(const std::string& target_id) {
- CHECK(target_id_.empty());
- target_id_ = target_id;
- }
+ InspectorSocketServer* server_;
+ int session_id_;
+ };
+ private:
const int id_;
- InspectorSocket socket_;
+ InspectorSocket::Pointer ws_socket_;
InspectorSocketServer* server_;
- std::string target_id_;
- State state_;
const int server_port_;
+ std::string ws_key_;
};
class ServerSocket {
@@ -269,7 +258,6 @@ class ServerSocket {
return node::ContainerOf(&ServerSocket::tcp_socket_,
reinterpret_cast<uv_tcp_t*>(socket));
}
-
static void SocketConnectedCallback(uv_stream_t* tcp_socket, int status);
static void SocketClosedCallback(uv_handle_t* tcp_socket);
static void FreeOnCloseCallback(uv_handle_t* tcp_socket_) {
@@ -296,41 +284,57 @@ InspectorSocketServer::InspectorSocketServer(SocketServerDelegate* delegate,
state_ = ServerState::kNew;
}
-bool InspectorSocketServer::SessionStarted(SocketSession* session,
- const std::string& id) {
- if (TargetExists(id) && delegate_->StartSession(session->id(), id)) {
- connected_sessions_[session->id()] = session;
- return true;
- } else {
- return false;
+InspectorSocketServer::~InspectorSocketServer() = default;
+
+SocketSession* InspectorSocketServer::Session(int session_id) {
+ auto it = connected_sessions_.find(session_id);
+ return it == connected_sessions_.end() ? nullptr : it->second.second.get();
+}
+
+void InspectorSocketServer::SessionStarted(int session_id,
+ const std::string& id,
+ const std::string& ws_key) {
+ SocketSession* session = Session(session_id);
+ if (!TargetExists(id)) {
+ Session(session_id)->Decline();
+ return;
}
+ connected_sessions_[session_id].first = id;
+ session->set_ws_key(ws_key);
+ delegate_->StartSession(session_id, id);
}
-void InspectorSocketServer::SessionTerminated(SocketSession* session) {
- int id = session->id();
- if (connected_sessions_.erase(id) != 0) {
- delegate_->EndSession(id);
- if (connected_sessions_.empty()) {
- if (state_ == ServerState::kRunning && !server_sockets_.empty()) {
- PrintDebuggerReadyMessage(host_, server_sockets_[0]->port(),
- delegate_->GetTargetIds(), out_);
- }
- if (state_ == ServerState::kStopped) {
- delegate_->ServerDone();
- }
+void InspectorSocketServer::SessionTerminated(int session_id) {
+ if (Session(session_id) == nullptr) {
+ return;
+ }
+ bool was_attached = connected_sessions_[session_id].first != "";
+ if (was_attached) {
+ delegate_->EndSession(session_id);
+ }
+ connected_sessions_.erase(session_id);
+ if (connected_sessions_.empty()) {
+ if (was_attached && state_ == ServerState::kRunning
+ && !server_sockets_.empty()) {
+ PrintDebuggerReadyMessage(host_, server_sockets_[0]->port(),
+ delegate_->GetTargetIds(), out_);
+ }
+ if (state_ == ServerState::kStopped) {
+ delegate_->ServerDone();
}
}
- delete session;
}
-bool InspectorSocketServer::HandleGetRequest(InspectorSocket* socket,
+bool InspectorSocketServer::HandleGetRequest(int session_id,
const std::string& path) {
+ SocketSession* session = Session(session_id);
+ InspectorSocket* socket = session->ws_socket();
const char* command = MatchPathSegment(path.c_str(), "/json");
if (command == nullptr)
return false;
if (MatchPathSegment(command, "list") || command[0] == '\0') {
- SendListResponse(socket);
+ SendListResponse(socket, session);
return true;
} else if (MatchPathSegment(command, "protocol")) {
SendProtocolJson(socket);
@@ -348,7 +352,8 @@ bool InspectorSocketServer::HandleGetRequest(InspectorSocket* socket,
return false;
}
-void InspectorSocketServer::SendListResponse(InspectorSocket* socket) {
+void InspectorSocketServer::SendListResponse(InspectorSocket* socket,
+ SocketSession* session) {
std::vector<std::map<std::string, std::string>> response;
for (const std::string& id : delegate_->GetTargetIds()) {
response.push_back(std::map<std::string, std::string>());
@@ -366,15 +371,14 @@ void InspectorSocketServer::SendListResponse(InspectorSocket* socket) {
bool connected = false;
for (const auto& session : connected_sessions_) {
- if (session.second->IsForTarget(id)) {
+ if (session.second.first == id) {
connected = true;
break;
}
}
if (!connected) {
- std::string host;
- int port = SocketSession::ServerPortForClient(socket);
- GetSocketHost(&socket->tcp, &host);
+ std::string host = socket->GetHost();
+ int port = session->server_port();
std::ostringstream frontend_url;
frontend_url << "chrome-devtools://devtools/bundled";
frontend_url << "/inspector.html?experiments=true&v8only=true&ws=";
@@ -444,9 +448,8 @@ void InspectorSocketServer::Stop(ServerCallback cb) {
}
void InspectorSocketServer::TerminateConnections() {
- for (const auto& session : connected_sessions_) {
- session.second->Close();
- }
+ for (const auto& key_value : connected_sessions_)
+ key_value.second.second->Close();
}
bool InspectorSocketServer::TargetExists(const std::string& id) {
@@ -455,13 +458,6 @@ bool InspectorSocketServer::TargetExists(const std::string& id) {
return found != target_ids.end();
}
-void InspectorSocketServer::Send(int session_id, const std::string& message) {
- auto session_iterator = connected_sessions_.find(session_id);
- if (session_iterator != connected_sessions_.end()) {
- session_iterator->second->Send(message);
- }
-}
-
void InspectorSocketServer::ServerSocketListening(ServerSocket* server_socket) {
server_sockets_.push_back(server_socket);
}
@@ -491,92 +487,73 @@ int InspectorSocketServer::Port() const {
return port_;
}
-// InspectorSession tracking
-SocketSession::SocketSession(InspectorSocketServer* server, int server_port)
- : id_(server->GenerateSessionId()),
- server_(server),
- state_(State::kHttp),
- server_port_(server_port) { }
+void InspectorSocketServer::Accept(int server_port,
+ uv_stream_t* server_socket) {
+ std::unique_ptr<SocketSession> session(
+ new SocketSession(this, next_session_id_++, server_port));
+
+ InspectorSocket::DelegatePointer delegate =
+ InspectorSocket::DelegatePointer(
+ new SocketSession::Delegate(this, session->id()));
-void SocketSession::Close() {
- CHECK_NE(state_, State::kClosing);
- state_ = State::kClosing;
- inspector_close(&socket_, CloseCallback);
+ InspectorSocket::Pointer inspector =
+ InspectorSocket::Accept(server_socket, std::move(delegate));
+ if (inspector) {
+ session->Own(std::move(inspector));
+ connected_sessions_[session->id()].second = std::move(session);
+ }
}
-// static
-int SocketSession::Accept(InspectorSocketServer* server, int server_port,
- uv_stream_t* server_socket) {
- // Memory is freed when the socket closes.
- SocketSession* session = new SocketSession(server, server_port);
- int err = inspector_accept(server_socket, &session->socket_,
- HandshakeCallback);
- if (err != 0) {
- delete session;
+void InspectorSocketServer::AcceptSession(int session_id) {
+ SocketSession* session = Session(session_id);
+ if (session == nullptr) {
+ delegate_->EndSession(session_id);
+ } else {
+ session->Accept();
}
- return err;
}
-// static
-bool SocketSession::HandshakeCallback(InspectorSocket* socket,
- inspector_handshake_event event,
- const std::string& path) {
- SocketSession* session = SocketSession::From(socket);
- InspectorSocketServer* server = session->server_;
- const std::string& id = path.empty() ? path : path.substr(1);
- switch (event) {
- case kInspectorHandshakeHttpGet:
- return server->HandleGetRequest(socket, path);
- case kInspectorHandshakeUpgrading:
- if (server->SessionStarted(session, id)) {
- session->SetTargetId(id);
- return true;
- } else {
- session->SetDeclined();
- return false;
- }
- case kInspectorHandshakeUpgraded:
- session->FrontendConnected();
- return true;
- case kInspectorHandshakeFailed:
- server->SessionTerminated(session);
- return false;
- default:
- UNREACHABLE();
- return false;
+void InspectorSocketServer::DeclineSession(int session_id) {
+ auto it = connected_sessions_.find(session_id);
+ if (it != connected_sessions_.end()) {
+ it->second.first.clear();
+ it->second.second->Decline();
}
}
-// static
-void SocketSession::CloseCallback(InspectorSocket* socket, int code) {
- SocketSession* session = SocketSession::From(socket);
- CHECK_EQ(State::kClosing, session->state_);
- session->server_->SessionTerminated(session);
+void InspectorSocketServer::Send(int session_id, const std::string& message) {
+ SocketSession* session = Session(session_id);
+ if (session != nullptr) {
+ session->Send(message);
+ }
+}
+
+// InspectorSession tracking
+SocketSession::SocketSession(InspectorSocketServer* server, int id,
+ int server_port)
+ : id_(id),
+ server_(server),
+ server_port_(server_port) { }
+
+
+void SocketSession::Send(const std::string& message) {
+ ws_socket_->Write(message.data(), message.length());
}
-void SocketSession::FrontendConnected() {
- CHECK_EQ(State::kHttp, state_);
- state_ = State::kWebSocket;
- inspector_read_start(&socket_, OnBufferAlloc, ReadCallback);
+void SocketSession::Delegate::OnHttpGet(const std::string& path) {
+ if (!server_->HandleGetRequest(session_id_, path))
+ Session()->ws_socket()->CancelHandshake();
}
-// static
-void SocketSession::ReadCallback(uv_stream_t* stream, ssize_t read,
- const uv_buf_t* buf) {
- InspectorSocket* socket = inspector_from_stream(stream);
- SocketSession* session = SocketSession::From(socket);
- if (read > 0) {
- session->server_->MessageReceived(session->id_,
- std::string(buf->base, read));
- } else {
- session->Close();
- }
- if (buf != nullptr && buf->base != nullptr)
- delete[] buf->base;
+void SocketSession::Delegate::OnSocketUpgrade(const std::string& path,
+ const std::string& ws_key) {
+ std::string id = path.empty() ? path : path.substr(1);
+ server_->SessionStarted(session_id_, id, ws_key);
}
-void SocketSession::Send(const std::string& message) {
- inspector_write(&socket_, message.data(), message.length());
+void SocketSession::Delegate::OnWsFrame(const std::vector<char>& data) {
+ server_->MessageReceived(session_id_,
+ std::string(data.data(), data.size()));
}
// ServerSocket implementation
@@ -624,8 +601,7 @@ void ServerSocket::SocketConnectedCallback(uv_stream_t* tcp_socket,
if (status == 0) {
ServerSocket* server_socket = ServerSocket::FromTcpSocket(tcp_socket);
// Memory is freed when the socket closes.
- SocketSession::Accept(server_socket->server_, server_socket->port_,
- tcp_socket);
+ server_socket->server_->Accept(server_socket->port_, tcp_socket);
}
}
diff --git a/src/inspector_socket_server.h b/src/inspector_socket_server.h
index 16b047da333..b193e33a46d 100644
--- a/src/inspector_socket_server.h
+++ b/src/inspector_socket_server.h
@@ -22,7 +22,7 @@ class ServerSocket;
class SocketServerDelegate {
public:
- virtual bool StartSession(int session_id, const std::string& target_id) = 0;
+ virtual void StartSession(int session_id, const std::string& target_id) = 0;
virtual void EndSession(int session_id) = 0;
virtual void MessageReceived(int session_id, const std::string& message) = 0;
virtual std::vector<std::string> GetTargetIds() = 0;
@@ -34,8 +34,6 @@ class SocketServerDelegate {
// HTTP Server, writes messages requested as TransportActions, and responds
// to HTTP requests and WS upgrades.
-
-
class InspectorSocketServer {
public:
using ServerCallback = void (*)(InspectorSocketServer*);
@@ -44,6 +42,8 @@ class InspectorSocketServer {
const std::string& host,
int port,
FILE* out = stderr);
+ ~InspectorSocketServer();
+
// Start listening on host/port
bool Start();
@@ -54,6 +54,10 @@ class InspectorSocketServer {
void Send(int session_id, const std::string& message);
// kKill
void TerminateConnections();
+ // kAcceptSession
+ void AcceptSession(int session_id);
+ // kDeclineSession
+ void DeclineSession(int session_id);
int Port() const;
@@ -62,19 +66,18 @@ class InspectorSocketServer {
void ServerSocketClosed(ServerSocket* server_socket);
// Session connection lifecycle
- bool HandleGetRequest(InspectorSocket* socket, const std::string& path);
- bool SessionStarted(SocketSession* session, const std::string& id);
- void SessionTerminated(SocketSession* session);
+ void Accept(int server_port, uv_stream_t* server_socket);
+ bool HandleGetRequest(int session_id, const std::string& path);
+ void SessionStarted(int session_id, const std::string& target_id,
+ const std::string& ws_id);
+ void SessionTerminated(int session_id);
void MessageReceived(int session_id, const std::string& message) {
delegate_->MessageReceived(session_id, message);
}
-
- int GenerateSessionId() {
- return next_session_id_++;
- }
+ SocketSession* Session(int session_id);
private:
- void SendListResponse(InspectorSocket* socket);
+ void SendListResponse(InspectorSocket* socket, SocketSession* session);
bool TargetExists(const std::string& id);
enum class ServerState {kNew, kRunning, kStopping, kStopped};
@@ -85,7 +88,8 @@ class InspectorSocketServer {
std::string path_;
std::vector<ServerSocket*> server_sockets_;
Closer* closer_;
- std::map<int, SocketSession*> connected_sessions_;
+ std::map<int, std::pair<std::string, std::unique_ptr<SocketSession>>>
+ connected_sessions_;
int next_session_id_;
FILE* out_;
ServerState state_;
diff --git a/src/node.cc b/src/node.cc
index ed2f0c14ff3..3af764d1fba 100644
--- a/src/node.cc
+++ b/src/node.cc
@@ -2006,7 +2006,7 @@ static void InitGroups(const FunctionCallbackInfo<Value>& args) {
static void WaitForInspectorDisconnect(Environment* env) {
#if HAVE_INSPECTOR
- if (env->inspector_agent()->IsConnected()) {
+ if (env->inspector_agent()->delegate() != nullptr) {
// Restore signal dispositions, the app is done and is no longer
// capable of handling signals.
#if defined(__POSIX__) && !defined(NODE_SHARED_MODE)