Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'intern/cycles/device/device_network.cpp')
-rw-r--r--intern/cycles/device/device_network.cpp1453
1 files changed, 731 insertions, 722 deletions
diff --git a/intern/cycles/device/device_network.cpp b/intern/cycles/device/device_network.cpp
index 6736480e95a..80334ad8f22 100644
--- a/intern/cycles/device/device_network.cpp
+++ b/intern/cycles/device/device_network.cpp
@@ -33,767 +33,776 @@ typedef map<device_ptr, DataVector> DataMap;
typedef vector<RenderTile> TileList;
/* search a list of tiles and find the one that matches the passed render tile */
-static TileList::iterator tile_list_find(TileList& tile_list, RenderTile& tile)
+static TileList::iterator tile_list_find(TileList &tile_list, RenderTile &tile)
{
- for(TileList::iterator it = tile_list.begin(); it != tile_list.end(); ++it)
- if(tile.x == it->x && tile.y == it->y && tile.start_sample == it->start_sample)
- return it;
- return tile_list.end();
+ for (TileList::iterator it = tile_list.begin(); it != tile_list.end(); ++it)
+ if (tile.x == it->x && tile.y == it->y && tile.start_sample == it->start_sample)
+ return it;
+ return tile_list.end();
}
-class NetworkDevice : public Device
-{
-public:
- boost::asio::io_service io_service;
- tcp::socket socket;
- device_ptr mem_counter;
- DeviceTask the_task; /* todo: handle multiple tasks */
-
- thread_mutex rpc_lock;
-
- virtual bool show_samples() const
- {
- return false;
- }
-
- NetworkDevice(DeviceInfo& info, Stats &stats, Profiler &profiler, const char *address)
- : Device(info, stats, profiler, true), socket(io_service)
- {
- error_func = NetworkError();
- stringstream portstr;
- portstr << SERVER_PORT;
-
- tcp::resolver resolver(io_service);
- tcp::resolver::query query(address, portstr.str());
- tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
- tcp::resolver::iterator end;
+class NetworkDevice : public Device {
+ public:
+ boost::asio::io_service io_service;
+ tcp::socket socket;
+ device_ptr mem_counter;
+ DeviceTask the_task; /* todo: handle multiple tasks */
+
+ thread_mutex rpc_lock;
+
+ virtual bool show_samples() const
+ {
+ return false;
+ }
+
+ NetworkDevice(DeviceInfo &info, Stats &stats, Profiler &profiler, const char *address)
+ : Device(info, stats, profiler, true), socket(io_service)
+ {
+ error_func = NetworkError();
+ stringstream portstr;
+ portstr << SERVER_PORT;
+
+ tcp::resolver resolver(io_service);
+ tcp::resolver::query query(address, portstr.str());
+ tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
+ tcp::resolver::iterator end;
+
+ boost::system::error_code error = boost::asio::error::host_not_found;
+ while (error && endpoint_iterator != end) {
+ socket.close();
+ socket.connect(*endpoint_iterator++, error);
+ }
- boost::system::error_code error = boost::asio::error::host_not_found;
- while(error && endpoint_iterator != end)
- {
- socket.close();
- socket.connect(*endpoint_iterator++, error);
- }
-
- if(error)
- error_func.network_error(error.message());
+ if (error)
+ error_func.network_error(error.message());
- mem_counter = 0;
- }
+ mem_counter = 0;
+ }
- ~NetworkDevice()
- {
- RPCSend snd(socket, &error_func, "stop");
- snd.write();
- }
-
- virtual BVHLayoutMask get_bvh_layout_mask() const {
- return BVH_LAYOUT_BVH2;
- }
-
- void mem_alloc(device_memory& mem)
- {
- if(mem.name) {
- VLOG(1) << "Buffer allocate: " << mem.name << ", "
- << string_human_readable_number(mem.memory_size()) << " bytes. ("
- << string_human_readable_size(mem.memory_size()) << ")";
- }
-
- thread_scoped_lock lock(rpc_lock);
+ ~NetworkDevice()
+ {
+ RPCSend snd(socket, &error_func, "stop");
+ snd.write();
+ }
- mem.device_pointer = ++mem_counter;
+ virtual BVHLayoutMask get_bvh_layout_mask() const
+ {
+ return BVH_LAYOUT_BVH2;
+ }
- RPCSend snd(socket, &error_func, "mem_alloc");
- snd.add(mem);
- snd.write();
- }
+ void mem_alloc(device_memory &mem)
+ {
+ if (mem.name) {
+ VLOG(1) << "Buffer allocate: " << mem.name << ", "
+ << string_human_readable_number(mem.memory_size()) << " bytes. ("
+ << string_human_readable_size(mem.memory_size()) << ")";
+ }
- void mem_copy_to(device_memory& mem)
- {
- thread_scoped_lock lock(rpc_lock);
+ thread_scoped_lock lock(rpc_lock);
- RPCSend snd(socket, &error_func, "mem_copy_to");
+ mem.device_pointer = ++mem_counter;
- snd.add(mem);
- snd.write();
- snd.write_buffer(mem.host_pointer, mem.memory_size());
- }
+ RPCSend snd(socket, &error_func, "mem_alloc");
+ snd.add(mem);
+ snd.write();
+ }
- void mem_copy_from(device_memory& mem, int y, int w, int h, int elem)
- {
- thread_scoped_lock lock(rpc_lock);
+ void mem_copy_to(device_memory &mem)
+ {
+ thread_scoped_lock lock(rpc_lock);
- size_t data_size = mem.memory_size();
+ RPCSend snd(socket, &error_func, "mem_copy_to");
- RPCSend snd(socket, &error_func, "mem_copy_from");
+ snd.add(mem);
+ snd.write();
+ snd.write_buffer(mem.host_pointer, mem.memory_size());
+ }
- snd.add(mem);
- snd.add(y);
- snd.add(w);
- snd.add(h);
- snd.add(elem);
- snd.write();
+ void mem_copy_from(device_memory &mem, int y, int w, int h, int elem)
+ {
+ thread_scoped_lock lock(rpc_lock);
- RPCReceive rcv(socket, &error_func);
- rcv.read_buffer(mem.host_pointer, data_size);
- }
+ size_t data_size = mem.memory_size();
- void mem_zero(device_memory& mem)
- {
- thread_scoped_lock lock(rpc_lock);
+ RPCSend snd(socket, &error_func, "mem_copy_from");
- RPCSend snd(socket, &error_func, "mem_zero");
+ snd.add(mem);
+ snd.add(y);
+ snd.add(w);
+ snd.add(h);
+ snd.add(elem);
+ snd.write();
- snd.add(mem);
- snd.write();
- }
+ RPCReceive rcv(socket, &error_func);
+ rcv.read_buffer(mem.host_pointer, data_size);
+ }
- void mem_free(device_memory& mem)
- {
- if(mem.device_pointer) {
- thread_scoped_lock lock(rpc_lock);
-
- RPCSend snd(socket, &error_func, "mem_free");
-
- snd.add(mem);
- snd.write();
-
- mem.device_pointer = 0;
- }
- }
+ void mem_zero(device_memory &mem)
+ {
+ thread_scoped_lock lock(rpc_lock);
- void const_copy_to(const char *name, void *host, size_t size)
- {
- thread_scoped_lock lock(rpc_lock);
+ RPCSend snd(socket, &error_func, "mem_zero");
- RPCSend snd(socket, &error_func, "const_copy_to");
+ snd.add(mem);
+ snd.write();
+ }
- string name_string(name);
-
- snd.add(name_string);
- snd.add(size);
- snd.write();
- snd.write_buffer(host, size);
- }
-
- bool load_kernels(const DeviceRequestedFeatures& requested_features)
- {
- if(error_func.have_error())
- return false;
-
- thread_scoped_lock lock(rpc_lock);
-
- RPCSend snd(socket, &error_func, "load_kernels");
- snd.add(requested_features.experimental);
- snd.add(requested_features.max_closure);
- snd.add(requested_features.max_nodes_group);
- snd.add(requested_features.nodes_features);
- snd.write();
-
- bool result;
- RPCReceive rcv(socket, &error_func);
- rcv.read(result);
-
- return result;
- }
-
- void task_add(DeviceTask& task)
- {
- thread_scoped_lock lock(rpc_lock);
-
- the_task = task;
-
- RPCSend snd(socket, &error_func, "task_add");
- snd.add(task);
- snd.write();
- }
-
- void task_wait()
- {
- thread_scoped_lock lock(rpc_lock);
-
- RPCSend snd(socket, &error_func, "task_wait");
- snd.write();
-
- lock.unlock();
-
- TileList the_tiles;
-
- /* todo: run this threaded for connecting to multiple clients */
- for(;;) {
- if(error_func.have_error())
- break;
-
- RenderTile tile;
-
- lock.lock();
- RPCReceive rcv(socket, &error_func);
-
- if(rcv.name == "acquire_tile") {
- lock.unlock();
-
- /* todo: watch out for recursive calls! */
- if(the_task.acquire_tile(this, tile)) { /* write return as bool */
- the_tiles.push_back(tile);
-
- lock.lock();
- RPCSend snd(socket, &error_func, "acquire_tile");
- snd.add(tile);
- snd.write();
- lock.unlock();
- }
- else {
- lock.lock();
- RPCSend snd(socket, &error_func, "acquire_tile_none");
- snd.write();
- lock.unlock();
- }
- }
- else if(rcv.name == "release_tile") {
- rcv.read(tile);
- lock.unlock();
-
- TileList::iterator it = tile_list_find(the_tiles, tile);
- if(it != the_tiles.end()) {
- tile.buffers = it->buffers;
- the_tiles.erase(it);
- }
-
- assert(tile.buffers != NULL);
-
- the_task.release_tile(tile);
-
- lock.lock();
- RPCSend snd(socket, &error_func, "release_tile");
- snd.write();
- lock.unlock();
- }
- else if(rcv.name == "task_wait_done") {
- lock.unlock();
- break;
- }
- else
- lock.unlock();
- }
- }
-
- void task_cancel()
- {
- thread_scoped_lock lock(rpc_lock);
- RPCSend snd(socket, &error_func, "task_cancel");
- snd.write();
- }
-
- int get_split_task_count(DeviceTask&)
- {
- return 1;
- }
-
-private:
- NetworkError error_func;
+ void mem_free(device_memory &mem)
+ {
+ if (mem.device_pointer) {
+ thread_scoped_lock lock(rpc_lock);
+
+ RPCSend snd(socket, &error_func, "mem_free");
+
+ snd.add(mem);
+ snd.write();
+
+ mem.device_pointer = 0;
+ }
+ }
+
+ void const_copy_to(const char *name, void *host, size_t size)
+ {
+ thread_scoped_lock lock(rpc_lock);
+
+ RPCSend snd(socket, &error_func, "const_copy_to");
+
+ string name_string(name);
+
+ snd.add(name_string);
+ snd.add(size);
+ snd.write();
+ snd.write_buffer(host, size);
+ }
+
+ bool load_kernels(const DeviceRequestedFeatures &requested_features)
+ {
+ if (error_func.have_error())
+ return false;
+
+ thread_scoped_lock lock(rpc_lock);
+
+ RPCSend snd(socket, &error_func, "load_kernels");
+ snd.add(requested_features.experimental);
+ snd.add(requested_features.max_closure);
+ snd.add(requested_features.max_nodes_group);
+ snd.add(requested_features.nodes_features);
+ snd.write();
+
+ bool result;
+ RPCReceive rcv(socket, &error_func);
+ rcv.read(result);
+
+ return result;
+ }
+
+ void task_add(DeviceTask &task)
+ {
+ thread_scoped_lock lock(rpc_lock);
+
+ the_task = task;
+
+ RPCSend snd(socket, &error_func, "task_add");
+ snd.add(task);
+ snd.write();
+ }
+
+ void task_wait()
+ {
+ thread_scoped_lock lock(rpc_lock);
+
+ RPCSend snd(socket, &error_func, "task_wait");
+ snd.write();
+
+ lock.unlock();
+
+ TileList the_tiles;
+
+ /* todo: run this threaded for connecting to multiple clients */
+ for (;;) {
+ if (error_func.have_error())
+ break;
+
+ RenderTile tile;
+
+ lock.lock();
+ RPCReceive rcv(socket, &error_func);
+
+ if (rcv.name == "acquire_tile") {
+ lock.unlock();
+
+ /* todo: watch out for recursive calls! */
+ if (the_task.acquire_tile(this, tile)) { /* write return as bool */
+ the_tiles.push_back(tile);
+
+ lock.lock();
+ RPCSend snd(socket, &error_func, "acquire_tile");
+ snd.add(tile);
+ snd.write();
+ lock.unlock();
+ }
+ else {
+ lock.lock();
+ RPCSend snd(socket, &error_func, "acquire_tile_none");
+ snd.write();
+ lock.unlock();
+ }
+ }
+ else if (rcv.name == "release_tile") {
+ rcv.read(tile);
+ lock.unlock();
+
+ TileList::iterator it = tile_list_find(the_tiles, tile);
+ if (it != the_tiles.end()) {
+ tile.buffers = it->buffers;
+ the_tiles.erase(it);
+ }
+
+ assert(tile.buffers != NULL);
+
+ the_task.release_tile(tile);
+
+ lock.lock();
+ RPCSend snd(socket, &error_func, "release_tile");
+ snd.write();
+ lock.unlock();
+ }
+ else if (rcv.name == "task_wait_done") {
+ lock.unlock();
+ break;
+ }
+ else
+ lock.unlock();
+ }
+ }
+
+ void task_cancel()
+ {
+ thread_scoped_lock lock(rpc_lock);
+ RPCSend snd(socket, &error_func, "task_cancel");
+ snd.write();
+ }
+
+ int get_split_task_count(DeviceTask &)
+ {
+ return 1;
+ }
+
+ private:
+ NetworkError error_func;
};
-Device *device_network_create(DeviceInfo& info, Stats &stats, Profiler &profiler, const char *address)
+Device *device_network_create(DeviceInfo &info,
+ Stats &stats,
+ Profiler &profiler,
+ const char *address)
{
- return new NetworkDevice(info, stats, profiler, address);
+ return new NetworkDevice(info, stats, profiler, address);
}
-void device_network_info(vector<DeviceInfo>& devices)
+void device_network_info(vector<DeviceInfo> &devices)
{
- DeviceInfo info;
+ DeviceInfo info;
- info.type = DEVICE_NETWORK;
- info.description = "Network Device";
- info.id = "NETWORK";
- info.num = 0;
+ info.type = DEVICE_NETWORK;
+ info.description = "Network Device";
+ info.id = "NETWORK";
+ info.num = 0;
- /* todo: get this info from device */
- info.has_volume_decoupled = false;
- info.has_osl = false;
+ /* todo: get this info from device */
+ info.has_volume_decoupled = false;
+ info.has_osl = false;
- devices.push_back(info);
+ devices.push_back(info);
}
class DeviceServer {
-public:
- thread_mutex rpc_lock;
-
- void network_error(const string &message) {
- error_func.network_error(message);
- }
-
- bool have_error() { return error_func.have_error(); }
-
- DeviceServer(Device *device_, tcp::socket& socket_)
- : device(device_), socket(socket_), stop(false), blocked_waiting(false)
- {
- error_func = NetworkError();
- }
-
- void listen()
- {
- /* receive remote function calls */
- for(;;) {
- listen_step();
-
- if(stop)
- break;
- }
- }
-
-protected:
- void listen_step()
- {
- thread_scoped_lock lock(rpc_lock);
- RPCReceive rcv(socket, &error_func);
-
- if(rcv.name == "stop")
- stop = true;
- else
- process(rcv, lock);
- }
-
- /* create a memory buffer for a device buffer and insert it into mem_data */
- DataVector &data_vector_insert(device_ptr client_pointer, size_t data_size)
- {
- /* create a new DataVector and insert it into mem_data */
- pair<DataMap::iterator,bool> data_ins = mem_data.insert(
- DataMap::value_type(client_pointer, DataVector()));
-
- /* make sure it was a unique insertion */
- assert(data_ins.second);
-
- /* get a reference to the inserted vector */
- DataVector &data_v = data_ins.first->second;
-
- /* size the vector */
- data_v.resize(data_size);
-
- return data_v;
- }
-
- DataVector &data_vector_find(device_ptr client_pointer)
- {
- DataMap::iterator i = mem_data.find(client_pointer);
- assert(i != mem_data.end());
- return i->second;
- }
-
- /* setup mapping and reverse mapping of client_pointer<->real_pointer */
- void pointer_mapping_insert(device_ptr client_pointer, device_ptr real_pointer)
- {
- pair<PtrMap::iterator,bool> mapins;
-
- /* insert mapping from client pointer to our real device pointer */
- mapins = ptr_map.insert(PtrMap::value_type(client_pointer, real_pointer));
- assert(mapins.second);
-
- /* insert reverse mapping from real our device pointer to client pointer */
- mapins = ptr_imap.insert(PtrMap::value_type(real_pointer, client_pointer));
- assert(mapins.second);
- }
-
- device_ptr device_ptr_from_client_pointer(device_ptr client_pointer)
- {
- PtrMap::iterator i = ptr_map.find(client_pointer);
- assert(i != ptr_map.end());
- return i->second;
- }
-
- device_ptr device_ptr_from_client_pointer_erase(device_ptr client_pointer)
- {
- PtrMap::iterator i = ptr_map.find(client_pointer);
- assert(i != ptr_map.end());
-
- device_ptr result = i->second;
-
- /* erase the mapping */
- ptr_map.erase(i);
-
- /* erase the reverse mapping */
- PtrMap::iterator irev = ptr_imap.find(result);
- assert(irev != ptr_imap.end());
- ptr_imap.erase(irev);
-
- /* erase the data vector */
- DataMap::iterator idata = mem_data.find(client_pointer);
- assert(idata != mem_data.end());
- mem_data.erase(idata);
-
- return result;
- }
-
- /* note that the lock must be already acquired upon entry.
- * This is necessary because the caller often peeks at
- * the header and delegates control to here when it doesn't
- * specifically handle the current RPC.
- * The lock must be unlocked before returning */
- void process(RPCReceive& rcv, thread_scoped_lock &lock)
- {
- if(rcv.name == "mem_alloc") {
- string name;
- network_device_memory mem(device);
- rcv.read(mem, name);
- lock.unlock();
-
- /* Allocate host side data buffer. */
- size_t data_size = mem.memory_size();
- device_ptr client_pointer = mem.device_pointer;
-
- DataVector &data_v = data_vector_insert(client_pointer, data_size);
- mem.host_pointer = (data_size)? (void*)&(data_v[0]): 0;
-
- /* Perform the allocation on the actual device. */
- device->mem_alloc(mem);
-
- /* Store a mapping to/from client_pointer and real device pointer. */
- pointer_mapping_insert(client_pointer, mem.device_pointer);
- }
- else if(rcv.name == "mem_copy_to") {
- string name;
- network_device_memory mem(device);
- rcv.read(mem, name);
- lock.unlock();
-
- size_t data_size = mem.memory_size();
- device_ptr client_pointer = mem.device_pointer;
-
- if(client_pointer) {
- /* Lookup existing host side data buffer. */
- DataVector &data_v = data_vector_find(client_pointer);
- mem.host_pointer = (void*)&data_v[0];
-
- /* Translate the client pointer to a real device pointer. */
- mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
- }
- else {
- /* Allocate host side data buffer. */
- DataVector &data_v = data_vector_insert(client_pointer, data_size);
- mem.host_pointer = (data_size)? (void*)&(data_v[0]): 0;
- }
-
- /* Copy data from network into memory buffer. */
- rcv.read_buffer((uint8_t*)mem.host_pointer, data_size);
-
- /* Copy the data from the memory buffer to the device buffer. */
- device->mem_copy_to(mem);
-
- if(!client_pointer) {
- /* Store a mapping to/from client_pointer and real device pointer. */
- pointer_mapping_insert(client_pointer, mem.device_pointer);
- }
- }
- else if(rcv.name == "mem_copy_from") {
- string name;
- network_device_memory mem(device);
- int y, w, h, elem;
-
- rcv.read(mem, name);
- rcv.read(y);
- rcv.read(w);
- rcv.read(h);
- rcv.read(elem);
-
- device_ptr client_pointer = mem.device_pointer;
- mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
-
- DataVector &data_v = data_vector_find(client_pointer);
-
- mem.host_pointer = (device_ptr)&(data_v[0]);
-
- device->mem_copy_from(mem, y, w, h, elem);
-
- size_t data_size = mem.memory_size();
-
- RPCSend snd(socket, &error_func, "mem_copy_from");
- snd.write();
- snd.write_buffer((uint8_t*)mem.host_pointer, data_size);
- lock.unlock();
- }
- else if(rcv.name == "mem_zero") {
- string name;
- network_device_memory mem(device);
- rcv.read(mem, name);
- lock.unlock();
-
- size_t data_size = mem.memory_size();
- device_ptr client_pointer = mem.device_pointer;
-
- if(client_pointer) {
- /* Lookup existing host side data buffer. */
- DataVector &data_v = data_vector_find(client_pointer);
- mem.host_pointer = (void*)&data_v[0];
-
- /* Translate the client pointer to a real device pointer. */
- mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
- }
- else {
- /* Allocate host side data buffer. */
- DataVector &data_v = data_vector_insert(client_pointer, data_size);
- mem.host_pointer = (void*)? (device_ptr)&(data_v[0]): 0;
- }
-
- /* Zero memory. */
- device->mem_zero(mem);
-
- if(!client_pointer) {
- /* Store a mapping to/from client_pointer and real device pointer. */
- pointer_mapping_insert(client_pointer, mem.device_pointer);
- }
- }
- else if(rcv.name == "mem_free") {
- string name;
- network_device_memory mem(device);
-
- rcv.read(mem, name);
- lock.unlock();
-
- device_ptr client_pointer = mem.device_pointer;
-
- mem.device_pointer = device_ptr_from_client_pointer_erase(client_pointer);
-
- device->mem_free(mem);
- }
- else if(rcv.name == "const_copy_to") {
- string name_string;
- size_t size;
-
- rcv.read(name_string);
- rcv.read(size);
-
- vector<char> host_vector(size);
- rcv.read_buffer(&host_vector[0], size);
- lock.unlock();
-
- device->const_copy_to(name_string.c_str(), &host_vector[0], size);
- }
- else if(rcv.name == "load_kernels") {
- DeviceRequestedFeatures requested_features;
- rcv.read(requested_features.experimental);
- rcv.read(requested_features.max_closure);
- rcv.read(requested_features.max_nodes_group);
- rcv.read(requested_features.nodes_features);
-
- bool result;
- result = device->load_kernels(requested_features);
- RPCSend snd(socket, &error_func, "load_kernels");
- snd.add(result);
- snd.write();
- lock.unlock();
- }
- else if(rcv.name == "task_add") {
- DeviceTask task;
-
- rcv.read(task);
- lock.unlock();
-
- if(task.buffer)
- task.buffer = device_ptr_from_client_pointer(task.buffer);
-
- if(task.rgba_half)
- task.rgba_half = device_ptr_from_client_pointer(task.rgba_half);
-
- if(task.rgba_byte)
- task.rgba_byte = device_ptr_from_client_pointer(task.rgba_byte);
-
- if(task.shader_input)
- task.shader_input = device_ptr_from_client_pointer(task.shader_input);
-
- if(task.shader_output)
- task.shader_output = device_ptr_from_client_pointer(task.shader_output);
-
- task.acquire_tile = function_bind(&DeviceServer::task_acquire_tile, this, _1, _2);
- task.release_tile = function_bind(&DeviceServer::task_release_tile, this, _1);
- task.update_progress_sample = function_bind(&DeviceServer::task_update_progress_sample, this);
- task.update_tile_sample = function_bind(&DeviceServer::task_update_tile_sample, this, _1);
- task.get_cancel = function_bind(&DeviceServer::task_get_cancel, this);
-
- device->task_add(task);
- }
- else if(rcv.name == "task_wait") {
- lock.unlock();
-
- blocked_waiting = true;
- device->task_wait();
- blocked_waiting = false;
-
- lock.lock();
- RPCSend snd(socket, &error_func, "task_wait_done");
- snd.write();
- lock.unlock();
- }
- else if(rcv.name == "task_cancel") {
- lock.unlock();
- device->task_cancel();
- }
- else if(rcv.name == "acquire_tile") {
- AcquireEntry entry;
- entry.name = rcv.name;
- rcv.read(entry.tile);
- acquire_queue.push_back(entry);
- lock.unlock();
- }
- else if(rcv.name == "acquire_tile_none") {
- AcquireEntry entry;
- entry.name = rcv.name;
- acquire_queue.push_back(entry);
- lock.unlock();
- }
- else if(rcv.name == "release_tile") {
- AcquireEntry entry;
- entry.name = rcv.name;
- acquire_queue.push_back(entry);
- lock.unlock();
- }
- else {
- cout << "Error: unexpected RPC receive call \"" + rcv.name + "\"\n";
- lock.unlock();
- }
- }
-
- bool task_acquire_tile(Device *, RenderTile& tile)
- {
- thread_scoped_lock acquire_lock(acquire_mutex);
-
- bool result = false;
-
- RPCSend snd(socket, &error_func, "acquire_tile");
- snd.write();
-
- do {
- if(blocked_waiting)
- listen_step();
-
- /* todo: avoid busy wait loop */
- thread_scoped_lock lock(rpc_lock);
-
- if(!acquire_queue.empty()) {
- AcquireEntry entry = acquire_queue.front();
- acquire_queue.pop_front();
-
- if(entry.name == "acquire_tile") {
- tile = entry.tile;
-
- if(tile.buffer) tile.buffer = ptr_map[tile.buffer];
-
- result = true;
- break;
- }
- else if(entry.name == "acquire_tile_none") {
- break;
- }
- else {
- cout << "Error: unexpected acquire RPC receive call \"" + entry.name + "\"\n";
- }
- }
- } while(acquire_queue.empty() && !stop && !have_error());
-
- return result;
- }
-
- void task_update_progress_sample()
- {
- ; /* skip */
- }
-
- void task_update_tile_sample(RenderTile&)
- {
- ; /* skip */
- }
-
- void task_release_tile(RenderTile& tile)
- {
- thread_scoped_lock acquire_lock(acquire_mutex);
-
- if(tile.buffer) tile.buffer = ptr_imap[tile.buffer];
-
- {
- thread_scoped_lock lock(rpc_lock);
- RPCSend snd(socket, &error_func, "release_tile");
- snd.add(tile);
- snd.write();
- lock.unlock();
- }
-
- do {
- if(blocked_waiting)
- listen_step();
-
- /* todo: avoid busy wait loop */
- thread_scoped_lock lock(rpc_lock);
-
- if(!acquire_queue.empty()) {
- AcquireEntry entry = acquire_queue.front();
- acquire_queue.pop_front();
-
- if(entry.name == "release_tile") {
- lock.unlock();
- break;
- }
- else {
- cout << "Error: unexpected release RPC receive call \"" + entry.name + "\"\n";
- }
- }
- } while(acquire_queue.empty() && !stop);
- }
-
- bool task_get_cancel()
- {
- return false;
- }
-
- /* properties */
- Device *device;
- tcp::socket& socket;
-
- /* mapping of remote to local pointer */
- PtrMap ptr_map;
- PtrMap ptr_imap;
- DataMap mem_data;
-
- struct AcquireEntry {
- string name;
- RenderTile tile;
- };
-
- thread_mutex acquire_mutex;
- list<AcquireEntry> acquire_queue;
-
- bool stop;
- bool blocked_waiting;
-private:
- NetworkError error_func;
-
- /* todo: free memory and device (osl) on network error */
-
+ public:
+ thread_mutex rpc_lock;
+
+ void network_error(const string &message)
+ {
+ error_func.network_error(message);
+ }
+
+ bool have_error()
+ {
+ return error_func.have_error();
+ }
+
+ DeviceServer(Device *device_, tcp::socket &socket_)
+ : device(device_), socket(socket_), stop(false), blocked_waiting(false)
+ {
+ error_func = NetworkError();
+ }
+
+ void listen()
+ {
+ /* receive remote function calls */
+ for (;;) {
+ listen_step();
+
+ if (stop)
+ break;
+ }
+ }
+
+ protected:
+ void listen_step()
+ {
+ thread_scoped_lock lock(rpc_lock);
+ RPCReceive rcv(socket, &error_func);
+
+ if (rcv.name == "stop")
+ stop = true;
+ else
+ process(rcv, lock);
+ }
+
+ /* create a memory buffer for a device buffer and insert it into mem_data */
+ DataVector &data_vector_insert(device_ptr client_pointer, size_t data_size)
+ {
+ /* create a new DataVector and insert it into mem_data */
+ pair<DataMap::iterator, bool> data_ins = mem_data.insert(
+ DataMap::value_type(client_pointer, DataVector()));
+
+ /* make sure it was a unique insertion */
+ assert(data_ins.second);
+
+ /* get a reference to the inserted vector */
+ DataVector &data_v = data_ins.first->second;
+
+ /* size the vector */
+ data_v.resize(data_size);
+
+ return data_v;
+ }
+
+ DataVector &data_vector_find(device_ptr client_pointer)
+ {
+ DataMap::iterator i = mem_data.find(client_pointer);
+ assert(i != mem_data.end());
+ return i->second;
+ }
+
+ /* setup mapping and reverse mapping of client_pointer<->real_pointer */
+ void pointer_mapping_insert(device_ptr client_pointer, device_ptr real_pointer)
+ {
+ pair<PtrMap::iterator, bool> mapins;
+
+ /* insert mapping from client pointer to our real device pointer */
+ mapins = ptr_map.insert(PtrMap::value_type(client_pointer, real_pointer));
+ assert(mapins.second);
+
+ /* insert reverse mapping from real our device pointer to client pointer */
+ mapins = ptr_imap.insert(PtrMap::value_type(real_pointer, client_pointer));
+ assert(mapins.second);
+ }
+
+ device_ptr device_ptr_from_client_pointer(device_ptr client_pointer)
+ {
+ PtrMap::iterator i = ptr_map.find(client_pointer);
+ assert(i != ptr_map.end());
+ return i->second;
+ }
+
+ device_ptr device_ptr_from_client_pointer_erase(device_ptr client_pointer)
+ {
+ PtrMap::iterator i = ptr_map.find(client_pointer);
+ assert(i != ptr_map.end());
+
+ device_ptr result = i->second;
+
+ /* erase the mapping */
+ ptr_map.erase(i);
+
+ /* erase the reverse mapping */
+ PtrMap::iterator irev = ptr_imap.find(result);
+ assert(irev != ptr_imap.end());
+ ptr_imap.erase(irev);
+
+ /* erase the data vector */
+ DataMap::iterator idata = mem_data.find(client_pointer);
+ assert(idata != mem_data.end());
+ mem_data.erase(idata);
+
+ return result;
+ }
+
+ /* note that the lock must be already acquired upon entry.
+ * This is necessary because the caller often peeks at
+ * the header and delegates control to here when it doesn't
+ * specifically handle the current RPC.
+ * The lock must be unlocked before returning */
+ void process(RPCReceive &rcv, thread_scoped_lock &lock)
+ {
+ if (rcv.name == "mem_alloc") {
+ string name;
+ network_device_memory mem(device);
+ rcv.read(mem, name);
+ lock.unlock();
+
+ /* Allocate host side data buffer. */
+ size_t data_size = mem.memory_size();
+ device_ptr client_pointer = mem.device_pointer;
+
+ DataVector &data_v = data_vector_insert(client_pointer, data_size);
+ mem.host_pointer = (data_size) ? (void *)&(data_v[0]) : 0;
+
+ /* Perform the allocation on the actual device. */
+ device->mem_alloc(mem);
+
+ /* Store a mapping to/from client_pointer and real device pointer. */
+ pointer_mapping_insert(client_pointer, mem.device_pointer);
+ }
+ else if (rcv.name == "mem_copy_to") {
+ string name;
+ network_device_memory mem(device);
+ rcv.read(mem, name);
+ lock.unlock();
+
+ size_t data_size = mem.memory_size();
+ device_ptr client_pointer = mem.device_pointer;
+
+ if (client_pointer) {
+ /* Lookup existing host side data buffer. */
+ DataVector &data_v = data_vector_find(client_pointer);
+ mem.host_pointer = (void *)&data_v[0];
+
+ /* Translate the client pointer to a real device pointer. */
+ mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
+ }
+ else {
+ /* Allocate host side data buffer. */
+ DataVector &data_v = data_vector_insert(client_pointer, data_size);
+ mem.host_pointer = (data_size) ? (void *)&(data_v[0]) : 0;
+ }
+
+ /* Copy data from network into memory buffer. */
+ rcv.read_buffer((uint8_t *)mem.host_pointer, data_size);
+
+ /* Copy the data from the memory buffer to the device buffer. */
+ device->mem_copy_to(mem);
+
+ if (!client_pointer) {
+ /* Store a mapping to/from client_pointer and real device pointer. */
+ pointer_mapping_insert(client_pointer, mem.device_pointer);
+ }
+ }
+ else if (rcv.name == "mem_copy_from") {
+ string name;
+ network_device_memory mem(device);
+ int y, w, h, elem;
+
+ rcv.read(mem, name);
+ rcv.read(y);
+ rcv.read(w);
+ rcv.read(h);
+ rcv.read(elem);
+
+ device_ptr client_pointer = mem.device_pointer;
+ mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
+
+ DataVector &data_v = data_vector_find(client_pointer);
+
+ mem.host_pointer = (device_ptr) & (data_v[0]);
+
+ device->mem_copy_from(mem, y, w, h, elem);
+
+ size_t data_size = mem.memory_size();
+
+ RPCSend snd(socket, &error_func, "mem_copy_from");
+ snd.write();
+ snd.write_buffer((uint8_t *)mem.host_pointer, data_size);
+ lock.unlock();
+ }
+ else if (rcv.name == "mem_zero") {
+ string name;
+ network_device_memory mem(device);
+ rcv.read(mem, name);
+ lock.unlock();
+
+ size_t data_size = mem.memory_size();
+ device_ptr client_pointer = mem.device_pointer;
+
+ if (client_pointer) {
+ /* Lookup existing host side data buffer. */
+ DataVector &data_v = data_vector_find(client_pointer);
+ mem.host_pointer = (void *)&data_v[0];
+
+ /* Translate the client pointer to a real device pointer. */
+ mem.device_pointer = device_ptr_from_client_pointer(client_pointer);
+ }
+ else {
+ /* Allocate host side data buffer. */
+ DataVector &data_v = data_vector_insert(client_pointer, data_size);
+ mem.host_pointer = (void *) ? (device_ptr) & (data_v[0]) : 0;
+ }
+
+ /* Zero memory. */
+ device->mem_zero(mem);
+
+ if (!client_pointer) {
+ /* Store a mapping to/from client_pointer and real device pointer. */
+ pointer_mapping_insert(client_pointer, mem.device_pointer);
+ }
+ }
+ else if (rcv.name == "mem_free") {
+ string name;
+ network_device_memory mem(device);
+
+ rcv.read(mem, name);
+ lock.unlock();
+
+ device_ptr client_pointer = mem.device_pointer;
+
+ mem.device_pointer = device_ptr_from_client_pointer_erase(client_pointer);
+
+ device->mem_free(mem);
+ }
+ else if (rcv.name == "const_copy_to") {
+ string name_string;
+ size_t size;
+
+ rcv.read(name_string);
+ rcv.read(size);
+
+ vector<char> host_vector(size);
+ rcv.read_buffer(&host_vector[0], size);
+ lock.unlock();
+
+ device->const_copy_to(name_string.c_str(), &host_vector[0], size);
+ }
+ else if (rcv.name == "load_kernels") {
+ DeviceRequestedFeatures requested_features;
+ rcv.read(requested_features.experimental);
+ rcv.read(requested_features.max_closure);
+ rcv.read(requested_features.max_nodes_group);
+ rcv.read(requested_features.nodes_features);
+
+ bool result;
+ result = device->load_kernels(requested_features);
+ RPCSend snd(socket, &error_func, "load_kernels");
+ snd.add(result);
+ snd.write();
+ lock.unlock();
+ }
+ else if (rcv.name == "task_add") {
+ DeviceTask task;
+
+ rcv.read(task);
+ lock.unlock();
+
+ if (task.buffer)
+ task.buffer = device_ptr_from_client_pointer(task.buffer);
+
+ if (task.rgba_half)
+ task.rgba_half = device_ptr_from_client_pointer(task.rgba_half);
+
+ if (task.rgba_byte)
+ task.rgba_byte = device_ptr_from_client_pointer(task.rgba_byte);
+
+ if (task.shader_input)
+ task.shader_input = device_ptr_from_client_pointer(task.shader_input);
+
+ if (task.shader_output)
+ task.shader_output = device_ptr_from_client_pointer(task.shader_output);
+
+ task.acquire_tile = function_bind(&DeviceServer::task_acquire_tile, this, _1, _2);
+ task.release_tile = function_bind(&DeviceServer::task_release_tile, this, _1);
+ task.update_progress_sample = function_bind(&DeviceServer::task_update_progress_sample,
+ this);
+ task.update_tile_sample = function_bind(&DeviceServer::task_update_tile_sample, this, _1);
+ task.get_cancel = function_bind(&DeviceServer::task_get_cancel, this);
+
+ device->task_add(task);
+ }
+ else if (rcv.name == "task_wait") {
+ lock.unlock();
+
+ blocked_waiting = true;
+ device->task_wait();
+ blocked_waiting = false;
+
+ lock.lock();
+ RPCSend snd(socket, &error_func, "task_wait_done");
+ snd.write();
+ lock.unlock();
+ }
+ else if (rcv.name == "task_cancel") {
+ lock.unlock();
+ device->task_cancel();
+ }
+ else if (rcv.name == "acquire_tile") {
+ AcquireEntry entry;
+ entry.name = rcv.name;
+ rcv.read(entry.tile);
+ acquire_queue.push_back(entry);
+ lock.unlock();
+ }
+ else if (rcv.name == "acquire_tile_none") {
+ AcquireEntry entry;
+ entry.name = rcv.name;
+ acquire_queue.push_back(entry);
+ lock.unlock();
+ }
+ else if (rcv.name == "release_tile") {
+ AcquireEntry entry;
+ entry.name = rcv.name;
+ acquire_queue.push_back(entry);
+ lock.unlock();
+ }
+ else {
+ cout << "Error: unexpected RPC receive call \"" + rcv.name + "\"\n";
+ lock.unlock();
+ }
+ }
+
+ bool task_acquire_tile(Device *, RenderTile &tile)
+ {
+ thread_scoped_lock acquire_lock(acquire_mutex);
+
+ bool result = false;
+
+ RPCSend snd(socket, &error_func, "acquire_tile");
+ snd.write();
+
+ do {
+ if (blocked_waiting)
+ listen_step();
+
+ /* todo: avoid busy wait loop */
+ thread_scoped_lock lock(rpc_lock);
+
+ if (!acquire_queue.empty()) {
+ AcquireEntry entry = acquire_queue.front();
+ acquire_queue.pop_front();
+
+ if (entry.name == "acquire_tile") {
+ tile = entry.tile;
+
+ if (tile.buffer)
+ tile.buffer = ptr_map[tile.buffer];
+
+ result = true;
+ break;
+ }
+ else if (entry.name == "acquire_tile_none") {
+ break;
+ }
+ else {
+ cout << "Error: unexpected acquire RPC receive call \"" + entry.name + "\"\n";
+ }
+ }
+ } while (acquire_queue.empty() && !stop && !have_error());
+
+ return result;
+ }
+
+ void task_update_progress_sample()
+ {
+ ; /* skip */
+ }
+
+ void task_update_tile_sample(RenderTile &)
+ {
+ ; /* skip */
+ }
+
+ void task_release_tile(RenderTile &tile)
+ {
+ thread_scoped_lock acquire_lock(acquire_mutex);
+
+ if (tile.buffer)
+ tile.buffer = ptr_imap[tile.buffer];
+
+ {
+ thread_scoped_lock lock(rpc_lock);
+ RPCSend snd(socket, &error_func, "release_tile");
+ snd.add(tile);
+ snd.write();
+ lock.unlock();
+ }
+
+ do {
+ if (blocked_waiting)
+ listen_step();
+
+ /* todo: avoid busy wait loop */
+ thread_scoped_lock lock(rpc_lock);
+
+ if (!acquire_queue.empty()) {
+ AcquireEntry entry = acquire_queue.front();
+ acquire_queue.pop_front();
+
+ if (entry.name == "release_tile") {
+ lock.unlock();
+ break;
+ }
+ else {
+ cout << "Error: unexpected release RPC receive call \"" + entry.name + "\"\n";
+ }
+ }
+ } while (acquire_queue.empty() && !stop);
+ }
+
+ bool task_get_cancel()
+ {
+ return false;
+ }
+
+ /* properties */
+ Device *device;
+ tcp::socket &socket;
+
+ /* mapping of remote to local pointer */
+ PtrMap ptr_map;
+ PtrMap ptr_imap;
+ DataMap mem_data;
+
+ struct AcquireEntry {
+ string name;
+ RenderTile tile;
+ };
+
+ thread_mutex acquire_mutex;
+ list<AcquireEntry> acquire_queue;
+
+ bool stop;
+ bool blocked_waiting;
+
+ private:
+ NetworkError error_func;
+
+ /* todo: free memory and device (osl) on network error */
};
void Device::server_run()
{
- try {
- /* starts thread that responds to discovery requests */
- ServerDiscovery discovery;
-
- for(;;) {
- /* accept connection */
- boost::asio::io_service io_service;
- tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), SERVER_PORT));
-
- tcp::socket socket(io_service);
- acceptor.accept(socket);
-
- string remote_address = socket.remote_endpoint().address().to_string();
- printf("Connected to remote client at: %s\n", remote_address.c_str());
-
- DeviceServer server(this, socket);
- server.listen();
-
- printf("Disconnected.\n");
- }
- }
- catch(exception& e) {
- fprintf(stderr, "Network server exception: %s\n", e.what());
- }
+ try {
+ /* starts thread that responds to discovery requests */
+ ServerDiscovery discovery;
+
+ for (;;) {
+ /* accept connection */
+ boost::asio::io_service io_service;
+ tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), SERVER_PORT));
+
+ tcp::socket socket(io_service);
+ acceptor.accept(socket);
+
+ string remote_address = socket.remote_endpoint().address().to_string();
+ printf("Connected to remote client at: %s\n", remote_address.c_str());
+
+ DeviceServer server(this, socket);
+ server.listen();
+
+ printf("Disconnected.\n");
+ }
+ }
+ catch (exception &e) {
+ fprintf(stderr, "Network server exception: %s\n", e.what());
+ }
}
CCL_NAMESPACE_END