diff options
author | eidheim <eidheim@gmail.com> | 2014-08-05 23:38:18 +0400 |
---|---|---|
committer | eidheim <eidheim@gmail.com> | 2014-08-05 23:38:18 +0400 |
commit | 7b0c98ce2a1f9e0a7d4d3459b8ff6592f9f3cf9c (patch) | |
tree | 4a24f731a971bbb1cb271fd3f7286994c7a4dfce | |
parent | 12d9e53291912c90bdcc5973ee261330fa38b767 (diff) |
Fixed pointer bugv1.03
-rw-r--r-- | main_ws.cpp | 6 | ||||
-rw-r--r-- | main_wss.cpp | 8 | ||||
-rw-r--r-- | server_ws.hpp | 116 | ||||
-rw-r--r-- | server_wss.hpp | 5 |
4 files changed, 75 insertions, 60 deletions
diff --git a/main_ws.cpp b/main_ws.cpp index 1c223e5..3380e21 100644 --- a/main_ws.cpp +++ b/main_ws.cpp @@ -26,7 +26,7 @@ int main() { response_stream << response; //server.send is an asynchronous function - server.send(connection.id, response_stream, [](const boost::system::error_code& ec){ + server.send(connection, response_stream, [](const boost::system::error_code& ec){ if(!ec) cout << "Message sent successfully" << endl; else { @@ -67,12 +67,12 @@ int main() { string response=message_stream.str()+" from "+to_string((size_t)connection.id); - for(auto connection_id: server.get_connection_ids()) { + for(auto connection_pointer: server.get_connection_pointers()) { stringstream response_stream; response_stream << response; //server.send is an asynchronous function - server.send(connection_id, response_stream); + server.send(*connection_pointer, response_stream); } }; diff --git a/main_wss.cpp b/main_wss.cpp index 95efbd0..26ff91f 100644 --- a/main_wss.cpp +++ b/main_wss.cpp @@ -26,7 +26,7 @@ int main() { response_stream << response; //server.send is an asynchronous function - server.send(connection.id, response_stream, [](const boost::system::error_code& ec){ + server.send(connection, response_stream, [](const boost::system::error_code& ec){ if(!ec) cout << "Message sent successfully" << endl; else { @@ -67,12 +67,12 @@ int main() { string response=message_stream.str()+" from "+to_string((size_t)connection.id); - for(auto connection_id: server.get_connection_ids()) { + for(auto connection_pointer: server.get_connection_pointers()) { stringstream response_stream; response_stream << response; //server.send is an asynchronous function - server.send(connection_id, response_stream); + server.send(*connection_pointer, response_stream); } }; @@ -80,4 +80,4 @@ int main() { server.start(); return 0; -} +}
\ No newline at end of file diff --git a/server_ws.hpp b/server_ws.hpp index d0b95fd..d865ce8 100644 --- a/server_ws.hpp +++ b/server_ws.hpp @@ -9,12 +9,13 @@ #include <unordered_map> #include <thread> #include <mutex> -#include <unordered_set> +#include <set> #include <iostream> namespace SimpleWeb { - struct Connection { + class Connection { + public: std::string method, path, http_version; std::shared_ptr<std::istream> message; @@ -27,8 +28,7 @@ namespace SimpleWeb { void* id; }; - class WebSocketCallbacks { - public: + struct WebSocketCallbacks { std::function<void(Connection&)> onopen; std::function<void(Connection&)> onmessage; std::function<void(Connection&, const boost::system::error_code&)> onerror; @@ -40,10 +40,7 @@ namespace SimpleWeb { template <class socket_type> class SocketServerBase { public: - endpoint_type endpoint; - - SocketServerBase(unsigned short port, size_t num_threads=1) : m_endpoint(boost::asio::ip::tcp::v4(), port), - acceptor(m_io_service, m_endpoint), num_threads(num_threads) {} + endpoint_type endpoint; void start() { accept(); @@ -66,8 +63,13 @@ namespace SimpleWeb { //message_header: 129=one fragment, text, 130=one fragment, binary //See http://tools.ietf.org/html/rfc6455#section-5.2 for more information - void send(void* connection_id, std::ostream& stream, const std::function<void(const boost::system::error_code&)>& callback=nullptr, + void send(Connection& connection, std::ostream& stream, + const std::function<void(const boost::system::error_code&)>& callback=nullptr, unsigned char message_header=129) const { + if(!connections.count(&connection)) { + throw std::invalid_argument("Connection closed"); + } + std::shared_ptr<boost::asio::streambuf> write_buffer(new boost::asio::streambuf); std::ostream response(write_buffer.get()); @@ -97,27 +99,28 @@ namespace SimpleWeb { response << stream.rdbuf(); - //Needs to copy the callback-function in case its destroyed - boost::asio::async_write(*(socket_type*)connection_id, *write_buffer, - [this, write_buffer, callback, connection_id](const boost::system::error_code& ec, size_t bytes_transferred) { + //Need to copy the callback-function in case its destroyed + boost::asio::async_write(*static_cast<socket_type*>(connection.id), *write_buffer, + [this, write_buffer, callback] + (const boost::system::error_code& ec, size_t bytes_transferred) { if(callback) { callback(ec); } }); } - std::unordered_set<void*> get_connection_ids() { - connection_ids_mutex.lock(); - auto copy=connection_ids; - connection_ids_mutex.unlock(); + std::set<Connection*> get_connection_pointers() { + connections_mutex.lock(); + auto copy=connections; + connections_mutex.unlock(); return copy; } protected: const std::string ws_magic_string="258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; - std::unordered_set<void*> connection_ids; - std::mutex connection_ids_mutex; + std::set<Connection*> connections; + std::mutex connections_mutex; boost::asio::io_service m_io_service; boost::asio::ip::tcp::endpoint m_endpoint; @@ -125,7 +128,13 @@ namespace SimpleWeb { size_t num_threads; std::vector<std::thread> threads; - virtual void accept() {} + size_t timeout_request; + size_t timeout_idle; + + SocketServerBase(unsigned short port, size_t num_threads=1) : m_endpoint(boost::asio::ip::tcp::v4(), port), + acceptor(m_io_service, m_endpoint), num_threads(num_threads) {} + + virtual void accept()=0; void process_request_and_start_connection(std::shared_ptr<socket_type> socket) { //Create new read_buffer for async_read_until() @@ -133,7 +142,8 @@ namespace SimpleWeb { std::shared_ptr<boost::asio::streambuf> read_buffer(new boost::asio::streambuf); boost::asio::async_read_until(*socket, *read_buffer, "\r\n\r\n", - [this, socket, read_buffer](const boost::system::error_code& ec, size_t bytes_transferred) { + [this, socket, read_buffer] + (const boost::system::error_code& ec, size_t bytes_transferred) { if(!ec) { //Convert to istream to extract string-lines std::istream stream(read_buffer.get()); @@ -208,13 +218,14 @@ namespace SimpleWeb { connection->id=socket.get(); //Capture write_buffer in lambda so it is not destroyed before async_write is finished boost::asio::async_write(*socket, *write_buffer, - [this, socket, write_buffer, read_buffer, &an_endpoint, connection](const boost::system::error_code& ec, size_t bytes_transferred) { + [this, socket, write_buffer, read_buffer, &an_endpoint, connection] + (const boost::system::error_code& ec, size_t bytes_transferred) { if(!ec) { - connection_open(socket.get(), an_endpoint.second, *connection); + connection_open(an_endpoint.second, *connection); read_write_messages(socket, read_buffer, an_endpoint.second, connection); } else - connection_error(socket.get(), an_endpoint.second, *connection, ec); + connection_error(an_endpoint.second, *connection, ec); }); } return; @@ -222,9 +233,11 @@ namespace SimpleWeb { } } - void read_write_messages(std::shared_ptr<socket_type> socket, std::shared_ptr<boost::asio::streambuf> read_buffer, WebSocketCallbacks& websocketcallbacks, std::shared_ptr<Connection> connection) { + void read_write_messages(std::shared_ptr<socket_type> socket, std::shared_ptr<boost::asio::streambuf> read_buffer, + WebSocketCallbacks& websocketcallbacks, std::shared_ptr<Connection> connection) { boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(2), - [this, socket, read_buffer, &websocketcallbacks, connection](const boost::system::error_code& ec, size_t bytes_transferred) { + [this, socket, read_buffer, &websocketcallbacks, connection] + (const boost::system::error_code& ec, size_t bytes_transferred) { if(!ec) { std::istream stream(read_buffer.get()); @@ -239,7 +252,8 @@ namespace SimpleWeb { if(length==126) { //2 next bytes is the size of content boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(2), - [this, socket, read_buffer, &websocketcallbacks, connection, opcode](const boost::system::error_code& ec, size_t bytes_transferred) { + [this, socket, read_buffer, &websocketcallbacks, connection, opcode] + (const boost::system::error_code& ec, size_t bytes_transferred) { if(!ec) { std::istream stream(read_buffer.get()); @@ -255,13 +269,14 @@ namespace SimpleWeb { read_write_message_content(socket, read_buffer, length, websocketcallbacks, connection, opcode); } else - connection_error(socket.get(), websocketcallbacks, *connection, ec); + connection_error(websocketcallbacks, *connection, ec); }); } else if(length==127) { //8 next bytes is the size of content boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(8), - [this, socket, read_buffer, &websocketcallbacks, connection, opcode](const boost::system::error_code& ec, size_t bytes_transferred) { + [this, socket, read_buffer, &websocketcallbacks, connection, opcode] + (const boost::system::error_code& ec, size_t bytes_transferred) { if(!ec) { std::istream stream(read_buffer.get()); @@ -277,21 +292,24 @@ namespace SimpleWeb { read_write_message_content(socket, read_buffer, length, websocketcallbacks, connection, opcode); } else - connection_error(socket.get(), websocketcallbacks, *connection, ec); + connection_error(websocketcallbacks, *connection, ec); }); } else read_write_message_content(socket, read_buffer, length, websocketcallbacks, connection, opcode); } else - connection_error(socket.get(), websocketcallbacks, *connection, ec); + connection_error(websocketcallbacks, *connection, ec); }); } - void read_write_message_content(std::shared_ptr<socket_type> socket, std::shared_ptr<boost::asio::streambuf> read_buffer, size_t length, - WebSocketCallbacks& websocketcallbacks, std::shared_ptr<Connection> connection, unsigned char opcode) { + void read_write_message_content(std::shared_ptr<socket_type> socket, + std::shared_ptr<boost::asio::streambuf> read_buffer, + size_t length, WebSocketCallbacks& websocketcallbacks, + std::shared_ptr<Connection> connection, unsigned char opcode) { boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(4+length), - [this, socket, read_buffer, length, &websocketcallbacks, connection, opcode](const boost::system::error_code& ec, size_t bytes_transferred) { + [this, socket, read_buffer, length, &websocketcallbacks, connection, opcode] + (const boost::system::error_code& ec, size_t bytes_transferred) { if(!ec) { std::istream stream(read_buffer.get()); @@ -318,7 +336,7 @@ namespace SimpleWeb { status=(byte1<<8)+byte2; } - connection_close(socket.get(), websocketcallbacks, *connection, status); + connection_close(websocketcallbacks, *connection, status); return; } @@ -329,34 +347,30 @@ namespace SimpleWeb { read_write_messages(socket, read_buffer, websocketcallbacks, connection); } else - connection_error(socket.get(), websocketcallbacks, *connection, ec); + connection_error(websocketcallbacks, *connection, ec); }); } - void connection_open(void* socket, const WebSocketCallbacks& websocketcallbacks, Connection& connection) { - connection_ids_mutex.lock(); - connection_ids.insert(socket); - connection_ids_mutex.unlock(); + void connection_open(const WebSocketCallbacks& websocketcallbacks, Connection& connection) { + connections_mutex.lock(); + connections.insert(&connection); + connections_mutex.unlock(); if(websocketcallbacks.onopen) websocketcallbacks.onopen(connection); } - void connection_close(void* socket, const WebSocketCallbacks& websocketcallbacks, Connection& connection, int status) { - //((socket_type*)socket)->shutdown(boost::asio::ip::tcp::socket::shutdown_both); - //((socket_type*)socket)->close(); - connection_ids_mutex.lock(); - connection_ids.erase(socket); - connection_ids_mutex.unlock(); + void connection_close(const WebSocketCallbacks& websocketcallbacks, Connection& connection, int status) { + connections_mutex.lock(); + connections.erase(&connection); + connections_mutex.unlock(); if(websocketcallbacks.onclose) websocketcallbacks.onclose(connection, status); } - void connection_error(void* socket, const WebSocketCallbacks& websocketcallbacks, Connection& connection, const boost::system::error_code& ec) { - //((socket_type*)socket)->shutdown(boost::asio::ip::tcp::socket::shutdown_both); - //((socket_type*)socket)->close(); - connection_ids_mutex.lock(); - connection_ids.erase(socket); - connection_ids_mutex.unlock(); + void connection_error(const WebSocketCallbacks& websocketcallbacks, Connection& connection, const boost::system::error_code& ec) { + connections_mutex.lock(); + connections.erase(&connection); + connections_mutex.unlock(); if(websocketcallbacks.onerror) { boost::system::error_code ec_tmp=ec; websocketcallbacks.onerror(connection, ec_tmp); diff --git a/server_wss.hpp b/server_wss.hpp index ca1ca0a..2596d47 100644 --- a/server_wss.hpp +++ b/server_wss.hpp @@ -11,7 +11,7 @@ namespace SimpleWeb { class Server<WSS> : public SocketServerBase<WSS> { public: Server(unsigned short port, size_t num_threads, const std::string& cert_file, const std::string& private_key_file) : - SocketServerBase<WSS>::SocketServerBase(port, num_threads), context(boost::asio::ssl::context::sslv23) { + SocketServerBase<WSS>::SocketServerBase(port, num_threads), context(boost::asio::ssl::context::sslv23) { context.use_certificate_chain_file(cert_file); context.use_private_key_file(private_key_file, boost::asio::ssl::context::pem); } @@ -29,7 +29,8 @@ namespace SimpleWeb { accept(); if(!ec) { - (*socket).async_handshake(boost::asio::ssl::stream_base::server, [this, socket](const boost::system::error_code& ec) { + (*socket).async_handshake(boost::asio::ssl::stream_base::server, + [this, socket](const boost::system::error_code& ec) { if(!ec) { process_request_and_start_connection(socket); } |