diff options
Diffstat (limited to 'intern/cycles/device/device_network.h')
-rw-r--r-- | intern/cycles/device/device_network.h | 843 |
1 files changed, 421 insertions, 422 deletions
diff --git a/intern/cycles/device/device_network.h b/intern/cycles/device/device_network.h index 67626ae177f..5b69b815cc6 100644 --- a/intern/cycles/device/device_network.h +++ b/intern/cycles/device/device_network.h @@ -19,35 +19,35 @@ #ifdef WITH_NETWORK -#include <boost/archive/text_iarchive.hpp> -#include <boost/archive/text_oarchive.hpp> -#include <boost/archive/binary_iarchive.hpp> -#include <boost/archive/binary_oarchive.hpp> -#include <boost/array.hpp> -#include <boost/asio.hpp> -#include <boost/bind.hpp> -#include <boost/serialization/vector.hpp> -#include <boost/thread.hpp> - -#include <iostream> -#include <sstream> -#include <deque> - -#include "render/buffers.h" - -#include "util/util_foreach.h" -#include "util/util_list.h" -#include "util/util_map.h" -#include "util/util_param.h" -#include "util/util_string.h" +# include <boost/archive/text_iarchive.hpp> +# include <boost/archive/text_oarchive.hpp> +# include <boost/archive/binary_iarchive.hpp> +# include <boost/archive/binary_oarchive.hpp> +# include <boost/array.hpp> +# include <boost/asio.hpp> +# include <boost/bind.hpp> +# include <boost/serialization/vector.hpp> +# include <boost/thread.hpp> + +# include <iostream> +# include <sstream> +# include <deque> + +# include "render/buffers.h" + +# include "util/util_foreach.h" +# include "util/util_list.h" +# include "util/util_map.h" +# include "util/util_param.h" +# include "util/util_string.h" CCL_NAMESPACE_BEGIN -using std::cout; using std::cerr; +using std::cout; +using std::exception; using std::hex; using std::setw; -using std::exception; using boost::asio::ip::tcp; @@ -56,436 +56,435 @@ static const int DISCOVER_PORT = 5121; static const string DISCOVER_REQUEST_MSG = "REQUEST_RENDER_SERVER_IP"; static const string DISCOVER_REPLY_MSG = "REPLY_RENDER_SERVER_IP"; -#if 0 +# if 0 typedef boost::archive::text_oarchive o_archive; typedef boost::archive::text_iarchive i_archive; -#else +# else typedef boost::archive::binary_oarchive o_archive; typedef boost::archive::binary_iarchive i_archive; -#endif +# endif /* Serialization of device memory */ -class network_device_memory : public device_memory -{ -public: - network_device_memory(Device *device) - : device_memory(device, "", MEM_READ_ONLY) - { - } +class network_device_memory : public device_memory { + public: + network_device_memory(Device *device) : device_memory(device, "", MEM_READ_ONLY) + { + } - ~network_device_memory() - { - device_pointer = 0; - }; + ~network_device_memory() + { + device_pointer = 0; + }; - vector<char> local_data; + vector<char> local_data; }; /* Common netowrk error function / object for both DeviceNetwork and DeviceServer*/ class NetworkError { -public: - NetworkError() { - error = ""; - error_count = 0; - } - - ~NetworkError() {} - - void network_error(const string& message) { - error = message; - error_count += 1; - } - - bool have_error() { - return true ? error_count > 0 : false; - } - -private: - string error; - int error_count; + public: + NetworkError() + { + error = ""; + error_count = 0; + } + + ~NetworkError() + { + } + + void network_error(const string &message) + { + error = message; + error_count += 1; + } + + bool have_error() + { + return true ? error_count > 0 : false; + } + + private: + string error; + int error_count; }; - /* Remote procedure call Send */ class RPCSend { -public: - RPCSend(tcp::socket& socket_, NetworkError* e, const string& name_ = "") - : name(name_), socket(socket_), archive(archive_stream), sent(false) - { - archive & name_; - error_func = e; - fprintf(stderr, "rpc send %s\n", name.c_str()); - } - - ~RPCSend() - { - } - - void add(const device_memory& mem) - { - archive & mem.data_type & mem.data_elements & mem.data_size; - archive & mem.data_width & mem.data_height & mem.data_depth & mem.device_pointer; - archive & mem.type & string(mem.name); - archive & mem.interpolation & mem.extension; - archive & mem.device_pointer; - } - - template<typename T> void add(const T& data) - { - archive & data; - } - - void add(const DeviceTask& task) - { - int type = (int)task.type; - archive & type & task.x & task.y & task.w & task.h; - archive & task.rgba_byte & task.rgba_half & task.buffer & task.sample & task.num_samples; - archive & task.offset & task.stride; - archive & task.shader_input & task.shader_output & task.shader_eval_type; - archive & task.shader_x & task.shader_w; - archive & task.need_finish_queue; - } - - void add(const RenderTile& tile) - { - archive & tile.x & tile.y & tile.w & tile.h; - archive & tile.start_sample & tile.num_samples & tile.sample; - archive & tile.resolution & tile.offset & tile.stride; - archive & tile.buffer; - } - - void write() - { - boost::system::error_code error; - - /* get string from stream */ - string archive_str = archive_stream.str(); - - /* first send fixed size header with size of following data */ - ostringstream header_stream; - header_stream << setw(8) << hex << archive_str.size(); - string header_str = header_stream.str(); - - boost::asio::write(socket, - boost::asio::buffer(header_str), - boost::asio::transfer_all(), error); - - if(error.value()) - error_func->network_error(error.message()); - - /* then send actual data */ - boost::asio::write(socket, - boost::asio::buffer(archive_str), - boost::asio::transfer_all(), error); - - if(error.value()) - error_func->network_error(error.message()); - - sent = true; - } - - void write_buffer(void *buffer, size_t size) - { - boost::system::error_code error; - - boost::asio::write(socket, - boost::asio::buffer(buffer, size), - boost::asio::transfer_all(), error); - - if(error.value()) - error_func->network_error(error.message()); - } - -protected: - string name; - tcp::socket& socket; - ostringstream archive_stream; - o_archive archive; - bool sent; - NetworkError *error_func; + public: + RPCSend(tcp::socket &socket_, NetworkError *e, const string &name_ = "") + : name(name_), socket(socket_), archive(archive_stream), sent(false) + { + archive &name_; + error_func = e; + fprintf(stderr, "rpc send %s\n", name.c_str()); + } + + ~RPCSend() + { + } + + void add(const device_memory &mem) + { + archive &mem.data_type &mem.data_elements &mem.data_size; + archive &mem.data_width &mem.data_height &mem.data_depth &mem.device_pointer; + archive &mem.type &string(mem.name); + archive &mem.interpolation &mem.extension; + archive &mem.device_pointer; + } + + template<typename T> void add(const T &data) + { + archive &data; + } + + void add(const DeviceTask &task) + { + int type = (int)task.type; + archive &type &task.x &task.y &task.w &task.h; + archive &task.rgba_byte &task.rgba_half &task.buffer &task.sample &task.num_samples; + archive &task.offset &task.stride; + archive &task.shader_input &task.shader_output &task.shader_eval_type; + archive &task.shader_x &task.shader_w; + archive &task.need_finish_queue; + } + + void add(const RenderTile &tile) + { + archive &tile.x &tile.y &tile.w &tile.h; + archive &tile.start_sample &tile.num_samples &tile.sample; + archive &tile.resolution &tile.offset &tile.stride; + archive &tile.buffer; + } + + void write() + { + boost::system::error_code error; + + /* get string from stream */ + string archive_str = archive_stream.str(); + + /* first send fixed size header with size of following data */ + ostringstream header_stream; + header_stream << setw(8) << hex << archive_str.size(); + string header_str = header_stream.str(); + + boost::asio::write( + socket, boost::asio::buffer(header_str), boost::asio::transfer_all(), error); + + if (error.value()) + error_func->network_error(error.message()); + + /* then send actual data */ + boost::asio::write( + socket, boost::asio::buffer(archive_str), boost::asio::transfer_all(), error); + + if (error.value()) + error_func->network_error(error.message()); + + sent = true; + } + + void write_buffer(void *buffer, size_t size) + { + boost::system::error_code error; + + boost::asio::write( + socket, boost::asio::buffer(buffer, size), boost::asio::transfer_all(), error); + + if (error.value()) + error_func->network_error(error.message()); + } + + protected: + string name; + tcp::socket &socket; + ostringstream archive_stream; + o_archive archive; + bool sent; + NetworkError *error_func; }; /* Remote procedure call Receive */ class RPCReceive { -public: - RPCReceive(tcp::socket& socket_, NetworkError* e ) - : socket(socket_), archive_stream(NULL), archive(NULL) - { - error_func = e; - /* read head with fixed size */ - vector<char> header(8); - boost::system::error_code error; - size_t len = boost::asio::read(socket, boost::asio::buffer(header), error); - - if(error.value()) { - error_func->network_error(error.message()); - } - - /* verify if we got something */ - if(len == header.size()) { - /* decode header */ - string header_str(&header[0], header.size()); - istringstream header_stream(header_str); - - size_t data_size; - - if((header_stream >> hex >> data_size)) { - - vector<char> data(data_size); - size_t len = boost::asio::read(socket, boost::asio::buffer(data), error); - - if(error.value()) - error_func->network_error(error.message()); - - - if(len == data_size) { - archive_str = (data.size())? string(&data[0], data.size()): string(""); - - archive_stream = new istringstream(archive_str); - archive = new i_archive(*archive_stream); - - *archive & name; - fprintf(stderr, "rpc receive %s\n", name.c_str()); - } - else { - error_func->network_error("Network receive error: data size doesn't match header"); - } - } - else { - error_func->network_error("Network receive error: can't decode data size from header"); - } - } - else { - error_func->network_error("Network receive error: invalid header size"); - } - } - - ~RPCReceive() - { - delete archive; - delete archive_stream; - } - - void read(network_device_memory& mem, string& name) - { - *archive & mem.data_type & mem.data_elements & mem.data_size; - *archive & mem.data_width & mem.data_height & mem.data_depth & mem.device_pointer; - *archive & mem.type & name; - *archive & mem.interpolation & mem.extension; - *archive & mem.device_pointer; - - mem.name = name.c_str(); - mem.host_pointer = 0; - - /* Can't transfer OpenGL texture over network. */ - if(mem.type == MEM_PIXELS) { - mem.type = MEM_READ_WRITE; - } - } - - template<typename T> void read(T& data) - { - *archive & data; - } - - void read_buffer(void *buffer, size_t size) - { - boost::system::error_code error; - size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size), error); - - if(error.value()) { - error_func->network_error(error.message()); - } - - if(len != size) - cout << "Network receive error: buffer size doesn't match expected size\n"; - } - - void read(DeviceTask& task) - { - int type; - - *archive & type & task.x & task.y & task.w & task.h; - *archive & task.rgba_byte & task.rgba_half & task.buffer & task.sample & task.num_samples; - *archive & task.offset & task.stride; - *archive & task.shader_input & task.shader_output & task.shader_eval_type; - *archive & task.shader_x & task.shader_w; - *archive & task.need_finish_queue; - - task.type = (DeviceTask::Type)type; - } - - void read(RenderTile& tile) - { - *archive & tile.x & tile.y & tile.w & tile.h; - *archive & tile.start_sample & tile.num_samples & tile.sample; - *archive & tile.resolution & tile.offset & tile.stride; - *archive & tile.buffer; - - tile.buffers = NULL; - } - - string name; - -protected: - tcp::socket& socket; - string archive_str; - istringstream *archive_stream; - i_archive *archive; - NetworkError *error_func; + public: + RPCReceive(tcp::socket &socket_, NetworkError *e) + : socket(socket_), archive_stream(NULL), archive(NULL) + { + error_func = e; + /* read head with fixed size */ + vector<char> header(8); + boost::system::error_code error; + size_t len = boost::asio::read(socket, boost::asio::buffer(header), error); + + if (error.value()) { + error_func->network_error(error.message()); + } + + /* verify if we got something */ + if (len == header.size()) { + /* decode header */ + string header_str(&header[0], header.size()); + istringstream header_stream(header_str); + + size_t data_size; + + if ((header_stream >> hex >> data_size)) { + + vector<char> data(data_size); + size_t len = boost::asio::read(socket, boost::asio::buffer(data), error); + + if (error.value()) + error_func->network_error(error.message()); + + if (len == data_size) { + archive_str = (data.size()) ? string(&data[0], data.size()) : string(""); + + archive_stream = new istringstream(archive_str); + archive = new i_archive(*archive_stream); + + *archive &name; + fprintf(stderr, "rpc receive %s\n", name.c_str()); + } + else { + error_func->network_error("Network receive error: data size doesn't match header"); + } + } + else { + error_func->network_error("Network receive error: can't decode data size from header"); + } + } + else { + error_func->network_error("Network receive error: invalid header size"); + } + } + + ~RPCReceive() + { + delete archive; + delete archive_stream; + } + + void read(network_device_memory &mem, string &name) + { + *archive &mem.data_type &mem.data_elements &mem.data_size; + *archive &mem.data_width &mem.data_height &mem.data_depth &mem.device_pointer; + *archive &mem.type &name; + *archive &mem.interpolation &mem.extension; + *archive &mem.device_pointer; + + mem.name = name.c_str(); + mem.host_pointer = 0; + + /* Can't transfer OpenGL texture over network. */ + if (mem.type == MEM_PIXELS) { + mem.type = MEM_READ_WRITE; + } + } + + template<typename T> void read(T &data) + { + *archive &data; + } + + void read_buffer(void *buffer, size_t size) + { + boost::system::error_code error; + size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size), error); + + if (error.value()) { + error_func->network_error(error.message()); + } + + if (len != size) + cout << "Network receive error: buffer size doesn't match expected size\n"; + } + + void read(DeviceTask &task) + { + int type; + + *archive &type &task.x &task.y &task.w &task.h; + *archive &task.rgba_byte &task.rgba_half &task.buffer &task.sample &task.num_samples; + *archive &task.offset &task.stride; + *archive &task.shader_input &task.shader_output &task.shader_eval_type; + *archive &task.shader_x &task.shader_w; + *archive &task.need_finish_queue; + + task.type = (DeviceTask::Type)type; + } + + void read(RenderTile &tile) + { + *archive &tile.x &tile.y &tile.w &tile.h; + *archive &tile.start_sample &tile.num_samples &tile.sample; + *archive &tile.resolution &tile.offset &tile.stride; + *archive &tile.buffer; + + tile.buffers = NULL; + } + + string name; + + protected: + tcp::socket &socket; + string archive_str; + istringstream *archive_stream; + i_archive *archive; + NetworkError *error_func; }; /* Server auto discovery */ class ServerDiscovery { -public: - explicit ServerDiscovery(bool discover = false) - : listen_socket(io_service), collect_servers(false) - { - /* setup listen socket */ - listen_endpoint.address(boost::asio::ip::address_v4::any()); - listen_endpoint.port(DISCOVER_PORT); - - listen_socket.open(listen_endpoint.protocol()); - - boost::asio::socket_base::reuse_address option(true); - listen_socket.set_option(option); - - listen_socket.bind(listen_endpoint); - - /* setup receive callback */ - async_receive(); - - /* start server discovery */ - if(discover) { - collect_servers = true; - servers.clear(); - - broadcast_message(DISCOVER_REQUEST_MSG); - } - - /* start thread */ - work = new boost::asio::io_service::work(io_service); - thread = new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service)); - } - - ~ServerDiscovery() - { - io_service.stop(); - thread->join(); - delete thread; - delete work; - } - - vector<string> get_server_list() - { - vector<string> result; - - mutex.lock(); - result = vector<string>(servers.begin(), servers.end()); - mutex.unlock(); - - return result; - } - -private: - void handle_receive_from(const boost::system::error_code& error, size_t size) - { - if(error) { - cout << "Server discovery receive error: " << error.message() << "\n"; - return; - } - - if(size > 0) { - string msg = string(receive_buffer, size); - - /* handle incoming message */ - if(collect_servers) { - if(msg == DISCOVER_REPLY_MSG) { - string address = receive_endpoint.address().to_string(); - - mutex.lock(); - - /* add address if it's not already in the list */ - bool found = std::find(servers.begin(), servers.end(), - address) != servers.end(); - - if(!found) - servers.push_back(address); - - mutex.unlock(); - } - } - else { - /* reply to request */ - if(msg == DISCOVER_REQUEST_MSG) - broadcast_message(DISCOVER_REPLY_MSG); - } - } - - async_receive(); - } - - void async_receive() - { - listen_socket.async_receive_from( - boost::asio::buffer(receive_buffer), receive_endpoint, - boost::bind(&ServerDiscovery::handle_receive_from, this, - boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); - } - - void broadcast_message(const string& msg) - { - /* setup broadcast socket */ - boost::asio::ip::udp::socket socket(io_service); - - socket.open(boost::asio::ip::udp::v4()); - - boost::asio::socket_base::broadcast option(true); - socket.set_option(option); - - boost::asio::ip::udp::endpoint broadcast_endpoint( - boost::asio::ip::address::from_string("255.255.255.255"), DISCOVER_PORT); - - /* broadcast message */ - socket.send_to(boost::asio::buffer(msg), broadcast_endpoint); - } - - /* network service and socket */ - boost::asio::io_service io_service; - boost::asio::ip::udp::endpoint listen_endpoint; - boost::asio::ip::udp::socket listen_socket; - - /* threading */ - boost::thread *thread; - boost::asio::io_service::work *work; - boost::mutex mutex; - - /* buffer and endpoint for receiving messages */ - char receive_buffer[256]; - boost::asio::ip::udp::endpoint receive_endpoint; - - // os, version, devices, status, host name, group name, ip as far as fields go - struct ServerInfo { - string cycles_version; - string os; - int device_count; - string status; - string host_name; - string group_name; - string host_addr; - }; - - /* collection of server addresses in list */ - bool collect_servers; - vector<string> servers; + public: + explicit ServerDiscovery(bool discover = false) + : listen_socket(io_service), collect_servers(false) + { + /* setup listen socket */ + listen_endpoint.address(boost::asio::ip::address_v4::any()); + listen_endpoint.port(DISCOVER_PORT); + + listen_socket.open(listen_endpoint.protocol()); + + boost::asio::socket_base::reuse_address option(true); + listen_socket.set_option(option); + + listen_socket.bind(listen_endpoint); + + /* setup receive callback */ + async_receive(); + + /* start server discovery */ + if (discover) { + collect_servers = true; + servers.clear(); + + broadcast_message(DISCOVER_REQUEST_MSG); + } + + /* start thread */ + work = new boost::asio::io_service::work(io_service); + thread = new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service)); + } + + ~ServerDiscovery() + { + io_service.stop(); + thread->join(); + delete thread; + delete work; + } + + vector<string> get_server_list() + { + vector<string> result; + + mutex.lock(); + result = vector<string>(servers.begin(), servers.end()); + mutex.unlock(); + + return result; + } + + private: + void handle_receive_from(const boost::system::error_code &error, size_t size) + { + if (error) { + cout << "Server discovery receive error: " << error.message() << "\n"; + return; + } + + if (size > 0) { + string msg = string(receive_buffer, size); + + /* handle incoming message */ + if (collect_servers) { + if (msg == DISCOVER_REPLY_MSG) { + string address = receive_endpoint.address().to_string(); + + mutex.lock(); + + /* add address if it's not already in the list */ + bool found = std::find(servers.begin(), servers.end(), address) != servers.end(); + + if (!found) + servers.push_back(address); + + mutex.unlock(); + } + } + else { + /* reply to request */ + if (msg == DISCOVER_REQUEST_MSG) + broadcast_message(DISCOVER_REPLY_MSG); + } + } + + async_receive(); + } + + void async_receive() + { + listen_socket.async_receive_from(boost::asio::buffer(receive_buffer), + receive_endpoint, + boost::bind(&ServerDiscovery::handle_receive_from, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } + + void broadcast_message(const string &msg) + { + /* setup broadcast socket */ + boost::asio::ip::udp::socket socket(io_service); + + socket.open(boost::asio::ip::udp::v4()); + + boost::asio::socket_base::broadcast option(true); + socket.set_option(option); + + boost::asio::ip::udp::endpoint broadcast_endpoint( + boost::asio::ip::address::from_string("255.255.255.255"), DISCOVER_PORT); + + /* broadcast message */ + socket.send_to(boost::asio::buffer(msg), broadcast_endpoint); + } + + /* network service and socket */ + boost::asio::io_service io_service; + boost::asio::ip::udp::endpoint listen_endpoint; + boost::asio::ip::udp::socket listen_socket; + + /* threading */ + boost::thread *thread; + boost::asio::io_service::work *work; + boost::mutex mutex; + + /* buffer and endpoint for receiving messages */ + char receive_buffer[256]; + boost::asio::ip::udp::endpoint receive_endpoint; + + // os, version, devices, status, host name, group name, ip as far as fields go + struct ServerInfo { + string cycles_version; + string os; + int device_count; + string status; + string host_name; + string group_name; + string host_addr; + }; + + /* collection of server addresses in list */ + bool collect_servers; + vector<string> servers; }; CCL_NAMESPACE_END #endif -#endif /* __DEVICE_NETWORK_H__ */ +#endif /* __DEVICE_NETWORK_H__ */ |