diff options
Diffstat (limited to 'intern/cycles/device/device_network.cpp')
-rw-r--r-- | intern/cycles/device/device_network.cpp | 1453 |
1 files changed, 731 insertions, 722 deletions
diff --git a/intern/cycles/device/device_network.cpp b/intern/cycles/device/device_network.cpp index 6736480e95a..80334ad8f22 100644 --- a/intern/cycles/device/device_network.cpp +++ b/intern/cycles/device/device_network.cpp @@ -33,767 +33,776 @@ typedef map<device_ptr, DataVector> DataMap; typedef vector<RenderTile> TileList; /* search a list of tiles and find the one that matches the passed render tile */ -static TileList::iterator tile_list_find(TileList& tile_list, RenderTile& tile) +static TileList::iterator tile_list_find(TileList &tile_list, RenderTile &tile) { - for(TileList::iterator it = tile_list.begin(); it != tile_list.end(); ++it) - if(tile.x == it->x && tile.y == it->y && tile.start_sample == it->start_sample) - return it; - return tile_list.end(); + for (TileList::iterator it = tile_list.begin(); it != tile_list.end(); ++it) + if (tile.x == it->x && tile.y == it->y && tile.start_sample == it->start_sample) + return it; + return tile_list.end(); } -class NetworkDevice : public Device -{ -public: - boost::asio::io_service io_service; - tcp::socket socket; - device_ptr mem_counter; - DeviceTask the_task; /* todo: handle multiple tasks */ - - thread_mutex rpc_lock; - - virtual bool show_samples() const - { - return false; - } - - NetworkDevice(DeviceInfo& info, Stats &stats, Profiler &profiler, const char *address) - : Device(info, stats, profiler, true), socket(io_service) - { - error_func = NetworkError(); - stringstream portstr; - portstr << SERVER_PORT; - - tcp::resolver resolver(io_service); - tcp::resolver::query query(address, portstr.str()); - tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); - tcp::resolver::iterator end; +class NetworkDevice : public Device { + public: + boost::asio::io_service io_service; + tcp::socket socket; + device_ptr mem_counter; + DeviceTask the_task; /* todo: handle multiple tasks */ + + thread_mutex rpc_lock; + + virtual bool show_samples() const + { + return false; + } + + NetworkDevice(DeviceInfo &info, Stats &stats, Profiler &profiler, const char *address) + : Device(info, stats, profiler, true), socket(io_service) + { + error_func = NetworkError(); + stringstream portstr; + portstr << SERVER_PORT; + + tcp::resolver resolver(io_service); + tcp::resolver::query query(address, portstr.str()); + tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); + tcp::resolver::iterator end; + + boost::system::error_code error = boost::asio::error::host_not_found; + while (error && endpoint_iterator != end) { + socket.close(); + socket.connect(*endpoint_iterator++, error); + } - boost::system::error_code error = boost::asio::error::host_not_found; - while(error && endpoint_iterator != end) - { - socket.close(); - socket.connect(*endpoint_iterator++, error); - } - - if(error) - error_func.network_error(error.message()); + if (error) + error_func.network_error(error.message()); - mem_counter = 0; - } + mem_counter = 0; + } - ~NetworkDevice() - { - RPCSend snd(socket, &error_func, "stop"); - snd.write(); - } - - virtual BVHLayoutMask get_bvh_layout_mask() const { - return BVH_LAYOUT_BVH2; - } - - void mem_alloc(device_memory& mem) - { - if(mem.name) { - VLOG(1) << "Buffer allocate: " << mem.name << ", " - << string_human_readable_number(mem.memory_size()) << " bytes. (" - << string_human_readable_size(mem.memory_size()) << ")"; - } - - thread_scoped_lock lock(rpc_lock); + ~NetworkDevice() + { + RPCSend snd(socket, &error_func, "stop"); + snd.write(); + } - mem.device_pointer = ++mem_counter; + virtual BVHLayoutMask get_bvh_layout_mask() const + { + return BVH_LAYOUT_BVH2; + } - RPCSend snd(socket, &error_func, "mem_alloc"); - snd.add(mem); - snd.write(); - } + void mem_alloc(device_memory &mem) + { + if (mem.name) { + VLOG(1) << "Buffer allocate: " << mem.name << ", " + << string_human_readable_number(mem.memory_size()) << " bytes. (" + << string_human_readable_size(mem.memory_size()) << ")"; + } - void mem_copy_to(device_memory& mem) - { - thread_scoped_lock lock(rpc_lock); + thread_scoped_lock lock(rpc_lock); - RPCSend snd(socket, &error_func, "mem_copy_to"); + mem.device_pointer = ++mem_counter; - snd.add(mem); - snd.write(); - snd.write_buffer(mem.host_pointer, mem.memory_size()); - } + RPCSend snd(socket, &error_func, "mem_alloc"); + snd.add(mem); + snd.write(); + } - void mem_copy_from(device_memory& mem, int y, int w, int h, int elem) - { - thread_scoped_lock lock(rpc_lock); + void mem_copy_to(device_memory &mem) + { + thread_scoped_lock lock(rpc_lock); - size_t data_size = mem.memory_size(); + RPCSend snd(socket, &error_func, "mem_copy_to"); - RPCSend snd(socket, &error_func, "mem_copy_from"); + snd.add(mem); + snd.write(); + snd.write_buffer(mem.host_pointer, mem.memory_size()); + } - snd.add(mem); - snd.add(y); - snd.add(w); - snd.add(h); - snd.add(elem); - snd.write(); + void mem_copy_from(device_memory &mem, int y, int w, int h, int elem) + { + thread_scoped_lock lock(rpc_lock); - RPCReceive rcv(socket, &error_func); - rcv.read_buffer(mem.host_pointer, data_size); - } + size_t data_size = mem.memory_size(); - void mem_zero(device_memory& mem) - { - thread_scoped_lock lock(rpc_lock); + RPCSend snd(socket, &error_func, "mem_copy_from"); - RPCSend snd(socket, &error_func, "mem_zero"); + snd.add(mem); + snd.add(y); + snd.add(w); + snd.add(h); + snd.add(elem); + snd.write(); - snd.add(mem); - snd.write(); - } + RPCReceive rcv(socket, &error_func); + rcv.read_buffer(mem.host_pointer, data_size); + } - void mem_free(device_memory& mem) - { - if(mem.device_pointer) { - thread_scoped_lock lock(rpc_lock); - - RPCSend snd(socket, &error_func, "mem_free"); - - snd.add(mem); - snd.write(); - - mem.device_pointer = 0; - } - } + void mem_zero(device_memory &mem) + { + thread_scoped_lock lock(rpc_lock); - void const_copy_to(const char *name, void *host, size_t size) - { - thread_scoped_lock lock(rpc_lock); + RPCSend snd(socket, &error_func, "mem_zero"); - RPCSend snd(socket, &error_func, "const_copy_to"); + snd.add(mem); + snd.write(); + } - string name_string(name); - - snd.add(name_string); - snd.add(size); - snd.write(); - snd.write_buffer(host, size); - } - - bool load_kernels(const DeviceRequestedFeatures& requested_features) - { - if(error_func.have_error()) - return false; - - thread_scoped_lock lock(rpc_lock); - - RPCSend snd(socket, &error_func, "load_kernels"); - snd.add(requested_features.experimental); - snd.add(requested_features.max_closure); - snd.add(requested_features.max_nodes_group); - snd.add(requested_features.nodes_features); - snd.write(); - - bool result; - RPCReceive rcv(socket, &error_func); - rcv.read(result); - - return result; - } - - void task_add(DeviceTask& task) - { - thread_scoped_lock lock(rpc_lock); - - the_task = task; - - RPCSend snd(socket, &error_func, "task_add"); - snd.add(task); - snd.write(); - } - - void task_wait() - { - thread_scoped_lock lock(rpc_lock); - - RPCSend snd(socket, &error_func, "task_wait"); - snd.write(); - - lock.unlock(); - - TileList the_tiles; - - /* todo: run this threaded for connecting to multiple clients */ - for(;;) { - if(error_func.have_error()) - break; - - RenderTile tile; - - lock.lock(); - RPCReceive rcv(socket, &error_func); - - if(rcv.name == "acquire_tile") { - lock.unlock(); - - /* todo: watch out for recursive calls! */ - if(the_task.acquire_tile(this, tile)) { /* write return as bool */ - the_tiles.push_back(tile); - - lock.lock(); - RPCSend snd(socket, &error_func, "acquire_tile"); - snd.add(tile); - snd.write(); - lock.unlock(); - } - else { - lock.lock(); - RPCSend snd(socket, &error_func, "acquire_tile_none"); - snd.write(); - lock.unlock(); - } - } - else if(rcv.name == "release_tile") { - rcv.read(tile); - lock.unlock(); - - TileList::iterator it = tile_list_find(the_tiles, tile); - if(it != the_tiles.end()) { - tile.buffers = it->buffers; - the_tiles.erase(it); - } - - assert(tile.buffers != NULL); - - the_task.release_tile(tile); - - lock.lock(); - RPCSend snd(socket, &error_func, "release_tile"); - snd.write(); - lock.unlock(); - } - else if(rcv.name == "task_wait_done") { - lock.unlock(); - break; - } - else - lock.unlock(); - } - } - - void task_cancel() - { - thread_scoped_lock lock(rpc_lock); - RPCSend snd(socket, &error_func, "task_cancel"); - snd.write(); - } - - int get_split_task_count(DeviceTask&) - { - return 1; - } - -private: - NetworkError error_func; + void mem_free(device_memory &mem) + { + if (mem.device_pointer) { + thread_scoped_lock lock(rpc_lock); + + RPCSend snd(socket, &error_func, "mem_free"); + + snd.add(mem); + snd.write(); + + mem.device_pointer = 0; + } + } + + void const_copy_to(const char *name, void *host, size_t size) + { + thread_scoped_lock lock(rpc_lock); + + RPCSend snd(socket, &error_func, "const_copy_to"); + + string name_string(name); + + snd.add(name_string); + snd.add(size); + snd.write(); + snd.write_buffer(host, size); + } + + bool load_kernels(const DeviceRequestedFeatures &requested_features) + { + if (error_func.have_error()) + return false; + + thread_scoped_lock lock(rpc_lock); + + RPCSend snd(socket, &error_func, "load_kernels"); + snd.add(requested_features.experimental); + snd.add(requested_features.max_closure); + snd.add(requested_features.max_nodes_group); + snd.add(requested_features.nodes_features); + snd.write(); + + bool result; + RPCReceive rcv(socket, &error_func); + rcv.read(result); + + return result; + } + + void task_add(DeviceTask &task) + { + thread_scoped_lock lock(rpc_lock); + + the_task = task; + + RPCSend snd(socket, &error_func, "task_add"); + snd.add(task); + snd.write(); + } + + void task_wait() + { + thread_scoped_lock lock(rpc_lock); + + RPCSend snd(socket, &error_func, "task_wait"); + snd.write(); + + lock.unlock(); + + TileList the_tiles; + + /* todo: run this threaded for connecting to multiple clients */ + for (;;) { + if (error_func.have_error()) + break; + + RenderTile tile; + + lock.lock(); + RPCReceive rcv(socket, &error_func); + + if (rcv.name == "acquire_tile") { + lock.unlock(); + + /* todo: watch out for recursive calls! */ + if (the_task.acquire_tile(this, tile)) { /* write return as bool */ + the_tiles.push_back(tile); + + lock.lock(); + RPCSend snd(socket, &error_func, "acquire_tile"); + snd.add(tile); + snd.write(); + lock.unlock(); + } + else { + lock.lock(); + RPCSend snd(socket, &error_func, "acquire_tile_none"); + snd.write(); + lock.unlock(); + } + } + else if (rcv.name == "release_tile") { + rcv.read(tile); + lock.unlock(); + + TileList::iterator it = tile_list_find(the_tiles, tile); + if (it != the_tiles.end()) { + tile.buffers = it->buffers; + the_tiles.erase(it); + } + + assert(tile.buffers != NULL); + + the_task.release_tile(tile); + + lock.lock(); + RPCSend snd(socket, &error_func, "release_tile"); + snd.write(); + lock.unlock(); + } + else if (rcv.name == "task_wait_done") { + lock.unlock(); + break; + } + else + lock.unlock(); + } + } + + void task_cancel() + { + thread_scoped_lock lock(rpc_lock); + RPCSend snd(socket, &error_func, "task_cancel"); + snd.write(); + } + + int get_split_task_count(DeviceTask &) + { + return 1; + } + + private: + NetworkError error_func; }; -Device *device_network_create(DeviceInfo& info, Stats &stats, Profiler &profiler, const char *address) +Device *device_network_create(DeviceInfo &info, + Stats &stats, + Profiler &profiler, + const char *address) { - return new NetworkDevice(info, stats, profiler, address); + return new NetworkDevice(info, stats, profiler, address); } -void device_network_info(vector<DeviceInfo>& devices) +void device_network_info(vector<DeviceInfo> &devices) { - DeviceInfo info; + DeviceInfo info; - info.type = DEVICE_NETWORK; - info.description = "Network Device"; - info.id = "NETWORK"; - info.num = 0; + info.type = DEVICE_NETWORK; + info.description = "Network Device"; + info.id = "NETWORK"; + info.num = 0; - /* todo: get this info from device */ - info.has_volume_decoupled = false; - info.has_osl = false; + /* todo: get this info from device */ + info.has_volume_decoupled = false; + info.has_osl = false; - devices.push_back(info); + devices.push_back(info); } class DeviceServer { -public: - thread_mutex rpc_lock; - - void network_error(const string &message) { - error_func.network_error(message); - } - - bool have_error() { return error_func.have_error(); } - - DeviceServer(Device *device_, tcp::socket& socket_) - : device(device_), socket(socket_), stop(false), blocked_waiting(false) - { - error_func = NetworkError(); - } - - void listen() - { - /* receive remote function calls */ - for(;;) { - listen_step(); - - if(stop) - break; - } - } - -protected: - void listen_step() - { - thread_scoped_lock lock(rpc_lock); - RPCReceive rcv(socket, &error_func); - - if(rcv.name == "stop") - stop = true; - else - process(rcv, lock); - } - - /* create a memory buffer for a device buffer and insert it into mem_data */ - DataVector &data_vector_insert(device_ptr client_pointer, size_t data_size) - { - /* create a new DataVector and insert it into mem_data */ - pair<DataMap::iterator,bool> data_ins = mem_data.insert( - DataMap::value_type(client_pointer, DataVector())); - - /* make sure it was a unique insertion */ - assert(data_ins.second); - - /* get a reference to the inserted vector */ - DataVector &data_v = data_ins.first->second; - - /* size the vector */ - data_v.resize(data_size); - - return data_v; - } - - DataVector &data_vector_find(device_ptr client_pointer) - { - DataMap::iterator i = mem_data.find(client_pointer); - assert(i != mem_data.end()); - return i->second; - } - - /* setup mapping and reverse mapping of client_pointer<->real_pointer */ - void pointer_mapping_insert(device_ptr client_pointer, device_ptr real_pointer) - { - pair<PtrMap::iterator,bool> mapins; - - /* insert mapping from client pointer to our real device pointer */ - mapins = ptr_map.insert(PtrMap::value_type(client_pointer, real_pointer)); - assert(mapins.second); - - /* insert reverse mapping from real our device pointer to client pointer */ - mapins = ptr_imap.insert(PtrMap::value_type(real_pointer, client_pointer)); - assert(mapins.second); - } - - device_ptr device_ptr_from_client_pointer(device_ptr client_pointer) - { - PtrMap::iterator i = ptr_map.find(client_pointer); - assert(i != ptr_map.end()); - return i->second; - } - - device_ptr device_ptr_from_client_pointer_erase(device_ptr client_pointer) - { - PtrMap::iterator i = ptr_map.find(client_pointer); - assert(i != ptr_map.end()); - - device_ptr result = i->second; - - /* erase the mapping */ - ptr_map.erase(i); - - /* erase the reverse mapping */ - PtrMap::iterator irev = ptr_imap.find(result); - assert(irev != ptr_imap.end()); - ptr_imap.erase(irev); - - /* erase the data vector */ - DataMap::iterator idata = mem_data.find(client_pointer); - assert(idata != mem_data.end()); - mem_data.erase(idata); - - return result; - } - - /* note that the lock must be already acquired upon entry. - * This is necessary because the caller often peeks at - * the header and delegates control to here when it doesn't - * specifically handle the current RPC. - * The lock must be unlocked before returning */ - void process(RPCReceive& rcv, thread_scoped_lock &lock) - { - if(rcv.name == "mem_alloc") { - string name; - network_device_memory mem(device); - rcv.read(mem, name); - lock.unlock(); - - /* Allocate host side data buffer. */ - size_t data_size = mem.memory_size(); - device_ptr client_pointer = mem.device_pointer; - - DataVector &data_v = data_vector_insert(client_pointer, data_size); - mem.host_pointer = (data_size)? (void*)&(data_v[0]): 0; - - /* Perform the allocation on the actual device. */ - device->mem_alloc(mem); - - /* Store a mapping to/from client_pointer and real device pointer. */ - pointer_mapping_insert(client_pointer, mem.device_pointer); - } - else if(rcv.name == "mem_copy_to") { - string name; - network_device_memory mem(device); - rcv.read(mem, name); - lock.unlock(); - - size_t data_size = mem.memory_size(); - device_ptr client_pointer = mem.device_pointer; - - if(client_pointer) { - /* Lookup existing host side data buffer. */ - DataVector &data_v = data_vector_find(client_pointer); - mem.host_pointer = (void*)&data_v[0]; - - /* Translate the client pointer to a real device pointer. */ - mem.device_pointer = device_ptr_from_client_pointer(client_pointer); - } - else { - /* Allocate host side data buffer. */ - DataVector &data_v = data_vector_insert(client_pointer, data_size); - mem.host_pointer = (data_size)? (void*)&(data_v[0]): 0; - } - - /* Copy data from network into memory buffer. */ - rcv.read_buffer((uint8_t*)mem.host_pointer, data_size); - - /* Copy the data from the memory buffer to the device buffer. */ - device->mem_copy_to(mem); - - if(!client_pointer) { - /* Store a mapping to/from client_pointer and real device pointer. */ - pointer_mapping_insert(client_pointer, mem.device_pointer); - } - } - else if(rcv.name == "mem_copy_from") { - string name; - network_device_memory mem(device); - int y, w, h, elem; - - rcv.read(mem, name); - rcv.read(y); - rcv.read(w); - rcv.read(h); - rcv.read(elem); - - device_ptr client_pointer = mem.device_pointer; - mem.device_pointer = device_ptr_from_client_pointer(client_pointer); - - DataVector &data_v = data_vector_find(client_pointer); - - mem.host_pointer = (device_ptr)&(data_v[0]); - - device->mem_copy_from(mem, y, w, h, elem); - - size_t data_size = mem.memory_size(); - - RPCSend snd(socket, &error_func, "mem_copy_from"); - snd.write(); - snd.write_buffer((uint8_t*)mem.host_pointer, data_size); - lock.unlock(); - } - else if(rcv.name == "mem_zero") { - string name; - network_device_memory mem(device); - rcv.read(mem, name); - lock.unlock(); - - size_t data_size = mem.memory_size(); - device_ptr client_pointer = mem.device_pointer; - - if(client_pointer) { - /* Lookup existing host side data buffer. */ - DataVector &data_v = data_vector_find(client_pointer); - mem.host_pointer = (void*)&data_v[0]; - - /* Translate the client pointer to a real device pointer. */ - mem.device_pointer = device_ptr_from_client_pointer(client_pointer); - } - else { - /* Allocate host side data buffer. */ - DataVector &data_v = data_vector_insert(client_pointer, data_size); - mem.host_pointer = (void*)? (device_ptr)&(data_v[0]): 0; - } - - /* Zero memory. */ - device->mem_zero(mem); - - if(!client_pointer) { - /* Store a mapping to/from client_pointer and real device pointer. */ - pointer_mapping_insert(client_pointer, mem.device_pointer); - } - } - else if(rcv.name == "mem_free") { - string name; - network_device_memory mem(device); - - rcv.read(mem, name); - lock.unlock(); - - device_ptr client_pointer = mem.device_pointer; - - mem.device_pointer = device_ptr_from_client_pointer_erase(client_pointer); - - device->mem_free(mem); - } - else if(rcv.name == "const_copy_to") { - string name_string; - size_t size; - - rcv.read(name_string); - rcv.read(size); - - vector<char> host_vector(size); - rcv.read_buffer(&host_vector[0], size); - lock.unlock(); - - device->const_copy_to(name_string.c_str(), &host_vector[0], size); - } - else if(rcv.name == "load_kernels") { - DeviceRequestedFeatures requested_features; - rcv.read(requested_features.experimental); - rcv.read(requested_features.max_closure); - rcv.read(requested_features.max_nodes_group); - rcv.read(requested_features.nodes_features); - - bool result; - result = device->load_kernels(requested_features); - RPCSend snd(socket, &error_func, "load_kernels"); - snd.add(result); - snd.write(); - lock.unlock(); - } - else if(rcv.name == "task_add") { - DeviceTask task; - - rcv.read(task); - lock.unlock(); - - if(task.buffer) - task.buffer = device_ptr_from_client_pointer(task.buffer); - - if(task.rgba_half) - task.rgba_half = device_ptr_from_client_pointer(task.rgba_half); - - if(task.rgba_byte) - task.rgba_byte = device_ptr_from_client_pointer(task.rgba_byte); - - if(task.shader_input) - task.shader_input = device_ptr_from_client_pointer(task.shader_input); - - if(task.shader_output) - task.shader_output = device_ptr_from_client_pointer(task.shader_output); - - task.acquire_tile = function_bind(&DeviceServer::task_acquire_tile, this, _1, _2); - task.release_tile = function_bind(&DeviceServer::task_release_tile, this, _1); - task.update_progress_sample = function_bind(&DeviceServer::task_update_progress_sample, this); - task.update_tile_sample = function_bind(&DeviceServer::task_update_tile_sample, this, _1); - task.get_cancel = function_bind(&DeviceServer::task_get_cancel, this); - - device->task_add(task); - } - else if(rcv.name == "task_wait") { - lock.unlock(); - - blocked_waiting = true; - device->task_wait(); - blocked_waiting = false; - - lock.lock(); - RPCSend snd(socket, &error_func, "task_wait_done"); - snd.write(); - lock.unlock(); - } - else if(rcv.name == "task_cancel") { - lock.unlock(); - device->task_cancel(); - } - else if(rcv.name == "acquire_tile") { - AcquireEntry entry; - entry.name = rcv.name; - rcv.read(entry.tile); - acquire_queue.push_back(entry); - lock.unlock(); - } - else if(rcv.name == "acquire_tile_none") { - AcquireEntry entry; - entry.name = rcv.name; - acquire_queue.push_back(entry); - lock.unlock(); - } - else if(rcv.name == "release_tile") { - AcquireEntry entry; - entry.name = rcv.name; - acquire_queue.push_back(entry); - lock.unlock(); - } - else { - cout << "Error: unexpected RPC receive call \"" + rcv.name + "\"\n"; - lock.unlock(); - } - } - - bool task_acquire_tile(Device *, RenderTile& tile) - { - thread_scoped_lock acquire_lock(acquire_mutex); - - bool result = false; - - RPCSend snd(socket, &error_func, "acquire_tile"); - snd.write(); - - do { - if(blocked_waiting) - listen_step(); - - /* todo: avoid busy wait loop */ - thread_scoped_lock lock(rpc_lock); - - if(!acquire_queue.empty()) { - AcquireEntry entry = acquire_queue.front(); - acquire_queue.pop_front(); - - if(entry.name == "acquire_tile") { - tile = entry.tile; - - if(tile.buffer) tile.buffer = ptr_map[tile.buffer]; - - result = true; - break; - } - else if(entry.name == "acquire_tile_none") { - break; - } - else { - cout << "Error: unexpected acquire RPC receive call \"" + entry.name + "\"\n"; - } - } - } while(acquire_queue.empty() && !stop && !have_error()); - - return result; - } - - void task_update_progress_sample() - { - ; /* skip */ - } - - void task_update_tile_sample(RenderTile&) - { - ; /* skip */ - } - - void task_release_tile(RenderTile& tile) - { - thread_scoped_lock acquire_lock(acquire_mutex); - - if(tile.buffer) tile.buffer = ptr_imap[tile.buffer]; - - { - thread_scoped_lock lock(rpc_lock); - RPCSend snd(socket, &error_func, "release_tile"); - snd.add(tile); - snd.write(); - lock.unlock(); - } - - do { - if(blocked_waiting) - listen_step(); - - /* todo: avoid busy wait loop */ - thread_scoped_lock lock(rpc_lock); - - if(!acquire_queue.empty()) { - AcquireEntry entry = acquire_queue.front(); - acquire_queue.pop_front(); - - if(entry.name == "release_tile") { - lock.unlock(); - break; - } - else { - cout << "Error: unexpected release RPC receive call \"" + entry.name + "\"\n"; - } - } - } while(acquire_queue.empty() && !stop); - } - - bool task_get_cancel() - { - return false; - } - - /* properties */ - Device *device; - tcp::socket& socket; - - /* mapping of remote to local pointer */ - PtrMap ptr_map; - PtrMap ptr_imap; - DataMap mem_data; - - struct AcquireEntry { - string name; - RenderTile tile; - }; - - thread_mutex acquire_mutex; - list<AcquireEntry> acquire_queue; - - bool stop; - bool blocked_waiting; -private: - NetworkError error_func; - - /* todo: free memory and device (osl) on network error */ - + public: + thread_mutex rpc_lock; + + void network_error(const string &message) + { + error_func.network_error(message); + } + + bool have_error() + { + return error_func.have_error(); + } + + DeviceServer(Device *device_, tcp::socket &socket_) + : device(device_), socket(socket_), stop(false), blocked_waiting(false) + { + error_func = NetworkError(); + } + + void listen() + { + /* receive remote function calls */ + for (;;) { + listen_step(); + + if (stop) + break; + } + } + + protected: + void listen_step() + { + thread_scoped_lock lock(rpc_lock); + RPCReceive rcv(socket, &error_func); + + if (rcv.name == "stop") + stop = true; + else + process(rcv, lock); + } + + /* create a memory buffer for a device buffer and insert it into mem_data */ + DataVector &data_vector_insert(device_ptr client_pointer, size_t data_size) + { + /* create a new DataVector and insert it into mem_data */ + pair<DataMap::iterator, bool> data_ins = mem_data.insert( + DataMap::value_type(client_pointer, DataVector())); + + /* make sure it was a unique insertion */ + assert(data_ins.second); + + /* get a reference to the inserted vector */ + DataVector &data_v = data_ins.first->second; + + /* size the vector */ + data_v.resize(data_size); + + return data_v; + } + + DataVector &data_vector_find(device_ptr client_pointer) + { + DataMap::iterator i = mem_data.find(client_pointer); + assert(i != mem_data.end()); + return i->second; + } + + /* setup mapping and reverse mapping of client_pointer<->real_pointer */ + void pointer_mapping_insert(device_ptr client_pointer, device_ptr real_pointer) + { + pair<PtrMap::iterator, bool> mapins; + + /* insert mapping from client pointer to our real device pointer */ + mapins = ptr_map.insert(PtrMap::value_type(client_pointer, real_pointer)); + assert(mapins.second); + + /* insert reverse mapping from real our device pointer to client pointer */ + mapins = ptr_imap.insert(PtrMap::value_type(real_pointer, client_pointer)); + assert(mapins.second); + } + + device_ptr device_ptr_from_client_pointer(device_ptr client_pointer) + { + PtrMap::iterator i = ptr_map.find(client_pointer); + assert(i != ptr_map.end()); + return i->second; + } + + device_ptr device_ptr_from_client_pointer_erase(device_ptr client_pointer) + { + PtrMap::iterator i = ptr_map.find(client_pointer); + assert(i != ptr_map.end()); + + device_ptr result = i->second; + + /* erase the mapping */ + ptr_map.erase(i); + + /* erase the reverse mapping */ + PtrMap::iterator irev = ptr_imap.find(result); + assert(irev != ptr_imap.end()); + ptr_imap.erase(irev); + + /* erase the data vector */ + DataMap::iterator idata = mem_data.find(client_pointer); + assert(idata != mem_data.end()); + mem_data.erase(idata); + + return result; + } + + /* note that the lock must be already acquired upon entry. + * This is necessary because the caller often peeks at + * the header and delegates control to here when it doesn't + * specifically handle the current RPC. + * The lock must be unlocked before returning */ + void process(RPCReceive &rcv, thread_scoped_lock &lock) + { + if (rcv.name == "mem_alloc") { + string name; + network_device_memory mem(device); + rcv.read(mem, name); + lock.unlock(); + + /* Allocate host side data buffer. */ + size_t data_size = mem.memory_size(); + device_ptr client_pointer = mem.device_pointer; + + DataVector &data_v = data_vector_insert(client_pointer, data_size); + mem.host_pointer = (data_size) ? (void *)&(data_v[0]) : 0; + + /* Perform the allocation on the actual device. */ + device->mem_alloc(mem); + + /* Store a mapping to/from client_pointer and real device pointer. */ + pointer_mapping_insert(client_pointer, mem.device_pointer); + } + else if (rcv.name == "mem_copy_to") { + string name; + network_device_memory mem(device); + rcv.read(mem, name); + lock.unlock(); + + size_t data_size = mem.memory_size(); + device_ptr client_pointer = mem.device_pointer; + + if (client_pointer) { + /* Lookup existing host side data buffer. */ + DataVector &data_v = data_vector_find(client_pointer); + mem.host_pointer = (void *)&data_v[0]; + + /* Translate the client pointer to a real device pointer. */ + mem.device_pointer = device_ptr_from_client_pointer(client_pointer); + } + else { + /* Allocate host side data buffer. */ + DataVector &data_v = data_vector_insert(client_pointer, data_size); + mem.host_pointer = (data_size) ? (void *)&(data_v[0]) : 0; + } + + /* Copy data from network into memory buffer. */ + rcv.read_buffer((uint8_t *)mem.host_pointer, data_size); + + /* Copy the data from the memory buffer to the device buffer. */ + device->mem_copy_to(mem); + + if (!client_pointer) { + /* Store a mapping to/from client_pointer and real device pointer. */ + pointer_mapping_insert(client_pointer, mem.device_pointer); + } + } + else if (rcv.name == "mem_copy_from") { + string name; + network_device_memory mem(device); + int y, w, h, elem; + + rcv.read(mem, name); + rcv.read(y); + rcv.read(w); + rcv.read(h); + rcv.read(elem); + + device_ptr client_pointer = mem.device_pointer; + mem.device_pointer = device_ptr_from_client_pointer(client_pointer); + + DataVector &data_v = data_vector_find(client_pointer); + + mem.host_pointer = (device_ptr) & (data_v[0]); + + device->mem_copy_from(mem, y, w, h, elem); + + size_t data_size = mem.memory_size(); + + RPCSend snd(socket, &error_func, "mem_copy_from"); + snd.write(); + snd.write_buffer((uint8_t *)mem.host_pointer, data_size); + lock.unlock(); + } + else if (rcv.name == "mem_zero") { + string name; + network_device_memory mem(device); + rcv.read(mem, name); + lock.unlock(); + + size_t data_size = mem.memory_size(); + device_ptr client_pointer = mem.device_pointer; + + if (client_pointer) { + /* Lookup existing host side data buffer. */ + DataVector &data_v = data_vector_find(client_pointer); + mem.host_pointer = (void *)&data_v[0]; + + /* Translate the client pointer to a real device pointer. */ + mem.device_pointer = device_ptr_from_client_pointer(client_pointer); + } + else { + /* Allocate host side data buffer. */ + DataVector &data_v = data_vector_insert(client_pointer, data_size); + mem.host_pointer = (void *) ? (device_ptr) & (data_v[0]) : 0; + } + + /* Zero memory. */ + device->mem_zero(mem); + + if (!client_pointer) { + /* Store a mapping to/from client_pointer and real device pointer. */ + pointer_mapping_insert(client_pointer, mem.device_pointer); + } + } + else if (rcv.name == "mem_free") { + string name; + network_device_memory mem(device); + + rcv.read(mem, name); + lock.unlock(); + + device_ptr client_pointer = mem.device_pointer; + + mem.device_pointer = device_ptr_from_client_pointer_erase(client_pointer); + + device->mem_free(mem); + } + else if (rcv.name == "const_copy_to") { + string name_string; + size_t size; + + rcv.read(name_string); + rcv.read(size); + + vector<char> host_vector(size); + rcv.read_buffer(&host_vector[0], size); + lock.unlock(); + + device->const_copy_to(name_string.c_str(), &host_vector[0], size); + } + else if (rcv.name == "load_kernels") { + DeviceRequestedFeatures requested_features; + rcv.read(requested_features.experimental); + rcv.read(requested_features.max_closure); + rcv.read(requested_features.max_nodes_group); + rcv.read(requested_features.nodes_features); + + bool result; + result = device->load_kernels(requested_features); + RPCSend snd(socket, &error_func, "load_kernels"); + snd.add(result); + snd.write(); + lock.unlock(); + } + else if (rcv.name == "task_add") { + DeviceTask task; + + rcv.read(task); + lock.unlock(); + + if (task.buffer) + task.buffer = device_ptr_from_client_pointer(task.buffer); + + if (task.rgba_half) + task.rgba_half = device_ptr_from_client_pointer(task.rgba_half); + + if (task.rgba_byte) + task.rgba_byte = device_ptr_from_client_pointer(task.rgba_byte); + + if (task.shader_input) + task.shader_input = device_ptr_from_client_pointer(task.shader_input); + + if (task.shader_output) + task.shader_output = device_ptr_from_client_pointer(task.shader_output); + + task.acquire_tile = function_bind(&DeviceServer::task_acquire_tile, this, _1, _2); + task.release_tile = function_bind(&DeviceServer::task_release_tile, this, _1); + task.update_progress_sample = function_bind(&DeviceServer::task_update_progress_sample, + this); + task.update_tile_sample = function_bind(&DeviceServer::task_update_tile_sample, this, _1); + task.get_cancel = function_bind(&DeviceServer::task_get_cancel, this); + + device->task_add(task); + } + else if (rcv.name == "task_wait") { + lock.unlock(); + + blocked_waiting = true; + device->task_wait(); + blocked_waiting = false; + + lock.lock(); + RPCSend snd(socket, &error_func, "task_wait_done"); + snd.write(); + lock.unlock(); + } + else if (rcv.name == "task_cancel") { + lock.unlock(); + device->task_cancel(); + } + else if (rcv.name == "acquire_tile") { + AcquireEntry entry; + entry.name = rcv.name; + rcv.read(entry.tile); + acquire_queue.push_back(entry); + lock.unlock(); + } + else if (rcv.name == "acquire_tile_none") { + AcquireEntry entry; + entry.name = rcv.name; + acquire_queue.push_back(entry); + lock.unlock(); + } + else if (rcv.name == "release_tile") { + AcquireEntry entry; + entry.name = rcv.name; + acquire_queue.push_back(entry); + lock.unlock(); + } + else { + cout << "Error: unexpected RPC receive call \"" + rcv.name + "\"\n"; + lock.unlock(); + } + } + + bool task_acquire_tile(Device *, RenderTile &tile) + { + thread_scoped_lock acquire_lock(acquire_mutex); + + bool result = false; + + RPCSend snd(socket, &error_func, "acquire_tile"); + snd.write(); + + do { + if (blocked_waiting) + listen_step(); + + /* todo: avoid busy wait loop */ + thread_scoped_lock lock(rpc_lock); + + if (!acquire_queue.empty()) { + AcquireEntry entry = acquire_queue.front(); + acquire_queue.pop_front(); + + if (entry.name == "acquire_tile") { + tile = entry.tile; + + if (tile.buffer) + tile.buffer = ptr_map[tile.buffer]; + + result = true; + break; + } + else if (entry.name == "acquire_tile_none") { + break; + } + else { + cout << "Error: unexpected acquire RPC receive call \"" + entry.name + "\"\n"; + } + } + } while (acquire_queue.empty() && !stop && !have_error()); + + return result; + } + + void task_update_progress_sample() + { + ; /* skip */ + } + + void task_update_tile_sample(RenderTile &) + { + ; /* skip */ + } + + void task_release_tile(RenderTile &tile) + { + thread_scoped_lock acquire_lock(acquire_mutex); + + if (tile.buffer) + tile.buffer = ptr_imap[tile.buffer]; + + { + thread_scoped_lock lock(rpc_lock); + RPCSend snd(socket, &error_func, "release_tile"); + snd.add(tile); + snd.write(); + lock.unlock(); + } + + do { + if (blocked_waiting) + listen_step(); + + /* todo: avoid busy wait loop */ + thread_scoped_lock lock(rpc_lock); + + if (!acquire_queue.empty()) { + AcquireEntry entry = acquire_queue.front(); + acquire_queue.pop_front(); + + if (entry.name == "release_tile") { + lock.unlock(); + break; + } + else { + cout << "Error: unexpected release RPC receive call \"" + entry.name + "\"\n"; + } + } + } while (acquire_queue.empty() && !stop); + } + + bool task_get_cancel() + { + return false; + } + + /* properties */ + Device *device; + tcp::socket &socket; + + /* mapping of remote to local pointer */ + PtrMap ptr_map; + PtrMap ptr_imap; + DataMap mem_data; + + struct AcquireEntry { + string name; + RenderTile tile; + }; + + thread_mutex acquire_mutex; + list<AcquireEntry> acquire_queue; + + bool stop; + bool blocked_waiting; + + private: + NetworkError error_func; + + /* todo: free memory and device (osl) on network error */ }; void Device::server_run() { - try { - /* starts thread that responds to discovery requests */ - ServerDiscovery discovery; - - for(;;) { - /* accept connection */ - boost::asio::io_service io_service; - tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), SERVER_PORT)); - - tcp::socket socket(io_service); - acceptor.accept(socket); - - string remote_address = socket.remote_endpoint().address().to_string(); - printf("Connected to remote client at: %s\n", remote_address.c_str()); - - DeviceServer server(this, socket); - server.listen(); - - printf("Disconnected.\n"); - } - } - catch(exception& e) { - fprintf(stderr, "Network server exception: %s\n", e.what()); - } + try { + /* starts thread that responds to discovery requests */ + ServerDiscovery discovery; + + for (;;) { + /* accept connection */ + boost::asio::io_service io_service; + tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), SERVER_PORT)); + + tcp::socket socket(io_service); + acceptor.accept(socket); + + string remote_address = socket.remote_endpoint().address().to_string(); + printf("Connected to remote client at: %s\n", remote_address.c_str()); + + DeviceServer server(this, socket); + server.listen(); + + printf("Disconnected.\n"); + } + } + catch (exception &e) { + fprintf(stderr, "Network server exception: %s\n", e.what()); + } } CCL_NAMESPACE_END |