Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/marian-nmt/Simple-WebSocket-Server.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoreidheim <eidheim@gmail.com>2014-08-05 23:38:18 +0400
committereidheim <eidheim@gmail.com>2014-08-05 23:38:18 +0400
commit7b0c98ce2a1f9e0a7d4d3459b8ff6592f9f3cf9c (patch)
tree4a24f731a971bbb1cb271fd3f7286994c7a4dfce
parent12d9e53291912c90bdcc5973ee261330fa38b767 (diff)
Fixed pointer bugv1.03
-rw-r--r--main_ws.cpp6
-rw-r--r--main_wss.cpp8
-rw-r--r--server_ws.hpp116
-rw-r--r--server_wss.hpp5
4 files changed, 75 insertions, 60 deletions
diff --git a/main_ws.cpp b/main_ws.cpp
index 1c223e5..3380e21 100644
--- a/main_ws.cpp
+++ b/main_ws.cpp
@@ -26,7 +26,7 @@ int main() {
response_stream << response;
//server.send is an asynchronous function
- server.send(connection.id, response_stream, [](const boost::system::error_code& ec){
+ server.send(connection, response_stream, [](const boost::system::error_code& ec){
if(!ec)
cout << "Message sent successfully" << endl;
else {
@@ -67,12 +67,12 @@ int main() {
string response=message_stream.str()+" from "+to_string((size_t)connection.id);
- for(auto connection_id: server.get_connection_ids()) {
+ for(auto connection_pointer: server.get_connection_pointers()) {
stringstream response_stream;
response_stream << response;
//server.send is an asynchronous function
- server.send(connection_id, response_stream);
+ server.send(*connection_pointer, response_stream);
}
};
diff --git a/main_wss.cpp b/main_wss.cpp
index 95efbd0..26ff91f 100644
--- a/main_wss.cpp
+++ b/main_wss.cpp
@@ -26,7 +26,7 @@ int main() {
response_stream << response;
//server.send is an asynchronous function
- server.send(connection.id, response_stream, [](const boost::system::error_code& ec){
+ server.send(connection, response_stream, [](const boost::system::error_code& ec){
if(!ec)
cout << "Message sent successfully" << endl;
else {
@@ -67,12 +67,12 @@ int main() {
string response=message_stream.str()+" from "+to_string((size_t)connection.id);
- for(auto connection_id: server.get_connection_ids()) {
+ for(auto connection_pointer: server.get_connection_pointers()) {
stringstream response_stream;
response_stream << response;
//server.send is an asynchronous function
- server.send(connection_id, response_stream);
+ server.send(*connection_pointer, response_stream);
}
};
@@ -80,4 +80,4 @@ int main() {
server.start();
return 0;
-}
+} \ No newline at end of file
diff --git a/server_ws.hpp b/server_ws.hpp
index d0b95fd..d865ce8 100644
--- a/server_ws.hpp
+++ b/server_ws.hpp
@@ -9,12 +9,13 @@
#include <unordered_map>
#include <thread>
#include <mutex>
-#include <unordered_set>
+#include <set>
#include <iostream>
namespace SimpleWeb {
- struct Connection {
+ class Connection {
+ public:
std::string method, path, http_version;
std::shared_ptr<std::istream> message;
@@ -27,8 +28,7 @@ namespace SimpleWeb {
void* id;
};
- class WebSocketCallbacks {
- public:
+ struct WebSocketCallbacks {
std::function<void(Connection&)> onopen;
std::function<void(Connection&)> onmessage;
std::function<void(Connection&, const boost::system::error_code&)> onerror;
@@ -40,10 +40,7 @@ namespace SimpleWeb {
template <class socket_type>
class SocketServerBase {
public:
- endpoint_type endpoint;
-
- SocketServerBase(unsigned short port, size_t num_threads=1) : m_endpoint(boost::asio::ip::tcp::v4(), port),
- acceptor(m_io_service, m_endpoint), num_threads(num_threads) {}
+ endpoint_type endpoint;
void start() {
accept();
@@ -66,8 +63,13 @@ namespace SimpleWeb {
//message_header: 129=one fragment, text, 130=one fragment, binary
//See http://tools.ietf.org/html/rfc6455#section-5.2 for more information
- void send(void* connection_id, std::ostream& stream, const std::function<void(const boost::system::error_code&)>& callback=nullptr,
+ void send(Connection& connection, std::ostream& stream,
+ const std::function<void(const boost::system::error_code&)>& callback=nullptr,
unsigned char message_header=129) const {
+ if(!connections.count(&connection)) {
+ throw std::invalid_argument("Connection closed");
+ }
+
std::shared_ptr<boost::asio::streambuf> write_buffer(new boost::asio::streambuf);
std::ostream response(write_buffer.get());
@@ -97,27 +99,28 @@ namespace SimpleWeb {
response << stream.rdbuf();
- //Needs to copy the callback-function in case its destroyed
- boost::asio::async_write(*(socket_type*)connection_id, *write_buffer,
- [this, write_buffer, callback, connection_id](const boost::system::error_code& ec, size_t bytes_transferred) {
+ //Need to copy the callback-function in case its destroyed
+ boost::asio::async_write(*static_cast<socket_type*>(connection.id), *write_buffer,
+ [this, write_buffer, callback]
+ (const boost::system::error_code& ec, size_t bytes_transferred) {
if(callback) {
callback(ec);
}
});
}
- std::unordered_set<void*> get_connection_ids() {
- connection_ids_mutex.lock();
- auto copy=connection_ids;
- connection_ids_mutex.unlock();
+ std::set<Connection*> get_connection_pointers() {
+ connections_mutex.lock();
+ auto copy=connections;
+ connections_mutex.unlock();
return copy;
}
protected:
const std::string ws_magic_string="258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
- std::unordered_set<void*> connection_ids;
- std::mutex connection_ids_mutex;
+ std::set<Connection*> connections;
+ std::mutex connections_mutex;
boost::asio::io_service m_io_service;
boost::asio::ip::tcp::endpoint m_endpoint;
@@ -125,7 +128,13 @@ namespace SimpleWeb {
size_t num_threads;
std::vector<std::thread> threads;
- virtual void accept() {}
+ size_t timeout_request;
+ size_t timeout_idle;
+
+ SocketServerBase(unsigned short port, size_t num_threads=1) : m_endpoint(boost::asio::ip::tcp::v4(), port),
+ acceptor(m_io_service, m_endpoint), num_threads(num_threads) {}
+
+ virtual void accept()=0;
void process_request_and_start_connection(std::shared_ptr<socket_type> socket) {
//Create new read_buffer for async_read_until()
@@ -133,7 +142,8 @@ namespace SimpleWeb {
std::shared_ptr<boost::asio::streambuf> read_buffer(new boost::asio::streambuf);
boost::asio::async_read_until(*socket, *read_buffer, "\r\n\r\n",
- [this, socket, read_buffer](const boost::system::error_code& ec, size_t bytes_transferred) {
+ [this, socket, read_buffer]
+ (const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
//Convert to istream to extract string-lines
std::istream stream(read_buffer.get());
@@ -208,13 +218,14 @@ namespace SimpleWeb {
connection->id=socket.get();
//Capture write_buffer in lambda so it is not destroyed before async_write is finished
boost::asio::async_write(*socket, *write_buffer,
- [this, socket, write_buffer, read_buffer, &an_endpoint, connection](const boost::system::error_code& ec, size_t bytes_transferred) {
+ [this, socket, write_buffer, read_buffer, &an_endpoint, connection]
+ (const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
- connection_open(socket.get(), an_endpoint.second, *connection);
+ connection_open(an_endpoint.second, *connection);
read_write_messages(socket, read_buffer, an_endpoint.second, connection);
}
else
- connection_error(socket.get(), an_endpoint.second, *connection, ec);
+ connection_error(an_endpoint.second, *connection, ec);
});
}
return;
@@ -222,9 +233,11 @@ namespace SimpleWeb {
}
}
- void read_write_messages(std::shared_ptr<socket_type> socket, std::shared_ptr<boost::asio::streambuf> read_buffer, WebSocketCallbacks& websocketcallbacks, std::shared_ptr<Connection> connection) {
+ void read_write_messages(std::shared_ptr<socket_type> socket, std::shared_ptr<boost::asio::streambuf> read_buffer,
+ WebSocketCallbacks& websocketcallbacks, std::shared_ptr<Connection> connection) {
boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(2),
- [this, socket, read_buffer, &websocketcallbacks, connection](const boost::system::error_code& ec, size_t bytes_transferred) {
+ [this, socket, read_buffer, &websocketcallbacks, connection]
+ (const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
std::istream stream(read_buffer.get());
@@ -239,7 +252,8 @@ namespace SimpleWeb {
if(length==126) {
//2 next bytes is the size of content
boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(2),
- [this, socket, read_buffer, &websocketcallbacks, connection, opcode](const boost::system::error_code& ec, size_t bytes_transferred) {
+ [this, socket, read_buffer, &websocketcallbacks, connection, opcode]
+ (const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
std::istream stream(read_buffer.get());
@@ -255,13 +269,14 @@ namespace SimpleWeb {
read_write_message_content(socket, read_buffer, length, websocketcallbacks, connection, opcode);
}
else
- connection_error(socket.get(), websocketcallbacks, *connection, ec);
+ connection_error(websocketcallbacks, *connection, ec);
});
}
else if(length==127) {
//8 next bytes is the size of content
boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(8),
- [this, socket, read_buffer, &websocketcallbacks, connection, opcode](const boost::system::error_code& ec, size_t bytes_transferred) {
+ [this, socket, read_buffer, &websocketcallbacks, connection, opcode]
+ (const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
std::istream stream(read_buffer.get());
@@ -277,21 +292,24 @@ namespace SimpleWeb {
read_write_message_content(socket, read_buffer, length, websocketcallbacks, connection, opcode);
}
else
- connection_error(socket.get(), websocketcallbacks, *connection, ec);
+ connection_error(websocketcallbacks, *connection, ec);
});
}
else
read_write_message_content(socket, read_buffer, length, websocketcallbacks, connection, opcode);
}
else
- connection_error(socket.get(), websocketcallbacks, *connection, ec);
+ connection_error(websocketcallbacks, *connection, ec);
});
}
- void read_write_message_content(std::shared_ptr<socket_type> socket, std::shared_ptr<boost::asio::streambuf> read_buffer, size_t length,
- WebSocketCallbacks& websocketcallbacks, std::shared_ptr<Connection> connection, unsigned char opcode) {
+ void read_write_message_content(std::shared_ptr<socket_type> socket,
+ std::shared_ptr<boost::asio::streambuf> read_buffer,
+ size_t length, WebSocketCallbacks& websocketcallbacks,
+ std::shared_ptr<Connection> connection, unsigned char opcode) {
boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(4+length),
- [this, socket, read_buffer, length, &websocketcallbacks, connection, opcode](const boost::system::error_code& ec, size_t bytes_transferred) {
+ [this, socket, read_buffer, length, &websocketcallbacks, connection, opcode]
+ (const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
std::istream stream(read_buffer.get());
@@ -318,7 +336,7 @@ namespace SimpleWeb {
status=(byte1<<8)+byte2;
}
- connection_close(socket.get(), websocketcallbacks, *connection, status);
+ connection_close(websocketcallbacks, *connection, status);
return;
}
@@ -329,34 +347,30 @@ namespace SimpleWeb {
read_write_messages(socket, read_buffer, websocketcallbacks, connection);
}
else
- connection_error(socket.get(), websocketcallbacks, *connection, ec);
+ connection_error(websocketcallbacks, *connection, ec);
});
}
- void connection_open(void* socket, const WebSocketCallbacks& websocketcallbacks, Connection& connection) {
- connection_ids_mutex.lock();
- connection_ids.insert(socket);
- connection_ids_mutex.unlock();
+ void connection_open(const WebSocketCallbacks& websocketcallbacks, Connection& connection) {
+ connections_mutex.lock();
+ connections.insert(&connection);
+ connections_mutex.unlock();
if(websocketcallbacks.onopen)
websocketcallbacks.onopen(connection);
}
- void connection_close(void* socket, const WebSocketCallbacks& websocketcallbacks, Connection& connection, int status) {
- //((socket_type*)socket)->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
- //((socket_type*)socket)->close();
- connection_ids_mutex.lock();
- connection_ids.erase(socket);
- connection_ids_mutex.unlock();
+ void connection_close(const WebSocketCallbacks& websocketcallbacks, Connection& connection, int status) {
+ connections_mutex.lock();
+ connections.erase(&connection);
+ connections_mutex.unlock();
if(websocketcallbacks.onclose)
websocketcallbacks.onclose(connection, status);
}
- void connection_error(void* socket, const WebSocketCallbacks& websocketcallbacks, Connection& connection, const boost::system::error_code& ec) {
- //((socket_type*)socket)->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
- //((socket_type*)socket)->close();
- connection_ids_mutex.lock();
- connection_ids.erase(socket);
- connection_ids_mutex.unlock();
+ void connection_error(const WebSocketCallbacks& websocketcallbacks, Connection& connection, const boost::system::error_code& ec) {
+ connections_mutex.lock();
+ connections.erase(&connection);
+ connections_mutex.unlock();
if(websocketcallbacks.onerror) {
boost::system::error_code ec_tmp=ec;
websocketcallbacks.onerror(connection, ec_tmp);
diff --git a/server_wss.hpp b/server_wss.hpp
index ca1ca0a..2596d47 100644
--- a/server_wss.hpp
+++ b/server_wss.hpp
@@ -11,7 +11,7 @@ namespace SimpleWeb {
class Server<WSS> : public SocketServerBase<WSS> {
public:
Server(unsigned short port, size_t num_threads, const std::string& cert_file, const std::string& private_key_file) :
- SocketServerBase<WSS>::SocketServerBase(port, num_threads), context(boost::asio::ssl::context::sslv23) {
+ SocketServerBase<WSS>::SocketServerBase(port, num_threads), context(boost::asio::ssl::context::sslv23) {
context.use_certificate_chain_file(cert_file);
context.use_private_key_file(private_key_file, boost::asio::ssl::context::pem);
}
@@ -29,7 +29,8 @@ namespace SimpleWeb {
accept();
if(!ec) {
- (*socket).async_handshake(boost::asio::ssl::stream_base::server, [this, socket](const boost::system::error_code& ec) {
+ (*socket).async_handshake(boost::asio::ssl::stream_base::server,
+ [this, socket](const boost::system::error_code& ec) {
if(!ec) {
process_request_and_start_connection(socket);
}