From 2bb986132e429b57104dc9cf4949d6e38f8fc053 Mon Sep 17 00:00:00 2001
From: Martin Poirier
Date: Tue, 22 Mar 2011 01:42:06 +0000
Subject: Moving netrender to addons

---
 netrender/__init__.py    |   81 ++++
 netrender/balancing.py   |  195 +++++++++
 netrender/client.py      |  376 ++++++++++++++++
 netrender/master.py      | 1079 ++++++++++++++++++++++++++++++++++++++++++++++
 netrender/master_html.py |  315 ++++++++++++++
 netrender/model.py       |  360 ++++++++++++++++
 netrender/netrender.css  |   88 ++++
 netrender/netrender.js   |  146 +++++++
 netrender/operators.py   |  564 ++++++++++++++++++++++++
 netrender/repath.py      |  150 +++++++
 netrender/slave.py       |  349 +++++++++++++++
 netrender/thumbnail.py   |   81 ++++
 netrender/ui.py          |  547 +++++++++++++++++++++++
 netrender/utils.py       |  314 ++++++++++++++
 netrender/versioning.py  |  108 +++++
 15 files changed, 4753 insertions(+)
 create mode 100644 netrender/__init__.py
 create mode 100644 netrender/balancing.py
 create mode 100644 netrender/client.py
 create mode 100644 netrender/master.py
 create mode 100644 netrender/master_html.py
 create mode 100644 netrender/model.py
 create mode 100644 netrender/netrender.css
 create mode 100644 netrender/netrender.js
 create mode 100644 netrender/operators.py
 create mode 100644 netrender/repath.py
 create mode 100644 netrender/slave.py
 create mode 100644 netrender/thumbnail.py
 create mode 100644 netrender/ui.py
 create mode 100644 netrender/utils.py
 create mode 100644 netrender/versioning.py
(limited to 'netrender')

diff --git a/netrender/__init__.py b/netrender/__init__.py
new file mode 100644
index 00000000..5ae4b774
--- /dev/null
+++ b/netrender/__init__.py
@@ -0,0 +1,81 @@
+# ##### BEGIN GPL LICENSE BLOCK #####
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software Foundation,
+# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+#
+# ##### END GPL LICENSE BLOCK #####
+
+# This directory is a Python package.
+
+bl_info = {
+    "name": "Network Renderer",
+    "author": "Martin Poirier",
+    "version": (1, 3),
+    "blender": (2, 5, 6),
+    "api": 35011,
+    "location": "Render > Engine > Network Render",
+    "description": "Distributed rendering for Blender",
+    "warning": "Stable but still work in progress",
+    "wiki_url": "http://wiki.blender.org/index.php/Doc:2.5/Manual/Render/Engines/Netrender",
+    "category": "Render"}
+
+
+# To support reload properly, try to access a package var, if it's there, reload everything
+if "init_data" in locals():
+    import imp
+    imp.reload(model)
+    imp.reload(operators)
+    imp.reload(client)
+    imp.reload(slave)
+    imp.reload(master)
+    imp.reload(master_html)
+    imp.reload(utils)
+    imp.reload(balancing)
+    imp.reload(ui)
+    imp.reload(repath)
+    imp.reload(versioning)
+else:
+    from netrender import model
+    from netrender import operators
+    from netrender import client
+    from netrender import slave
+    from netrender import master
+    from netrender import master_html
+    from netrender import utils
+    from netrender import balancing
+    from netrender import ui
+    from netrender import repath
+    from netrender import versioning
+
+jobs = []
+slaves = []
+blacklist = []
+
+init_file = ""
+valid_address = False
+init_data = True
+
+
+def register():
+    import bpy
+    bpy.utils.register_module(__name__)
+
+    scene = bpy.context.scene
+    if scene:
+        ui.init_data(scene.network_render)
+
+
+def unregister():
+    import bpy
+    bpy.utils.unregister_module(__name__)
diff --git a/netrender/balancing.py b/netrender/balancing.py
new file mode 100644
index 00000000..dde3ad53
--- /dev/null
+++ b/netrender/balancing.py
@@ -0,0 +1,195 @@
+# ##### BEGIN GPL LICENSE BLOCK #####
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software Foundation,
+# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+#
+# ##### END GPL LICENSE BLOCK #####
+
+import time
+
+from netrender.utils import *
+import netrender.model
+
+class RatingRule:
+    def __init__(self):
+        self.enabled = True
+
+    def id(self):
+        return str(id(self))
+
+    def rate(self, job):
+        return 0
+
+class ExclusionRule:
+    def __init__(self):
+        self.enabled = True
+
+    def id(self):
+        return str(id(self))
+
+    def test(self, job):
+        return False
+
+class PriorityRule:
+    def __init__(self):
+        self.enabled = True
+
+    def id(self):
+        return str(id(self))
+
+    def test(self, job):
+        return False
+
+class Balancer:
+    def __init__(self):
+        self.rules = []
+        self.priorities = []
+        self.exceptions = []
+
+    def ruleByID(self, rule_id):
+        for rule in self.rules:
+            if rule.id() == rule_id:
+                return rule
+        for rule in self.priorities:
+            if rule.id() == rule_id:
+                return rule
+        for rule in self.exceptions:
+            if rule.id() == rule_id:
+                return rule
+
+        return None
+
+    def addRule(self, rule):
+        self.rules.append(rule)
+
+    def addPriority(self, priority):
+        self.priorities.append(priority)
+
+    def addException(self, exception):
+        self.exceptions.append(exception)
+
+    def applyRules(self, job):
+        return sum((rule.rate(job) for rule in self.rules if rule.enabled))
+
+    def applyPriorities(self, job):
+        for priority in self.priorities:
+            if priority.enabled and priority.test(job):
+                return True # priorities are first
+
+        return False
+
+    def applyExceptions(self, job):
+        for exception in self.exceptions:
+            if exception.enabled and exception.test(job):
+                return True # exceptions are last
+
+        return False
+
+    def sortKey(self, job):
+        return (1 if self.applyExceptions(job) else 0, # exceptions after
+                0 if self.applyPriorities(job) else 1, # priorities first
+                self.applyRules(job))
+
+    def balance(self, jobs):
+        if jobs:
+            # use inline copy to make sure the list is still accessible while sorting
+            jobs[:] = sorted(jobs, key=self.sortKey)
+            return jobs[0]
+        else:
+            return None
+
+# ==========================
+
+class RatingUsage(RatingRule):
+    def __str__(self):
+        return "Usage per job"
+
+    def rate(self, job):
+        # less usage is better
+        return job.usage / job.priority
+
+class RatingUsageByCategory(RatingRule):
+    def __init__(self, get_jobs):
+        super().__init__()
+        self.getJobs = get_jobs
+
+    def __str__(self):
+        return "Usage per category"
+
+    def rate(self, job):
+        total_category_usage = sum([j.usage for j in self.getJobs() if j.category == job.category])
+        maximum_priority = max([j.priority for j in self.getJobs() if j.category == job.category])
+
+        # less usage is better
+        return total_category_usage / maximum_priority
+
+class NewJobPriority(PriorityRule):
+    def __init__(self, limit = 1):
+        super().__init__()
+        self.limit = limit
+
+    def setLimit(self, value):
+        self.limit = int(value)
+
+    def str_limit(self):
+        return "less than %i frame%s done" % (self.limit, "s" if self.limit > 1 else "")
+
+    def __str__(self):
+        return "Priority to new jobs"
+
+    def test(self, job):
+        return job.countFrames(status = DONE) < self.limit
+
+class MinimumTimeBetweenDispatchPriority(PriorityRule):
+    def __init__(self, limit = 10):
+        super().__init__()
+        self.limit = limit
+
+    def setLimit(self, value):
+        self.limit = int(value)
+
+    def str_limit(self):
+        return "more than %i minute%s since last" % (self.limit, "s" if self.limit > 1 else "")
+
+    def __str__(self):
+        return "Priority to jobs that haven't been dispatched recently"
+
+    def test(self, job):
+        return job.countFrames(status = DISPATCHED) == 0 and (time.time() - job.last_dispatched) / 60 > self.limit
+
+class
ExcludeQueuedEmptyJob(ExclusionRule): + def __str__(self): + return "Exclude non queued or empty jobs" + + def test(self, job): + return job.status != JOB_QUEUED or job.countFrames(status = QUEUED) == 0 + +class ExcludeSlavesLimit(ExclusionRule): + def __init__(self, count_jobs, count_slaves, limit = 0.75): + super().__init__() + self.count_jobs = count_jobs + self.count_slaves = count_slaves + self.limit = limit + + def setLimit(self, value): + self.limit = float(value) + + def str_limit(self): + return "more than %.0f%% of all slaves" % (self.limit * 100) + + def __str__(self): + return "Exclude jobs that would use too many slaves" + + def test(self, job): + return not ( self.count_jobs() == 1 or self.count_slaves() <= 1 or float(job.countSlaves() + 1) / self.count_slaves() <= self.limit ) diff --git a/netrender/client.py b/netrender/client.py new file mode 100644 index 00000000..710706f7 --- /dev/null +++ b/netrender/client.py @@ -0,0 +1,376 @@ +# ##### BEGIN GPL LICENSE BLOCK ##### +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# ##### END GPL LICENSE BLOCK ##### + +import bpy +import sys, os, re +import http, http.client, http.server, urllib +import subprocess, shutil, time, hashlib +import json + +import netrender +import netrender.model +import netrender.slave as slave +import netrender.master as master +from netrender.utils import * + +def addFluidFiles(job, path): + if os.path.exists(path): + pattern = re.compile("fluidsurface_(final|preview)_([0-9]+)\.(bobj|bvel)\.gz") + + for fluid_file in sorted(os.listdir(path)): + match = pattern.match(fluid_file) + + if match: + # fluid frames starts at 0, which explains the +1 + # This is stupid + current_frame = int(match.groups()[1]) + 1 + job.addFile(path + fluid_file, current_frame, current_frame) + +def addPointCache(job, ob, point_cache, default_path): + if not point_cache.use_disk_cache: + return + + + name = point_cache.name + if name == "": + name = "".join(["%02X" % ord(c) for c in ob.name]) + + cache_path = bpy.path.abspath(point_cache.filepath) if point_cache.use_external else default_path + + index = "%02i" % point_cache.index + + if os.path.exists(cache_path): + pattern = re.compile(name + "_([0-9]+)_" + index + "\.bphys") + + cache_files = [] + + for cache_file in sorted(os.listdir(cache_path)): + match = pattern.match(cache_file) + + if match: + cache_frame = int(match.groups()[0]) + cache_files.append((cache_frame, cache_file)) + + cache_files.sort() + + if len(cache_files) == 1: + cache_frame, cache_file = cache_files[0] + job.addFile(cache_path + cache_file, cache_frame, cache_frame) + else: + for i in range(len(cache_files)): + current_item = cache_files[i] + next_item = cache_files[i+1] if i + 1 < len(cache_files) else None + previous_item = cache_files[i - 1] if i > 0 else None + + current_frame, current_file = current_item + + if not next_item and not 
previous_item: + job.addFile(cache_path + current_file, current_frame, current_frame) + elif next_item and not previous_item: + next_frame = next_item[0] + job.addFile(cache_path + current_file, current_frame, next_frame - 1) + elif not next_item and previous_item: + previous_frame = previous_item[0] + job.addFile(cache_path + current_file, previous_frame + 1, current_frame) + else: + next_frame = next_item[0] + previous_frame = previous_item[0] + job.addFile(cache_path + current_file, previous_frame + 1, next_frame - 1) + +def fillCommonJobSettings(job, job_name, netsettings): + job.name = job_name + job.category = netsettings.job_category + + for slave in netrender.blacklist: + job.blacklist.append(slave.id) + + job.chunks = netsettings.chunks + job.priority = netsettings.priority + + if netsettings.job_type == "JOB_BLENDER": + job.type = netrender.model.JOB_BLENDER + elif netsettings.job_type == "JOB_PROCESS": + job.type = netrender.model.JOB_PROCESS + elif netsettings.job_type == "JOB_VCS": + job.type = netrender.model.JOB_VCS + +def clientSendJob(conn, scene, anim = False): + netsettings = scene.network_render + if netsettings.job_type == "JOB_BLENDER": + return clientSendJobBlender(conn, scene, anim) + elif netsettings.job_type == "JOB_VCS": + return clientSendJobVCS(conn, scene, anim) + +def clientSendJobVCS(conn, scene, anim = False): + netsettings = scene.network_render + job = netrender.model.RenderJob() + + if anim: + for f in range(scene.frame_start, scene.frame_end + 1): + job.addFrame(f) + else: + job.addFrame(scene.frame_current) + + filename = bpy.data.filepath + + if not filename.startswith(netsettings.vcs_wpath): + # this is an error, need better way to handle this + return + + filename = filename[len(netsettings.vcs_wpath):] + + if filename[0] in (os.sep, os.altsep): + filename = filename[1:] + + print("CREATING VCS JOB", filename) + + job.addFile(filename, signed=False) + + job_name = netsettings.job_name + path, name = os.path.split(filename) + if job_name == "[default]": + job_name = name + + + fillCommonJobSettings(job, job_name, netsettings) + + # VCS Specific code + job.version_info = netrender.model.VersioningInfo() + job.version_info.system = netsettings.vcs_system + job.version_info.wpath = netsettings.vcs_wpath + job.version_info.rpath = netsettings.vcs_rpath + job.version_info.revision = netsettings.vcs_revision + + # try to send path first + conn.request("POST", "/job", json.dumps(job.serialize())) + response = conn.getresponse() + response.read() + + job_id = response.getheader("job-id") + + # a VCS job is always good right now, need error handling + + return job_id + +def clientSendJobBlender(conn, scene, anim = False): + netsettings = scene.network_render + job = netrender.model.RenderJob() + + if anim: + for f in range(scene.frame_start, scene.frame_end + 1): + job.addFrame(f) + else: + job.addFrame(scene.frame_current) + + filename = bpy.data.filepath + + if not os.path.exists(filename): + raise RuntimeError("Current filepath not defined") + + job.addFile(filename) + + job_name = netsettings.job_name + path, name = os.path.split(filename) + if job_name == "[default]": + job_name = name + + ########################### + # LIBRARIES + ########################### + for lib in bpy.data.libraries: + file_path = bpy.path.abspath(lib.filepath) + if os.path.exists(file_path): + job.addFile(file_path) + + ########################### + # IMAGES + ########################### + for image in bpy.data.images: + if image.source == "FILE" and not image.packed_file: + 
file_path = bpy.path.abspath(image.filepath) + if os.path.exists(file_path): + job.addFile(file_path) + + tex_path = os.path.splitext(file_path)[0] + ".tex" + if os.path.exists(tex_path): + job.addFile(tex_path) + + ########################### + # FLUID + POINT CACHE + ########################### + root, ext = os.path.splitext(name) + default_path = path + os.sep + "blendcache_" + root + os.sep # need an API call for that + + for object in bpy.data.objects: + for modifier in object.modifiers: + if modifier.type == 'FLUID_SIMULATION' and modifier.settings.type == "DOMAIN": + addFluidFiles(job, bpy.path.abspath(modifier.settings.filepath)) + elif modifier.type == "CLOTH": + addPointCache(job, object, modifier.point_cache, default_path) + elif modifier.type == "SOFT_BODY": + addPointCache(job, object, modifier.point_cache, default_path) + elif modifier.type == "SMOKE" and modifier.smoke_type == "TYPE_DOMAIN": + addPointCache(job, object, modifier.domain_settings.point_cache, default_path) + elif modifier.type == "MULTIRES" and modifier.is_external: + file_path = bpy.path.abspath(modifier.filepath) + job.addFile(file_path) + + # particles modifier are stupid and don't contain data + # we have to go through the object property + for psys in object.particle_systems: + addPointCache(job, object, psys.point_cache, default_path) + + #print(job.files) + + fillCommonJobSettings(job, job_name, netsettings) + + # try to send path first + conn.request("POST", "/job", json.dumps(job.serialize())) + response = conn.getresponse() + response.read() + + job_id = response.getheader("job-id") + + # if not ACCEPTED (but not processed), send files + if response.status == http.client.ACCEPTED: + for rfile in job.files: + f = open(rfile.filepath, "rb") + conn.request("PUT", fileURL(job_id, rfile.index), f) + f.close() + response = conn.getresponse() + response.read() + + # server will reply with ACCEPTED until all files are found + + return job_id + +def requestResult(conn, job_id, frame): + conn.request("GET", renderURL(job_id, frame)) + +class NetworkRenderEngine(bpy.types.RenderEngine): + bl_idname = 'NET_RENDER' + bl_label = "Network Render" + bl_use_postprocess = False + def render(self, scene): + if scene.network_render.mode == "RENDER_CLIENT": + self.render_client(scene) + elif scene.network_render.mode == "RENDER_SLAVE": + self.render_slave(scene) + elif scene.network_render.mode == "RENDER_MASTER": + self.render_master(scene) + else: + print("UNKNOWN OPERATION MODE") + + def render_master(self, scene): + netsettings = scene.network_render + + address = "" if netsettings.server_address == "[default]" else netsettings.server_address + + master.runMaster((address, netsettings.server_port), netsettings.use_master_broadcast, netsettings.use_master_clear, bpy.path.abspath(netsettings.path), self.update_stats, self.test_break) + + + def render_slave(self, scene): + slave.render_slave(self, scene.network_render, scene.render.threads) + + def render_client(self, scene): + netsettings = scene.network_render + self.update_stats("", "Network render client initiation") + + + conn = clientConnection(netsettings.server_address, netsettings.server_port) + + if conn: + # Sending file + + self.update_stats("", "Network render exporting") + + new_job = False + + job_id = netsettings.job_id + + # reading back result + + self.update_stats("", "Network render waiting for results") + + + requestResult(conn, job_id, scene.frame_current) + response = conn.getresponse() + buf = response.read() + + if response.status == 
http.client.NO_CONTENT: + new_job = True + netsettings.job_id = clientSendJob(conn, scene) + job_id = netsettings.job_id + + requestResult(conn, job_id, scene.frame_current) + response = conn.getresponse() + buf = response.read() + + while response.status == http.client.ACCEPTED and not self.test_break(): + time.sleep(1) + requestResult(conn, job_id, scene.frame_current) + response = conn.getresponse() + buf = response.read() + + # cancel new jobs (animate on network) on break + if self.test_break() and new_job: + conn.request("POST", cancelURL(job_id)) + response = conn.getresponse() + response.read() + print( response.status, response.reason ) + netsettings.job_id = 0 + + if response.status != http.client.OK: + conn.close() + return + + r = scene.render + x= int(r.resolution_x*r.resolution_percentage*0.01) + y= int(r.resolution_y*r.resolution_percentage*0.01) + + result_path = os.path.join(bpy.path.abspath(netsettings.path), "output.exr") + + folder = os.path.split(result_path)[0] + + if not os.path.exists(folder): + os.mkdir(folder) + + f = open(result_path, "wb") + + f.write(buf) + + f.close() + + result = self.begin_result(0, 0, x, y) + result.load_from_file(result_path) + self.end_result(result) + + conn.close() + +def compatible(module): + module = __import__("bl_ui." + module) + for subclass in module.__dict__.values(): + try: subclass.COMPAT_ENGINES.add('NET_RENDER') + except: pass + del module + +compatible("properties_world") +compatible("properties_material") +compatible("properties_data_mesh") +compatible("properties_data_camera") +compatible("properties_texture") diff --git a/netrender/master.py b/netrender/master.py new file mode 100644 index 00000000..793e3bb5 --- /dev/null +++ b/netrender/master.py @@ -0,0 +1,1079 @@ +# ##### BEGIN GPL LICENSE BLOCK ##### +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+# +# ##### END GPL LICENSE BLOCK ##### + +import sys, os +import http, http.client, http.server, urllib, socket, socketserver, threading +import subprocess, shutil, time, hashlib +import pickle +import select # for select.error +import json + +from netrender.utils import * +import netrender.model +import netrender.balancing +import netrender.master_html +import netrender.thumbnail as thumbnail + +class MRenderFile(netrender.model.RenderFile): + def __init__(self, filepath, index, start, end, signature): + super().__init__(filepath, index, start, end, signature) + self.found = False + + def test(self): + self.found = os.path.exists(self.filepath) + if self.found and self.signature != None: + found_signature = hashFile(self.filepath) + self.found = self.signature == found_signature + + return self.found + + +class MRenderSlave(netrender.model.RenderSlave): + def __init__(self, name, address, stats): + super().__init__() + self.id = hashlib.md5(bytes(repr(name) + repr(address), encoding='utf8')).hexdigest() + self.name = name + self.address = address + self.stats = stats + self.last_seen = time.time() + + self.job = None + self.job_frames = [] + + netrender.model.RenderSlave._slave_map[self.id] = self + + def seen(self): + self.last_seen = time.time() + + def finishedFrame(self, frame_number): + self.job_frames.remove(frame_number) + if not self.job_frames: + self.job = None + +class MRenderJob(netrender.model.RenderJob): + def __init__(self, job_id, job_info): + super().__init__(job_info) + self.id = job_id + self.last_dispatched = time.time() + + # force one chunk for process jobs + if self.type == netrender.model.JOB_PROCESS: + self.chunks = 1 + + # Force WAITING status on creation + self.status = JOB_WAITING + + # special server properties + self.last_update = 0 + self.save_path = "" + self.files = [MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end, rfile.signature) for rfile in job_info.files] + + def initInfo(self): + if not self.resolution: + self.resolution = tuple(getFileInfo(self.files[0].filepath, ["bpy.context.scene.render.resolution_x", "bpy.context.scene.render.resolution_y", "bpy.context.scene.render.resolution_percentage"])) + + def save(self): + if self.save_path: + f = open(os.path.join(self.save_path, "job.txt"), "w") + f.write(json.dumps(self.serialize())) + f.close() + + def edit(self, info_map): + if "status" in info_map: + self.status = info_map["status"] + + if "priority" in info_map: + self.priority = info_map["priority"] + + if "chunks" in info_map: + self.chunks = info_map["chunks"] + + def testStart(self): + # Don't test files for versionned jobs + if not self.version_info: + for f in self.files: + if not f.test(): + return False + + self.start() + self.initInfo() + return True + + def testFinished(self): + for f in self.frames: + if f.status == QUEUED or f.status == DISPATCHED: + break + else: + self.status = JOB_FINISHED + + def pause(self, status = None): + if self.status not in {JOB_PAUSED, JOB_QUEUED}: + return + + if status is None: + self.status = JOB_PAUSED if self.status == JOB_QUEUED else JOB_QUEUED + elif status: + self.status = JOB_QUEUED + else: + self.status = JOB_PAUSED + + def start(self): + self.status = JOB_QUEUED + + def addLog(self, frames): + log_name = "_".join(("%06d" % f for f in frames)) + ".log" + log_path = os.path.join(self.save_path, log_name) + + for number in frames: + frame = self[number] + if frame: + frame.log_path = log_path + + def addFrame(self, frame_number, command): + frame = MRenderFrame(frame_number, command) 
+ self.frames.append(frame) + return frame + + def reset(self, all): + for f in self.frames: + f.reset(all) + + def getFrames(self): + frames = [] + for f in self.frames: + if f.status == QUEUED: + self.last_dispatched = time.time() + frames.append(f) + if len(frames) >= self.chunks: + break + + return frames + +class MRenderFrame(netrender.model.RenderFrame): + def __init__(self, frame, command): + super().__init__() + self.number = frame + self.slave = None + self.time = 0 + self.status = QUEUED + self.command = command + + self.log_path = None + + def reset(self, all): + if all or self.status == ERROR: + self.log_path = None + self.slave = None + self.time = 0 + self.status = QUEUED + + +# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= +# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- +# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= +# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- +file_pattern = re.compile("/file_([a-zA-Z0-9]+)_([0-9]+)") +render_pattern = re.compile("/render_([a-zA-Z0-9]+)_([0-9]+).exr") +thumb_pattern = re.compile("/thumb_([a-zA-Z0-9]+)_([0-9]+).jpg") +log_pattern = re.compile("/log_([a-zA-Z0-9]+)_([0-9]+).log") +reset_pattern = re.compile("/reset(all|)_([a-zA-Z0-9]+)_([0-9]+)") +cancel_pattern = re.compile("/cancel_([a-zA-Z0-9]+)") +pause_pattern = re.compile("/pause_([a-zA-Z0-9]+)") +edit_pattern = re.compile("/edit_([a-zA-Z0-9]+)") + +class RenderHandler(http.server.BaseHTTPRequestHandler): + def log_message(self, format, *args): + # override because the original calls self.address_string(), which + # is extremely slow due to some timeout.. 
+ sys.stderr.write("[%s] %s\n" % (self.log_date_time_string(), format%args)) + + def getInfoMap(self): + length = int(self.headers['content-length']) + + if length > 0: + msg = str(self.rfile.read(length), encoding='utf8') + return json.loads(msg) + else: + return {} + + def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"): + self.send_response(code) + self.send_header("Content-type", content) + + for key, value in headers.items(): + self.send_header(key, value) + + self.end_headers() + + def do_HEAD(self): + + if self.path == "/status": + job_id = self.headers.get('job-id', "") + job_frame = int(self.headers.get('job-frame', -1)) + + job = self.server.getJobID(job_id) + if job: + frame = job[job_frame] + + + if frame: + self.send_head(http.client.OK) + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + + def do_GET(self): + + if self.path == "/version": + self.send_head() + self.server.stats("", "Version check") + self.wfile.write(VERSION) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/render"): + match = render_pattern.match(self.path) + + if match: + job_id = match.groups()[0] + frame_number = int(match.groups()[1]) + + job = self.server.getJobID(job_id) + + if job: + frame = job[frame_number] + + if frame: + if frame.status in (QUEUED, DISPATCHED): + self.send_head(http.client.ACCEPTED) + elif frame.status == DONE: + self.server.stats("", "Sending result to client") + + filename = os.path.join(job.save_path, "%06d.exr" % frame_number) + + f = open(filename, 'rb') + self.send_head(content = "image/x-exr") + shutil.copyfileobj(f, self.wfile) + f.close() + elif frame.status == ERROR: + self.send_head(http.client.PARTIAL_CONTENT) + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/thumb"): + match = thumb_pattern.match(self.path) + + if match: + job_id = match.groups()[0] + frame_number = int(match.groups()[1]) + + job = self.server.getJobID(job_id) + + if job: + frame = job[frame_number] + + if frame: + if frame.status in (QUEUED, DISPATCHED): + self.send_head(http.client.ACCEPTED) + elif frame.status == DONE: + filename = os.path.join(job.save_path, "%06d.exr" % frame_number) + + thumbname = thumbnail.generate(filename) + + if thumbname: + f = open(thumbname, 'rb') + self.send_head(content = "image/jpeg") + shutil.copyfileobj(f, self.wfile) + f.close() + else: # thumbnail couldn't be generated + self.send_head(http.client.PARTIAL_CONTENT) + return + elif frame.status == ERROR: + self.send_head(http.client.PARTIAL_CONTENT) + else: + # no such frame + 
self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/log"): + match = log_pattern.match(self.path) + + if match: + job_id = match.groups()[0] + frame_number = int(match.groups()[1]) + + job = self.server.getJobID(job_id) + + if job: + frame = job[frame_number] + + if frame: + if not frame.log_path or frame.status in (QUEUED, DISPATCHED): + self.send_head(http.client.PROCESSING) + else: + self.server.stats("", "Sending log to client") + f = open(frame.log_path, 'rb') + + self.send_head(content = "text/plain") + + shutil.copyfileobj(f, self.wfile) + + f.close() + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid URL + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/status": + job_id = self.headers.get('job-id', "") + job_frame = int(self.headers.get('job-frame', -1)) + + if job_id: + + job = self.server.getJobID(job_id) + if job: + if job_frame != -1: + frame = job[frame] + + if frame: + message = frame.serialize() + else: + # no such frame + self.send_heat(http.client.NO_CONTENT) + return + else: + message = job.serialize() + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + return + else: # status of all jobs + message = [] + + for job in self.server: + message.append(job.serialize()) + + + self.server.stats("", "Sending status") + self.send_head() + self.wfile.write(bytes(json.dumps(message), encoding='utf8')) + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/job": + self.server.balance() + + slave_id = self.headers['slave-id'] + + slave = self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + job, frames = self.server.newDispatch(slave_id) + + if job and frames: + for f in frames: + print("dispatch", f.number) + f.status = DISPATCHED + f.slave = slave + + slave.job = job + slave.job_frames = [f.number for f in frames] + + self.send_head(headers={"job-id": job.id}) + + message = job.serialize(frames) + + self.wfile.write(bytes(json.dumps(message), encoding='utf8')) + + self.server.stats("", "Sending job to slave") + else: + # no job available, return error code + slave.job = None + slave.job_frames = [] + + self.send_head(http.client.ACCEPTED) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/file"): + match = file_pattern.match(self.path) + + if match: + slave_id = self.headers['slave-id'] + slave = self.server.getSeenSlave(slave_id) + + if not slave: + # invalid slave id + print("invalid slave id") + + job_id = match.groups()[0] + file_index = int(match.groups()[1]) + + job = self.server.getJobID(job_id) + + if job: + render_file = job.files[file_index] + + if render_file: + self.server.stats("", "Sending file to slave") + f = open(render_file.filepath, 'rb') + + self.send_head() + shutil.copyfileobj(f, self.wfile) + + f.close() + else: + # no such file + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + 
self.send_head(http.client.NO_CONTENT) + else: # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/slaves": + message = [] + + self.server.stats("", "Sending slaves status") + + for slave in self.server.slaves: + message.append(slave.serialize()) + + self.send_head() + + self.wfile.write(bytes(json.dumps(message), encoding='utf8')) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + else: + # hand over the rest to the html section + netrender.master_html.get(self) + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + def do_POST(self): + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + if self.path == "/job": + + length = int(self.headers['content-length']) + + job_info = netrender.model.RenderJob.materialize(json.loads(str(self.rfile.read(length), encoding='utf8'))) + + job_id = self.server.nextJobID() + + job = MRenderJob(job_id, job_info) + + for frame in job_info.frames: + frame = job.addFrame(frame.number, frame.command) + + self.server.addJob(job) + + headers={"job-id": job_id} + + if job.testStart(): + self.server.stats("", "New job, started") + self.send_head(headers=headers) + else: + self.server.stats("", "New job, missing files (%i total)" % len(job.files)) + self.send_head(http.client.ACCEPTED, headers=headers) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/edit"): + match = edit_pattern.match(self.path) + + if match: + job_id = match.groups()[0] + + job = self.server.getJobID(job_id) + + if job: + info_map = self.getInfoMap() + + job.edit(info_map) + self.send_head() + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/balance_limit": + info_map = self.getInfoMap() + for rule_id, limit in info_map.items(): + try: + rule = self.server.balancer.ruleByID(rule_id) + if rule: + rule.setLimit(limit) + except: + pass # invalid type + + self.send_head() + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/balance_enable": + info_map = self.getInfoMap() + for rule_id, enabled in info_map.items(): + rule = self.server.balancer.ruleByID(rule_id) + if rule: + rule.enabled = enabled + + self.send_head() + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/cancel"): + match = cancel_pattern.match(self.path) + + if match: + info_map = self.getInfoMap() + clear = info_map.get("clear", False) + + job_id = match.groups()[0] + + job = self.server.getJobID(job_id) + + if job: + self.server.stats("", 
"Cancelling job") + self.server.removeJob(job, clear) + self.send_head() + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/pause"): + match = pause_pattern.match(self.path) + + if match: + info_map = self.getInfoMap() + status = info_map.get("status", None) + + job_id = match.groups()[0] + + job = self.server.getJobID(job_id) + + if job: + self.server.stats("", "Pausing job") + job.pause(status) + self.send_head() + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/clear": + # cancel all jobs + info_map = self.getInfoMap() + clear = info_map.get("clear", False) + + self.server.stats("", "Clearing jobs") + self.server.clear(clear) + + self.send_head() + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/reset"): + match = reset_pattern.match(self.path) + + if match: + all = match.groups()[0] == 'all' + job_id = match.groups()[1] + job_frame = int(match.groups()[2]) + + job = self.server.getJobID(job_id) + + if job: + if job_frame != 0: + + frame = job[job_frame] + if frame: + self.server.stats("", "Reset job frame") + frame.reset(all) + self.send_head() + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + + else: + self.server.stats("", "Reset job") + job.reset(all) + self.send_head() + + else: # job not found + self.send_head(http.client.NO_CONTENT) + else: # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/slave": + length = int(self.headers['content-length']) + job_frame_string = self.headers['job-frame'] + + self.server.stats("", "New slave connected") + + slave_info = netrender.model.RenderSlave.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')), cache = False) + + slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats) + + self.send_head(headers = {"slave-id": slave_id}) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/log": + length = int(self.headers['content-length']) + + log_info = netrender.model.LogFile.materialize(json.loads(str(self.rfile.read(length), encoding='utf8'))) + + slave_id = log_info.slave_id + + slave = self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + job = self.server.getJobID(log_info.job_id) + + if job: + self.server.stats("", "Log announcement") + job.addLog(log_info.frames) + self.send_head(http.client.OK) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # 
-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + def do_PUT(self): + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + if self.path.startswith("/file"): + match = file_pattern.match(self.path) + + if match: + self.server.stats("", "Receiving job") + + length = int(self.headers['content-length']) + job_id = match.groups()[0] + file_index = int(match.groups()[1]) + + job = self.server.getJobID(job_id) + + if job: + + render_file = job.files[file_index] + + if render_file: + main_file = job.files[0].filepath # filename of the first file + + main_path, main_name = os.path.split(main_file) + + if file_index > 0: + file_path = prefixPath(job.save_path, render_file.filepath, main_path) + else: + file_path = os.path.join(job.save_path, main_name) + + buf = self.rfile.read(length) + + # add same temp file + renames as slave + + f = open(file_path, "wb") + f.write(buf) + f.close() + del buf + + render_file.filepath = file_path # set the new path + + if job.testStart(): + self.server.stats("", "File upload, starting job") + self.send_head(http.client.OK) + else: + self.server.stats("", "File upload, file missings") + self.send_head(http.client.ACCEPTED) + else: # invalid file + print("file not found", job_id, file_index) + self.send_head(http.client.NO_CONTENT) + else: # job not found + print("job not found", job_id, file_index) + self.send_head(http.client.NO_CONTENT) + else: # invalid url + print("no match") + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/render": + self.server.stats("", "Receiving render result") + + # need some message content here or the slave doesn't like it + self.wfile.write(bytes("foo", encoding='utf8')) + + slave_id = self.headers['slave-id'] + + slave = self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + job_id = self.headers['job-id'] + + job = self.server.getJobID(job_id) + + if job: + job_frame = int(self.headers['job-frame']) + job_result = int(self.headers['job-result']) + job_time = float(self.headers['job-time']) + + frame = job[job_frame] + + if frame: + if job.hasRenderResult(): + if job_result == DONE: + length = int(self.headers['content-length']) + buf = self.rfile.read(length) + f = open(os.path.join(job.save_path, "%06d.exr" % job_frame), 'wb') + f.write(buf) + f.close() + + del buf + elif job_result == ERROR: + # blacklist slave on this job on error + # slaves might already be in blacklist if errors on the whole chunk + if not slave.id in job.blacklist: + job.blacklist.append(slave.id) + + slave.finishedFrame(job_frame) + + frame.status = job_result + frame.time = job_time + + job.testFinished() + + self.send_head() + else: # frame not found + self.send_head(http.client.NO_CONTENT) + else: # job not found + self.send_head(http.client.NO_CONTENT) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/thumb": + self.server.stats("", "Receiving thumbnail result") + + # need some message content here or the slave doesn't like it + self.wfile.write(bytes("foo", encoding='utf8')) + + slave_id = self.headers['slave-id'] + + slave = 
self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + job_id = self.headers['job-id'] + + job = self.server.getJobID(job_id) + + if job: + job_frame = int(self.headers['job-frame']) + + frame = job[job_frame] + + if frame: + if job.hasRenderResult(): + length = int(self.headers['content-length']) + buf = self.rfile.read(length) + f = open(os.path.join(job.save_path, "%06d.jpg" % job_frame), 'wb') + f.write(buf) + f.close() + + del buf + + else: # frame not found + self.send_head(http.client.NO_CONTENT) + else: # job not found + self.send_head(http.client.NO_CONTENT) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/log"): + self.server.stats("", "Receiving log file") + + match = log_pattern.match(self.path) + + if match: + job_id = match.groups()[0] + + job = self.server.getJobID(job_id) + + if job: + job_frame = int(match.groups()[1]) + + frame = job[job_frame] + + if frame and frame.log_path: + length = int(self.headers['content-length']) + buf = self.rfile.read(length) + f = open(frame.log_path, 'ab') + f.write(buf) + f.close() + + del buf + + self.server.getSeenSlave(self.headers['slave-id']) + + self.send_head() + else: # frame not found + self.send_head(http.client.NO_CONTENT) + else: # job not found + self.send_head(http.client.NO_CONTENT) + else: # invalid url + self.send_head(http.client.NO_CONTENT) + +class RenderMasterServer(socketserver.ThreadingMixIn, http.server.HTTPServer): + def __init__(self, address, handler_class, path, subdir=True): + super().__init__(address, handler_class) + self.jobs = [] + self.jobs_map = {} + self.slaves = [] + self.slaves_map = {} + self.job_id = 0 + + if subdir: + self.path = os.path.join(path, "master_" + str(os.getpid())) + else: + self.path = path + + self.slave_timeout = 5 # 5 mins: need a parameter for that + + self.balancer = netrender.balancing.Balancer() + self.balancer.addRule(netrender.balancing.RatingUsageByCategory(self.getJobs)) + self.balancer.addRule(netrender.balancing.RatingUsage()) + self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob()) + self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves, limit = 0.9)) + self.balancer.addPriority(netrender.balancing.NewJobPriority()) + self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2)) + + if not os.path.exists(self.path): + os.mkdir(self.path) + + def restore(self, jobs, slaves, balancer = None): + self.jobs = jobs + self.jobs_map = {} + + for job in self.jobs: + self.jobs_map[job.id] = job + self.job_id = max(self.job_id, int(job.id)) + + self.slaves = slaves + for slave in self.slaves: + self.slaves_map[slave.id] = slave + + if balancer: + self.balancer = balancer + + + def nextJobID(self): + self.job_id += 1 + return str(self.job_id) + + def addSlave(self, name, address, stats): + slave = MRenderSlave(name, address, stats) + self.slaves.append(slave) + self.slaves_map[slave.id] = slave + + return slave.id + + def removeSlave(self, slave): + self.slaves.remove(slave) + self.slaves_map.pop(slave.id) + + def getSlave(self, slave_id): + return self.slaves_map.get(slave_id) + + def getSeenSlave(self, slave_id): + slave = self.getSlave(slave_id) + if slave: + slave.seen() + + return slave + + def timeoutSlaves(self): + removed = [] + + t = time.time() + + for slave in self.slaves: + if (t - slave.last_seen) / 60 
> self.slave_timeout: + removed.append(slave) + + if slave.job: + for f in slave.job_frames: + slave.job[f].status = ERROR + + for slave in removed: + self.removeSlave(slave) + + def updateUsage(self): + blend = 0.5 + for job in self.jobs: + job.usage *= (1 - blend) + + if self.slaves: + slave_usage = blend / self.countSlaves() + + for slave in self.slaves: + if slave.job: + slave.job.usage += slave_usage + + + def clear(self, clear_files = False): + removed = self.jobs[:] + + for job in removed: + self.removeJob(job, clear_files) + + def balance(self): + self.balancer.balance(self.jobs) + + def getJobs(self): + return self.jobs + + def countJobs(self, status = JOB_QUEUED): + total = 0 + for j in self.jobs: + if j.status == status: + total += 1 + + return total + + def countSlaves(self): + return len(self.slaves) + + def removeJob(self, job, clear_files = False): + self.jobs.remove(job) + self.jobs_map.pop(job.id) + + if clear_files: + shutil.rmtree(job.save_path) + + for slave in self.slaves: + if slave.job == job: + slave.job = None + slave.job_frames = [] + + def addJob(self, job): + self.jobs.append(job) + self.jobs_map[job.id] = job + + # create job directory + job.save_path = os.path.join(self.path, "job_" + job.id) + if not os.path.exists(job.save_path): + os.mkdir(job.save_path) + + job.save() + + def getJobID(self, id): + return self.jobs_map.get(id) + + def __iter__(self): + for job in self.jobs: + yield job + + def newDispatch(self, slave_id): + if self.jobs: + for job in self.jobs: + if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist: + return job, job.getFrames() + + return None, None + +def clearMaster(path): + shutil.rmtree(path) + +def createMaster(address, clear, path): + filepath = os.path.join(path, "blender_master.data") + + if not clear and os.path.exists(filepath): + print("loading saved master:", filepath) + with open(filepath, 'rb') as f: + path, jobs, slaves = pickle.load(f) + + httpd = RenderMasterServer(address, RenderHandler, path, subdir=False) + httpd.restore(jobs, slaves) + + return httpd + + return RenderMasterServer(address, RenderHandler, path) + +def saveMaster(path, httpd): + filepath = os.path.join(path, "blender_master.data") + + with open(filepath, 'wb') as f: + pickle.dump((httpd.path, httpd.jobs, httpd.slaves), f, pickle.HIGHEST_PROTOCOL) + +def runMaster(address, broadcast, clear, path, update_stats, test_break): + httpd = createMaster(address, clear, path) + httpd.timeout = 1 + httpd.stats = update_stats + + if broadcast: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + start_time = time.time() - 2 + + while not test_break(): + try: + httpd.handle_request() + except select.error: + pass + + if time.time() - start_time >= 2: # need constant here + httpd.timeoutSlaves() + + httpd.updateUsage() + + if broadcast: + print("broadcasting address") + s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('', 8000)) + start_time = time.time() + + httpd.server_close() + if clear: + clearMaster(httpd.path) + else: + saveMaster(path, httpd) + diff --git a/netrender/master_html.py b/netrender/master_html.py new file mode 100644 index 00000000..87727320 --- /dev/null +++ b/netrender/master_html.py @@ -0,0 +1,315 @@ +# ##### BEGIN GPL LICENSE BLOCK ##### +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or 
(at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# ##### END GPL LICENSE BLOCK ##### + +import os +import re +import shutil +from netrender.utils import * +import netrender.model + +src_folder = os.path.split(__file__)[0] + +def get(handler): + def output(text): + handler.wfile.write(bytes(text, encoding='utf8')) + + def head(title, refresh = False): + output("") + if refresh: + output("") + output("") +# output("") + output("") + output(title) + output("") + output("") + + + def link(text, url, script=""): + return "%s" % (url, script, text) + + def tag(name, text, attr=""): + return "<%s %s>%s" % (name, attr, text, name) + + def startTable(border=1, class_style = None, caption = None): + output("") + + if caption: + output("" % caption) + + def headerTable(*headers): + output("") + + for c in headers: + output("") + + output("") + + def rowTable(*data, id = None, class_style = None, extra = None): + output("") + + for c in data: + output("") + + output("") + + def endTable(): + output("
%s
" + c + "
" + str(c) + "
") + + def checkbox(title, value, script=""): + return """""" % (title, "checked" if value else "", ("onclick=\"%s\"" % script) if script else "") + + if handler.path == "/html/netrender.js": + f = open(os.path.join(src_folder, "netrender.js"), 'rb') + + handler.send_head(content = "text/javascript") + shutil.copyfileobj(f, handler.wfile) + + f.close() + elif handler.path == "/html/netrender.css": + f = open(os.path.join(src_folder, "netrender.css"), 'rb') + + handler.send_head(content = "text/css") + shutil.copyfileobj(f, handler.wfile) + + f.close() + elif handler.path == "/html" or handler.path == "/": + handler.send_head(content = "text/html") + head("NetRender", refresh = True) + + output("

Jobs

") + + startTable() + headerTable( + " ", + "id", + "name", + "category", + "type", + "chunks", + "priority", + "usage", + "wait", + "status", + "length", + "done", + "dispatched", + "error", + "priority", + "exception" + ) + + handler.server.balance() + + for job in handler.server.jobs: + results = job.framesStatus() + rowTable( + """""" % job.id + + """""" % job.id + + """""" % job.id, + job.id, + link(job.name, "/html/job" + job.id), + job.category if job.category else "None", + netrender.model.JOB_TYPES[job.type], + str(job.chunks) + + """""" % (job.id, job.chunks + 1) + + """""" % (job.id, job.chunks - 1, "disabled=True" if job.chunks == 1 else ""), + str(job.priority) + + """""" % (job.id, job.priority + 1) + + """""" % (job.id, job.priority - 1, "disabled=True" if job.priority == 1 else ""), + "%0.1f%%" % (job.usage * 100), + "%is" % int(time.time() - job.last_dispatched), + job.statusText(), + len(job), + results[DONE], + results[DISPATCHED], + str(results[ERROR]) + + """""" % (job.id, "disabled=True" if not results[ERROR] else ""), + "yes" if handler.server.balancer.applyPriorities(job) else "no", + "yes" if handler.server.balancer.applyExceptions(job) else "no" + ) + + endTable() + + output("

Slaves

") + + startTable() + headerTable("name", "address", "last seen", "stats", "job") + + for slave in handler.server.slaves: + rowTable(slave.name, slave.address[0], time.ctime(slave.last_seen), slave.stats, link(slave.job.name, "/html/job" + slave.job.id) if slave.job else "None") + + endTable() + + output("

Configuration

") + + output("""""") + + startTable(caption = "Rules", class_style = "rules") + + headerTable("type", "enabled", "description", "limit") + + for rule in handler.server.balancer.rules: + rowTable( + "rating", + checkbox("", rule.enabled, "balance_enable('%s', '%s')" % (rule.id(), str(not rule.enabled).lower())), + rule, + rule.str_limit() + + """""" % (rule.id(), str(rule.limit)) if hasattr(rule, "limit") else " " + ) + + for rule in handler.server.balancer.priorities: + rowTable( + "priority", + checkbox("", rule.enabled, "balance_enable('%s', '%s')" % (rule.id(), str(not rule.enabled).lower())), + rule, + rule.str_limit() + + """""" % (rule.id(), str(rule.limit)) if hasattr(rule, "limit") else " " + ) + + for rule in handler.server.balancer.exceptions: + rowTable( + "exception", + checkbox("", rule.enabled, "balance_enable('%s', '%s')" % (rule.id(), str(not rule.enabled).lower())), + rule, + rule.str_limit() + + """""" % (rule.id(), str(rule.limit)) if hasattr(rule, "limit") else " " + ) + + endTable() + + output("") + + elif handler.path.startswith("/html/job"): + handler.send_head(content = "text/html") + job_id = handler.path[9:] + + head("NetRender") + + job = handler.server.getJobID(job_id) + + if job: + output("

Render Information

") + + job.initInfo() + + startTable() + + rowTable("resolution", "%ix%i at %i%%" % job.resolution) + + endTable() + + + if job.type == netrender.model.JOB_BLENDER: + output("

<h2>Files</h2>

") + + startTable() + headerTable("path") + + tot_cache = 0 + tot_fluid = 0 + + rowTable(job.files[0].filepath) + rowTable("Other Files", class_style = "toggle", extra = "onclick='toggleDisplay(".other", "none", "table-row")'") + + for file in job.files: + if file.filepath.endswith(".bphys"): + tot_cache += 1 + elif file.filepath.endswith(".bobj.gz") or file.filepath.endswith(".bvel.gz"): + tot_fluid += 1 + else: + if file != job.files[0]: + rowTable(file.filepath, class_style = "other") + + if tot_cache > 0: + rowTable("%i physic cache files" % tot_cache, class_style = "toggle", extra = "onclick='toggleDisplay(".cache", "none", "table-row")'") + for file in job.files: + if file.filepath.endswith(".bphys"): + rowTable(os.path.split(file.filepath)[1], class_style = "cache") + + if tot_fluid > 0: + rowTable("%i fluid bake files" % tot_fluid, class_style = "toggle", extra = "onclick='toggleDisplay(".fluid", "none", "table-row")'") + for file in job.files: + if file.filepath.endswith(".bobj.gz") or file.filepath.endswith(".bvel.gz"): + rowTable(os.path.split(file.filepath)[1], class_style = "fluid") + + endTable() + elif job.type == netrender.model.JOB_VCS: + output("

<h2>Versioning</h2>

") + + startTable() + + rowTable("System", job.version_info.system.name) + rowTable("Remote Path", job.version_info.rpath) + rowTable("Working Path", job.version_info.wpath) + rowTable("Revision", job.version_info.revision) + rowTable("Render File", job.files[0].filepath) + + endTable() + + if job.blacklist: + output("

<h2>Blacklist</h2>

") + + startTable() + headerTable("name", "address") + + for slave_id in job.blacklist: + slave = handler.server.slaves_map[slave_id] + rowTable(slave.name, slave.address[0]) + + endTable() + + output("

<h2>Frames</h2>

") + + startTable() + headerTable("no", "status", "render time", "slave", "log", "result", "") + + for frame in job.frames: + rowTable( + frame.number, + frame.statusText(), + "%.1fs" % frame.time, + frame.slave.name if frame.slave else " ", + link("view log", logURL(job_id, frame.number)) if frame.log_path else " ", + link("view result", renderURL(job_id, frame.number)) + " [" + + tag("span", "show", attr="class='thumb' onclick='showThumb(%s, %i)'" % (job.id, frame.number)) + "]" if frame.status == DONE else " ", + "" % (frame.number, job.id, frame.number) + ) + + endTable() + else: + output("no such job") + + output("") + diff --git a/netrender/model.py b/netrender/model.py new file mode 100644 index 00000000..5fc0bc2a --- /dev/null +++ b/netrender/model.py @@ -0,0 +1,360 @@ +# ##### BEGIN GPL LICENSE BLOCK ##### +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# ##### END GPL LICENSE BLOCK ##### + +import sys, os +import http, http.client, http.server, urllib +import subprocess, shutil, time, hashlib + +import netrender.versioning as versioning +from netrender.utils import * + +class LogFile: + def __init__(self, job_id = 0, slave_id = 0, frames = []): + self.job_id = job_id + self.slave_id = slave_id + self.frames = frames + + def serialize(self): + return { + "job_id": self.job_id, + "slave_id": self.slave_id, + "frames": self.frames + } + + @staticmethod + def materialize(data): + if not data: + return None + + logfile = LogFile() + logfile.job_id = data["job_id"] + logfile.slave_id = data["slave_id"] + logfile.frames = data["frames"] + + return logfile + +class RenderSlave: + _slave_map = {} + + def __init__(self): + self.id = "" + self.name = "" + self.address = ("",0) + self.stats = "" + self.total_done = 0 + self.total_error = 0 + self.last_seen = 0.0 + + def serialize(self): + return { + "id": self.id, + "name": self.name, + "address": self.address, + "stats": self.stats, + "total_done": self.total_done, + "total_error": self.total_error, + "last_seen": self.last_seen + } + + @staticmethod + def materialize(data, cache = True): + if not data: + return None + + slave_id = data["id"] + + if cache and slave_id in RenderSlave._slave_map: + return RenderSlave._slave_map[slave_id] + + slave = RenderSlave() + slave.id = slave_id + slave.name = data["name"] + slave.address = data["address"] + slave.stats = data["stats"] + slave.total_done = data["total_done"] + slave.total_error = data["total_error"] + slave.last_seen = data["last_seen"] + + if cache: + RenderSlave._slave_map[slave_id] = slave + + return slave + +JOB_BLENDER = 1 +JOB_PROCESS = 2 +JOB_VCS = 3 + +JOB_TYPES = { + JOB_BLENDER: "Blender", + JOB_PROCESS: "Process", + JOB_VCS: "Versioned", + } + +class VersioningInfo: + def __init__(self, info = None): + self._system = None + self.wpath = "" + self.rpath = "" + self.revision = "" + + @property + def system(self): + 
return self._system + + @system.setter + def system(self, value): + self._system = versioning.SYSTEMS[value] + + def update(self): + self.system.update(self) + + def serialize(self): + return { + "wpath": self.wpath, + "rpath": self.rpath, + "revision": self.revision, + "system": self.system.name + } + + @staticmethod + def generate(system, path): + vs = VersioningInfo() + vs.wpath = path + vs.system = system + + vs.rpath = vs.system.path(path) + vs.revision = vs.system.revision(path) + + return vs + + + @staticmethod + def materialize(data): + if not data: + return None + + vs = VersioningInfo() + vs.wpath = data["wpath"] + vs.rpath = data["rpath"] + vs.revision = data["revision"] + vs.system = data["system"] + + return vs + + +class RenderFile: + def __init__(self, filepath = "", index = 0, start = -1, end = -1, signature=0): + self.filepath = filepath + self.original_path = filepath + self.signature = signature + self.index = index + self.start = start + self.end = end + + def serialize(self): + return { + "filepath": self.filepath, + "original_path": self.original_path, + "index": self.index, + "start": self.start, + "end": self.end, + "signature": self.signature + } + + @staticmethod + def materialize(data): + if not data: + return None + + rfile = RenderFile(data["filepath"], data["index"], data["start"], data["end"], data["signature"]) + rfile.original_path = data["original_path"] + + return rfile + +class RenderJob: + def __init__(self, job_info = None): + self.id = "" + self.type = JOB_BLENDER + self.name = "" + self.category = "None" + self.status = JOB_WAITING + self.files = [] + self.chunks = 0 + self.priority = 0 + self.blacklist = [] + + self.version_info = None + + self.resolution = None + + self.usage = 0.0 + self.last_dispatched = 0.0 + self.frames = [] + + if job_info: + self.type = job_info.type + self.name = job_info.name + self.category = job_info.category + self.status = job_info.status + self.files = job_info.files + self.chunks = job_info.chunks + self.priority = job_info.priority + self.blacklist = job_info.blacklist + self.version_info = job_info.version_info + + def hasRenderResult(self): + return self.type in (JOB_BLENDER, JOB_VCS) + + def rendersWithBlender(self): + return self.type in (JOB_BLENDER, JOB_VCS) + + def addFile(self, file_path, start=-1, end=-1, signed=True): + if signed: + signature = hashFile(file_path) + else: + signature = None + self.files.append(RenderFile(file_path, len(self.files), start, end, signature)) + + def addFrame(self, frame_number, command = ""): + frame = RenderFrame(frame_number, command) + self.frames.append(frame) + return frame + + def __len__(self): + return len(self.frames) + + def countFrames(self, status=QUEUED): + total = 0 + for f in self.frames: + if f.status == status: + total += 1 + + return total + + def countSlaves(self): + return len(set((frame.slave for frame in self.frames if frame.status == DISPATCHED))) + + def statusText(self): + return JOB_STATUS_TEXT[self.status] + + def framesStatus(self): + results = { + QUEUED: 0, + DISPATCHED: 0, + DONE: 0, + ERROR: 0 + } + + for frame in self.frames: + results[frame.status] += 1 + + return results + + def __contains__(self, frame_number): + for f in self.frames: + if f.number == frame_number: + return True + else: + return False + + def __getitem__(self, frame_number): + for f in self.frames: + if f.number == frame_number: + return f + else: + return None + + def serialize(self, frames = None): + min_frame = min((f.number for f in frames)) if frames else -1 + max_frame 
= max((f.number for f in frames)) if frames else -1 + return { + "id": self.id, + "type": self.type, + "name": self.name, + "category": self.category, + "status": self.status, + "files": [f.serialize() for f in self.files if f.start == -1 or not frames or (f.start <= max_frame and f.end >= min_frame)], + "frames": [f.serialize() for f in self.frames if not frames or f in frames], + "chunks": self.chunks, + "priority": self.priority, + "usage": self.usage, + "blacklist": self.blacklist, + "last_dispatched": self.last_dispatched, + "version_info": self.version_info.serialize() if self.version_info else None, + "resolution": self.resolution + } + + @staticmethod + def materialize(data): + if not data: + return None + + job = RenderJob() + job.id = data["id"] + job.type = data["type"] + job.name = data["name"] + job.category = data["category"] + job.status = data["status"] + job.files = [RenderFile.materialize(f) for f in data["files"]] + job.frames = [RenderFrame.materialize(f) for f in data["frames"]] + job.chunks = data["chunks"] + job.priority = data["priority"] + job.usage = data["usage"] + job.blacklist = data["blacklist"] + job.last_dispatched = data["last_dispatched"] + job.resolution = data["resolution"] + + version_info = data.get("version_info", None) + if version_info: + job.version_info = VersioningInfo.materialize(version_info) + + return job + +class RenderFrame: + def __init__(self, number = 0, command = ""): + self.number = number + self.time = 0 + self.status = QUEUED + self.slave = None + self.command = command + + def statusText(self): + return FRAME_STATUS_TEXT[self.status] + + def serialize(self): + return { + "number": self.number, + "time": self.time, + "status": self.status, + "slave": None if not self.slave else self.slave.serialize(), + "command": self.command + } + + @staticmethod + def materialize(data): + if not data: + return None + + frame = RenderFrame() + frame.number = data["number"] + frame.time = data["time"] + frame.status = data["status"] + frame.slave = RenderSlave.materialize(data["slave"]) + frame.command = data["command"] + + return frame diff --git a/netrender/netrender.css b/netrender/netrender.css new file mode 100644 index 00000000..0c54690e --- /dev/null +++ b/netrender/netrender.css @@ -0,0 +1,88 @@ +body { + background-color:#eee; + font-size:12px; + font-family: "Lucida Sans","Lucida Sans Unicode","Lucida Grande",Lucida,sans-serif; + +} +a { + /*text-decoration:none;*/ + color:#666; +} +a:hover { + color:#000; +} +h2 { + background-color:#ddd; + font-size:120%; + padding:5px; +} + +h2 { + background-color:#ddd; + font-size:110%; + padding:5px; +} + +table { + text-align:center; + border:0; + background-color:#ddd; + padding: 0px; + margin: 0px; +} +thead{ + font-size:90%; + color:#555; + background-color:#ccc; +} +td { + border:0; + padding:2px; + padding-left:10px; + padding-right:10px; + margin-left:20px; + background-color:#ddd; +} +td:hover { + background-color:#ccc; +} +tr { + border:0; +} +button { + color: #111; + width: auto; + height: auto; +} + +.toggle { + text-decoration: underline; + cursor: pointer; +} + +.cache { + display: none; +} + +.fluid { + display: none; +} + +.other { + display: none; +} + +.rules { + width: 60em; + text-align: left; +} + +img.thumb { + display: none; + cursor: pointer; +} + +span.thumb { + text-decoration: underline; + cursor: pointer; +} diff --git a/netrender/netrender.js b/netrender/netrender.js new file mode 100644 index 00000000..1024a169 --- /dev/null +++ b/netrender/netrender.js @@ -0,0 +1,146 @@ 
+lastFrame = -1 +maxFrame = -1 +minFrame = -1 + +function request(url, data) +{ + xmlhttp = new XMLHttpRequest(); + xmlhttp.open("POST", url, false); + xmlhttp.send(data); + window.location.reload() +} + +function edit(id, info) +{ + request("/edit_" + id, info) +} + +function clear_jobs() +{ + var r=confirm("Also delete files on master?"); + + if (r==true) { + request('/clear', '{"clear":true}'); + } else { + request('/clear', '{"clear":false}'); + } +} + +function cancel_job(id) +{ + var r=confirm("Also delete files on master?"); + + if (r==true) { + request('/cancel_' + id, '{"clear":true}'); + } else { + request('/cancel_' + id, '{"clear":false}'); + } +} + +function balance_edit(id, old_value) +{ + var new_value = prompt("New limit", old_value); + if (new_value != null && new_value != "") { + request("/balance_limit", '{"' + id + '":"' + new_value + '"}'); + } +} + +function balance_enable(id, value) +{ + request("/balance_enable", '{"' + id + '":' + value + "}"); +} + +function showThumb(job, frame) +{ + if (lastFrame != -1) { + if (maxFrame != -1 && minFrame != -1) { + if (frame >= minFrame && frame <= maxFrame) { + for(i = minFrame; i <= maxFrame; i=i+1) { + toggleThumb(job, i); + } + minFrame = -1; + maxFrame = -1; + lastFrame = -1; + } else if (frame > maxFrame) { + for(i = maxFrame+1; i <= frame; i=i+1) { + toggleThumb(job, i); + } + maxFrame = frame; + lastFrame = frame; + } else { + for(i = frame; i <= minFrame-1; i=i+1) { + toggleThumb(job, i); + } + minFrame = frame; + lastFrame = frame; + } + } else if (frame == lastFrame) { + toggleThumb(job, frame); + } else if (frame < lastFrame) { + minFrame = frame; + maxFrame = lastFrame; + + for(i = minFrame; i <= maxFrame-1; i=i+1) { + toggleThumb(job, i); + } + lastFrame = frame; + } else { + minFrame = lastFrame; + maxFrame = frame; + + for(i = minFrame+1; i <= maxFrame; i=i+1) { + toggleThumb(job, i); + } + lastFrame = frame; + } + } else { + toggleThumb(job, frame); + } +} + +function toggleThumb(job, frame) +{ + img = document.images["thumb" + frame]; + url = "/thumb_" + job + "_" + frame + ".jpg" + + if (img.style.display == "block") { + img.style.display = "none"; + img.src = ""; + lastFrame = -1; + } else { + img.src = url; + img.style.display = "block"; + lastFrame = frame; + } +} + +function returnObjById( id ) +{ + if (document.getElementById) + var returnVar = document.getElementById(id); + else if (document.all) + var returnVar = document.all[id]; + else if (document.layers) + var returnVar = document.layers[id]; + return returnVar; +} + +function toggleDisplay( className, value1, value2 ) +{ + style = getStyle(className) + + if (style.style["display"] == value1) { + style.style["display"] = value2; + } else { + style.style["display"] = value1; + } +} + +function getStyle(className) { + var classes = document.styleSheets[0].rules || document.styleSheets[0].cssRules + for(var x=0;x 0): + netsettings.jobs.remove(0) + + netrender.jobs = [] + + for j in jobs: + netrender.jobs.append(j) + netsettings.jobs.add() + job = netsettings.jobs[-1] + + j.results = j.framesStatus() # cache frame status + + job.name = j.name + + return {'FINISHED'} + + def invoke(self, context, event): + return self.execute(context) + +class RENDER_OT_netclientblacklistslave(bpy.types.Operator): + '''Operator documentation text, will be used for the operator tooltip and python docs.''' + bl_idname = "render.netclientblacklistslave" + bl_label = "Client Blacklist Slave" + + @classmethod + def poll(cls, context): + return True + + def execute(self, 
context): + netsettings = context.scene.network_render + + if netsettings.active_slave_index >= 0: + + # deal with data + slave = netrender.slaves.pop(netsettings.active_slave_index) + netrender.blacklist.append(slave) + + # deal with rna + netsettings.slaves_blacklist.add() + netsettings.slaves_blacklist[-1].name = slave.name + + netsettings.slaves.remove(netsettings.active_slave_index) + netsettings.active_slave_index = -1 + + return {'FINISHED'} + + def invoke(self, context, event): + return self.execute(context) + +class RENDER_OT_netclientwhitelistslave(bpy.types.Operator): + '''Operator documentation text, will be used for the operator tooltip and python docs.''' + bl_idname = "render.netclientwhitelistslave" + bl_label = "Client Whitelist Slave" + + @classmethod + def poll(cls, context): + return True + + def execute(self, context): + netsettings = context.scene.network_render + + if netsettings.active_blacklisted_slave_index >= 0: + + # deal with data + slave = netrender.blacklist.pop(netsettings.active_blacklisted_slave_index) + netrender.slaves.append(slave) + + # deal with rna + netsettings.slaves.add() + netsettings.slaves[-1].name = slave.name + + netsettings.slaves_blacklist.remove(netsettings.active_blacklisted_slave_index) + netsettings.active_blacklisted_slave_index = -1 + + return {'FINISHED'} + + def invoke(self, context, event): + return self.execute(context) + + +class RENDER_OT_netclientslaves(bpy.types.Operator): + '''Refresh status about available Render slaves''' + bl_idname = "render.netclientslaves" + bl_label = "Client Slaves" + + @classmethod + def poll(cls, context): + return True + + def execute(self, context): + netsettings = context.scene.network_render + conn = clientConnection(netsettings.server_address, netsettings.server_port, self.report) + + if conn: + conn.request("GET", "/slaves") + + response = conn.getresponse() + content = response.read() + print( response.status, response.reason ) + + slaves = (netrender.model.RenderSlave.materialize(s) for s in json.loads(str(content, encoding='utf8'))) + + while(len(netsettings.slaves) > 0): + netsettings.slaves.remove(0) + + netrender.slaves = [] + + for s in slaves: + for i in range(len(netrender.blacklist)): + slave = netrender.blacklist[i] + if slave.id == s.id: + netrender.blacklist[i] = s + netsettings.slaves_blacklist[i].name = s.name + break + else: + netrender.slaves.append(s) + + netsettings.slaves.add() + slave = netsettings.slaves[-1] + slave.name = s.name + + return {'FINISHED'} + + def invoke(self, context, event): + return self.execute(context) + +class RENDER_OT_netclientcancel(bpy.types.Operator): + '''Cancel the selected network rendering job.''' + bl_idname = "render.netclientcancel" + bl_label = "Client Cancel" + + @classmethod + def poll(cls, context): + netsettings = context.scene.network_render + return netsettings.active_job_index >= 0 and len(netsettings.jobs) > 0 + + def execute(self, context): + netsettings = context.scene.network_render + conn = clientConnection(netsettings.server_address, netsettings.server_port, self.report) + + if conn: + job = netrender.jobs[netsettings.active_job_index] + + conn.request("POST", cancelURL(job.id), json.dumps({'clear':False})) + + response = conn.getresponse() + response.read() + print( response.status, response.reason ) + + netsettings.jobs.remove(netsettings.active_job_index) + + return {'FINISHED'} + + def invoke(self, context, event): + return self.execute(context) + +class RENDER_OT_netclientcancelall(bpy.types.Operator): + '''Cancel all 
running network rendering jobs.''' + bl_idname = "render.netclientcancelall" + bl_label = "Client Cancel All" + + @classmethod + def poll(cls, context): + return True + + def execute(self, context): + netsettings = context.scene.network_render + conn = clientConnection(netsettings.server_address, netsettings.server_port, self.report) + + if conn: + conn.request("POST", "/clear", json.dumps({'clear':False})) + + response = conn.getresponse() + response.read() + print( response.status, response.reason ) + + while(len(netsettings.jobs) > 0): + netsettings.jobs.remove(0) + + return {'FINISHED'} + + def invoke(self, context, event): + return self.execute(context) + +class netclientdownload(bpy.types.Operator): + '''Download render results from the network''' + bl_idname = "render.netclientdownload" + bl_label = "Client Download" + + @classmethod + def poll(cls, context): + netsettings = context.scene.network_render + return netsettings.active_job_index >= 0 and len(netsettings.jobs) > netsettings.active_job_index + + def execute(self, context): + netsettings = context.scene.network_render + rd = context.scene.render + + conn = clientConnection(netsettings.server_address, netsettings.server_port, self.report) + + if conn: + job_id = netrender.jobs[netsettings.active_job_index].id + + conn.request("GET", "/status", headers={"job-id":job_id}) + + response = conn.getresponse() + + if response.status != http.client.OK: + self.report('ERROR', "Job ID %i not defined on master" % job_id) + return {'ERROR'} + + content = response.read() + + job = netrender.model.RenderJob.materialize(json.loads(str(content, encoding='utf8'))) + + conn.close() + + finished_frames = [] + + nb_error = 0 + nb_missing = 0 + + for frame in job.frames: + if frame.status == DONE: + finished_frames.append(frame.number) + elif frame.status == ERROR: + nb_error += 1 + else: + nb_missing += 1 + + if not finished_frames: + return + + frame_ranges = [] + + first = None + last = None + + for i in range(len(finished_frames)): + current = finished_frames[i] + + if not first: + first = current + last = current + elif last + 1 == current: + last = current + + if last + 1 < current or i + 1 == len(finished_frames): + if first < last: + frame_ranges.append((first, last)) + else: + frame_ranges.append((first,)) + + first = current + last = current + + getResults(netsettings.server_address, netsettings.server_port, job_id, job.resolution[0], job.resolution[1], job.resolution[2], frame_ranges) + + if nb_error and nb_missing: + self.report('ERROR', "Results downloaded but skipped %i frames with errors and %i unfinished frames" % (nb_error, nb_missing)) + elif nb_error: + self.report('ERROR', "Results downloaded but skipped %i frames with errors" % nb_error) + elif nb_missing: + self.report('WARNING', "Results downloaded but skipped %i unfinished frames" % nb_missing) + else: + self.report('INFO', "All results downloaded") + + return {'FINISHED'} + + def invoke(self, context, event): + return self.execute(context) + +class netclientscan(bpy.types.Operator): + '''Listen on network for master server broadcasting its address and port.''' + bl_idname = "render.netclientscan" + bl_label = "Client Scan" + + @classmethod + def poll(cls, context): + return True + + def execute(self, context): + address, port = clientScan(self.report) + + if address: + scene = context.scene + netsettings = scene.network_render + netsettings.server_address = address + netsettings.server_port = port + + return {'FINISHED'} + + def invoke(self, context, event): + return 
self.execute(context) + +class netclientvcsguess(bpy.types.Operator): + '''Guess VCS setting for the current file''' + bl_idname = "render.netclientvcsguess" + bl_label = "VCS Guess" + + @classmethod + def poll(cls, context): + return True + + def execute(self, context): + netsettings = context.scene.network_render + + system = versioning.SYSTEMS.get(netsettings.vcs_system, None) + + if system: + wpath, name = os.path.split(os.path.abspath(bpy.data.filepath)) + + rpath = system.path(wpath) + revision = system.revision(wpath) + + netsettings.vcs_wpath = wpath + netsettings.vcs_rpath = rpath + netsettings.vcs_revision = revision + + + + return {'FINISHED'} + + def invoke(self, context, event): + return self.execute(context) + + +class netclientweb(bpy.types.Operator): + '''Open new window with information about running rendering jobs''' + bl_idname = "render.netclientweb" + bl_label = "Open Master Monitor" + + @classmethod + def poll(cls, context): + netsettings = context.scene.network_render + return netsettings.server_address != "[default]" + + def execute(self, context): + netsettings = context.scene.network_render + + + # open connection to make sure server exists + conn = clientConnection(netsettings.server_address, netsettings.server_port, self.report) + + if conn: + conn.close() + + webbrowser.open("http://%s:%i" % (netsettings.server_address, netsettings.server_port)) + + return {'FINISHED'} + + def invoke(self, context, event): + return self.execute(context) diff --git a/netrender/repath.py b/netrender/repath.py new file mode 100644 index 00000000..3ac9636b --- /dev/null +++ b/netrender/repath.py @@ -0,0 +1,150 @@ +# ##### BEGIN GPL LICENSE BLOCK ##### +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# ##### END GPL LICENSE BLOCK ##### + +import sys, os +import subprocess + +import bpy + +from netrender.utils import * +import netrender.model + +BLENDER_PATH = sys.argv[0] + +def reset(job): + main_file = job.files[0] + + job_full_path = main_file.filepath + + if os.path.exists(job_full_path + ".bak"): + os.remove(job_full_path) # repathed file + os.renames(job_full_path + ".bak", job_full_path) + +def update(job): + paths = [] + + main_file = job.files[0] + + job_full_path = main_file.filepath + + + path, ext = os.path.splitext(job_full_path) + + new_path = path + ".remap" + ext + + # Disable for now. 
Partial repath should work anyway + #all = main_file.filepath != main_file.original_path + all = False + + for rfile in job.files[1:]: + if all or rfile.original_path != rfile.filepath: + paths.append(rfile.original_path) + paths.append(rfile.filepath) + + # Only update if needed + if paths: + process = subprocess.Popen([BLENDER_PATH, "-b", "-noaudio", job_full_path, "-P", __file__, "--", new_path] + paths, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + process.wait() + + os.renames(job_full_path, job_full_path + ".bak") + os.renames(new_path, job_full_path) + +def process(paths): + def processPointCache(point_cache): + point_cache.use_external = False + + def processFluid(fluid): + new_path = path_map.get(fluid.filepath, None) + if new_path: + fluid.path = new_path + + path_map = {} + for i in range(0, len(paths), 2): + # special case for point cache + if paths[i].endswith(".bphys"): + pass # Don't need them in the map, they all use the default external path + # NOTE: This is probably not correct all the time, need to be fixed. + # special case for fluids + elif paths[i].endswith(".bobj.gz"): + path_map[os.path.split(paths[i])[0]] = os.path.split(paths[i+1])[0] + else: + path_map[os.path.split(paths[i])[1]] = paths[i+1] + + # TODO original paths aren't really the orignal path (they are the normalized path + # so we repath using the filenames only. + + ########################### + # LIBRARIES + ########################### + for lib in bpy.data.libraries: + file_path = bpy.path.abspath(lib.filepath) + new_path = path_map.get(os.path.split(file_path)[1], None) + if new_path: + lib.filepath = new_path + + ########################### + # IMAGES + ########################### + for image in bpy.data.images: + if image.source == "FILE" and not image.packed_file: + file_path = bpy.path.abspath(image.filepath) + new_path = path_map.get(os.path.split(file_path)[1], None) + if new_path: + image.filepath = new_path + + + ########################### + # FLUID + POINT CACHE + ########################### + for object in bpy.data.objects: + for modifier in object.modifiers: + if modifier.type == 'FLUID_SIMULATION' and modifier.settings.type == "DOMAIN": + processFluid(settings) + elif modifier.type == "CLOTH": + processPointCache(modifier.point_cache) + elif modifier.type == "SOFT_BODY": + processPointCache(modifier.point_cache) + elif modifier.type == "SMOKE" and modifier.smoke_type == "TYPE_DOMAIN": + processPointCache(modifier.domain_settings.point_cache_low) + if modifier.domain_settings.use_high_resolution: + processPointCache(modifier.domain_settings.point_cache_high) + elif modifier.type == "MULTIRES" and modifier.is_external: + file_path = bpy.path.abspath(modifier.filepath) + new_path = path_map.get(file_path, None) + if new_path: + modifier.filepath = new_path + + # particles modifier are stupid and don't contain data + # we have to go through the object property + for psys in object.particle_systems: + processPointCache(psys.point_cache) + + +if __name__ == "__main__": + try: + i = sys.argv.index("--") + except: + i = 0 + + if i: + new_path = sys.argv[i+1] + args = sys.argv[i+2:] + + process(args) + + bpy.ops.wm.save_as_mainfile(filepath=new_path, check_existing=False) diff --git a/netrender/slave.py b/netrender/slave.py new file mode 100644 index 00000000..b05de0af --- /dev/null +++ b/netrender/slave.py @@ -0,0 +1,349 @@ +# ##### BEGIN GPL LICENSE BLOCK ##### +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License 
+# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# ##### END GPL LICENSE BLOCK ##### + +import sys, os, platform, shutil +import http, http.client, http.server, urllib +import subprocess, time +import json + +import bpy + +from netrender.utils import * +import netrender.model +import netrender.repath +import netrender.thumbnail as thumbnail + +BLENDER_PATH = sys.argv[0] + +CANCEL_POLL_SPEED = 2 +MAX_TIMEOUT = 10 +INCREMENT_TIMEOUT = 1 +MAX_CONNECT_TRY = 10 +try: + system = platform.system() +except UnicodeDecodeError: + import sys + system = sys.platform + +if system in ('Windows', 'win32') and platform.version() >= '5': # Error mode is only available on Win2k or higher, that's version 5 + import ctypes + def SetErrorMode(): + val = ctypes.windll.kernel32.SetErrorMode(0x0002) + ctypes.windll.kernel32.SetErrorMode(val | 0x0002) + return val + + def RestoreErrorMode(val): + ctypes.windll.kernel32.SetErrorMode(val) +else: + def SetErrorMode(): + return 0 + + def RestoreErrorMode(val): + pass + +def clearSlave(path): + shutil.rmtree(path) + +def slave_Info(): + sysname, nodename, release, version, machine, processor = platform.uname() + slave = netrender.model.RenderSlave() + slave.name = nodename + slave.stats = sysname + " " + release + " " + machine + " " + processor + return slave + +def testCancel(conn, job_id, frame_number): + conn.request("HEAD", "/status", headers={"job-id":job_id, "job-frame": str(frame_number)}) + + # canceled if job isn't found anymore + if responseStatus(conn) == http.client.NO_CONTENT: + return True + else: + return False + +def testFile(conn, job_id, slave_id, rfile, JOB_PREFIX, main_path = None): + job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path) + + found = os.path.exists(job_full_path) + + if found and rfile.signature != None: + found_signature = hashFile(job_full_path) + found = found_signature == rfile.signature + + if not found: + print("Found file %s at %s but signature mismatch!" 
% (rfile.filepath, job_full_path)) + job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force = True) + + if not found: + # Force prefix path if not found + job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force = True) + temp_path = os.path.join(JOB_PREFIX, "slave.temp") + conn.request("GET", fileURL(job_id, rfile.index), headers={"slave-id":slave_id}) + response = conn.getresponse() + + if response.status != http.client.OK: + return None # file for job not returned by server, need to return an error code to server + + f = open(temp_path, "wb") + buf = response.read(1024) + + while buf: + f.write(buf) + buf = response.read(1024) + + f.close() + + os.renames(temp_path, job_full_path) + + rfile.filepath = job_full_path + + return job_full_path + +def breakable_timeout(timeout): + for i in range(timeout): + time.sleep(1) + if engine.test_break(): + break + +def render_slave(engine, netsettings, threads): + timeout = 1 + + bisleep = BreakableIncrementedSleep(INCREMENT_TIMEOUT, 1, MAX_TIMEOUT, engine.test_break) + + engine.update_stats("", "Network render node initiation") + + conn = clientConnection(netsettings.server_address, netsettings.server_port) + + if not conn: + timeout = 1 + print("Connection failed, will try connecting again at most %i times" % MAX_CONNECT_TRY) + bisleep.reset() + + for i in range(MAX_CONNECT_TRY): + bisleep.sleep() + + conn = clientConnection(netsettings.server_address, netsettings.server_port) + + if conn or engine.test_break(): + break + + print("Retry %i failed, waiting %is before retrying" % (i + 1, bisleep.current)) + + if conn: + conn.request("POST", "/slave", json.dumps(slave_Info().serialize())) + response = conn.getresponse() + response.read() + + slave_id = response.getheader("slave-id") + + NODE_PREFIX = os.path.join(bpy.path.abspath(netsettings.path), "slave_" + slave_id) + if not os.path.exists(NODE_PREFIX): + os.mkdir(NODE_PREFIX) + + engine.update_stats("", "Network render connected to master, waiting for jobs") + + while not engine.test_break(): + conn.request("GET", "/job", headers={"slave-id":slave_id}) + response = conn.getresponse() + + if response.status == http.client.OK: + bisleep.reset() + + job = netrender.model.RenderJob.materialize(json.loads(str(response.read(), encoding='utf8'))) + engine.update_stats("", "Network render processing job from master") + + JOB_PREFIX = os.path.join(NODE_PREFIX, "job_" + job.id) + if not os.path.exists(JOB_PREFIX): + os.mkdir(JOB_PREFIX) + + # set tempdir for fsaa temp files + # have to set environ var because render is done in a subprocess and that's the easiest way to propagate the setting + os.environ["TMP"] = JOB_PREFIX + + + if job.type == netrender.model.JOB_BLENDER: + job_path = job.files[0].filepath # path of main file + main_path, main_file = os.path.split(job_path) + + job_full_path = testFile(conn, job.id, slave_id, job.files[0], JOB_PREFIX) + print("Fullpath", job_full_path) + print("File:", main_file, "and %i other files" % (len(job.files) - 1,)) + + for rfile in job.files[1:]: + testFile(conn, job.id, slave_id, rfile, JOB_PREFIX, main_path) + print("\t", rfile.filepath) + + netrender.repath.update(job) + + engine.update_stats("", "Render File "+ main_file+ " for job "+ job.id) + elif job.type == netrender.model.JOB_VCS: + if not job.version_info: + # Need to return an error to server, incorrect job type + pass + + job_path = job.files[0].filepath # path of main file + main_path, main_file = os.path.split(job_path) + + job.version_info.update() + + # For VCS 
jobs, file path is relative to the working copy path + job_full_path = os.path.join(job.version_info.wpath, job_path) + + engine.update_stats("", "Render File "+ main_file+ " for job "+ job.id) + + # announce log to master + logfile = netrender.model.LogFile(job.id, slave_id, [frame.number for frame in job.frames]) + conn.request("POST", "/log", bytes(json.dumps(logfile.serialize()), encoding='utf8')) + response = conn.getresponse() + response.read() + + + first_frame = job.frames[0].number + + # start render + start_t = time.time() + + if job.rendersWithBlender(): + frame_args = [] + + for frame in job.frames: + print("frame", frame.number) + frame_args += ["-f", str(frame.number)] + + val = SetErrorMode() + process = subprocess.Popen([BLENDER_PATH, "-b", "-noaudio", job_full_path, "-t", str(threads), "-o", os.path.join(JOB_PREFIX, "######"), "-E", "BLENDER_RENDER", "-F", "MULTILAYER"] + frame_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + RestoreErrorMode(val) + elif job.type == netrender.model.JOB_PROCESS: + command = job.frames[0].command + val = SetErrorMode() + process = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + RestoreErrorMode(val) + + headers = {"slave-id":slave_id} + + cancelled = False + stdout = bytes() + run_t = time.time() + while not cancelled and process.poll() is None: + stdout += process.stdout.read(1024) + current_t = time.time() + cancelled = engine.test_break() + if current_t - run_t > CANCEL_POLL_SPEED: + + # update logs if needed + if stdout: + # (only need to update on one frame, they are linked + conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers) + response = conn.getresponse() + response.read() + + # Also output on console + if netsettings.use_slave_output_log: + print(str(stdout, encoding='utf8'), end="") + + stdout = bytes() + + run_t = current_t + if testCancel(conn, job.id, first_frame): + cancelled = True + + if job.type == netrender.model.JOB_BLENDER: + netrender.repath.reset(job) + + # read leftovers if needed + stdout += process.stdout.read() + + if cancelled: + # kill process if needed + if process.poll() is None: + try: + process.terminate() + except OSError: + pass + continue # to next frame + + # flush the rest of the logs + if stdout: + # Also output on console + if netsettings.use_slave_thumb: + print(str(stdout, encoding='utf8'), end="") + + # (only need to update on one frame, they are linked + conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers) + if responseStatus(conn) == http.client.NO_CONTENT: + continue + + total_t = time.time() - start_t + + avg_t = total_t / len(job.frames) + + status = process.returncode + + print("status", status) + + headers = {"job-id":job.id, "slave-id":slave_id, "job-time":str(avg_t)} + + + if status == 0: # non zero status is error + headers["job-result"] = str(DONE) + for frame in job.frames: + headers["job-frame"] = str(frame.number) + if job.hasRenderResult(): + # send image back to server + + filename = os.path.join(JOB_PREFIX, "%06d.exr" % frame.number) + + # thumbnail first + if netsettings.use_slave_thumb: + thumbname = thumbnail.generate(filename) + + if thumbname: + f = open(thumbname, 'rb') + conn.request("PUT", "/thumb", f, headers=headers) + f.close() + responseStatus(conn) + + f = open(filename, 'rb') + conn.request("PUT", "/render", f, headers=headers) + f.close() + if responseStatus(conn) == http.client.NO_CONTENT: + continue + + elif job.type == netrender.model.JOB_PROCESS: + conn.request("PUT", 
"/render", headers=headers) + if responseStatus(conn) == http.client.NO_CONTENT: + continue + else: + headers["job-result"] = str(ERROR) + for frame in job.frames: + headers["job-frame"] = str(frame.number) + # send error result back to server + conn.request("PUT", "/render", headers=headers) + if responseStatus(conn) == http.client.NO_CONTENT: + continue + + engine.update_stats("", "Network render connected to master, waiting for jobs") + else: + bisleep.sleep() + + conn.close() + + if netsettings.use_slave_clear: + clearSlave(NODE_PREFIX) + +if __name__ == "__main__": + pass diff --git a/netrender/thumbnail.py b/netrender/thumbnail.py new file mode 100644 index 00000000..2ead6e82 --- /dev/null +++ b/netrender/thumbnail.py @@ -0,0 +1,81 @@ +# ##### BEGIN GPL LICENSE BLOCK ##### +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# ##### END GPL LICENSE BLOCK ##### + +import sys, os +import subprocess + +import bpy + +def generate(filename, external=True): + if external: + process = subprocess.Popen([sys.argv[0], "-b", "-noaudio", "-P", __file__, "--", filename], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + while process.poll() is None: + process.stdout.read(1024) # empty buffer to be sure + process.stdout.read() + + return _thumbname(filename) + else: + return _internal(filename) + +def _thumbname(filename): + root = os.path.splitext(filename)[0] + return root + ".jpg" + +def _internal(filename): + imagename = os.path.split(filename)[1] + thumbname = _thumbname(filename) + + if os.path.exists(thumbname): + return thumbname + + if bpy: + scene = bpy.data.scenes[0] # FIXME, this is dodgy! 
+ scene.render.file_format = "JPEG" + scene.render.file_quality = 90 + + # remove existing image, if there's a leftover (otherwise open changes the name) + if imagename in bpy.data.images: + img = bpy.data.images[imagename] + bpy.data.images.remove(img) + + bpy.ops.image.open(filepath=filename) + img = bpy.data.images[imagename] + + img.save_render(thumbname, scene=scene) + + img.user_clear() + bpy.data.images.remove(img) + + try: + process = subprocess.Popen(["convert", thumbname, "-resize", "300x300", thumbname]) + process.wait() + return thumbname + except Exception as exp: + print("Error while generating thumbnail") + print(exp) + + return None + +if __name__ == "__main__": + import bpy + try: + start = sys.argv.index("--") + 1 + except ValueError: + start = 0 + for filename in sys.argv[start:]: + generate(filename, external=False) diff --git a/netrender/ui.py b/netrender/ui.py new file mode 100644 index 00000000..f6eb94f5 --- /dev/null +++ b/netrender/ui.py @@ -0,0 +1,547 @@ +# ##### BEGIN GPL LICENSE BLOCK ##### +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# ##### END GPL LICENSE BLOCK ##### + +import bpy +import sys, os +import http, http.client, http.server, urllib +import subprocess, shutil, time, hashlib + +import netrender +import netrender.slave as slave +import netrender.master as master + +from netrender.utils import * + +VERSION = b"0.3" + +PATH_PREFIX = "/tmp/" + +QUEUED = 0 +DISPATCHED = 1 +DONE = 2 +ERROR = 3 + +LAST_ADDRESS_TEST = 0 + +def base_poll(cls, context): + rd = context.scene.render + return (rd.use_game_engine==False) and (rd.engine in cls.COMPAT_ENGINES) + + +def init_file(): + if netrender.init_file != bpy.data.filepath: + netrender.init_file = bpy.data.filepath + netrender.init_data = True + netrender.valid_address = False + +def init_data(netsettings): + init_file() + + if netrender.init_data: + netrender.init_data = False + + netsettings.active_slave_index = 0 + while(len(netsettings.slaves) > 0): + netsettings.slaves.remove(0) + + netsettings.active_blacklisted_slave_index = 0 + while(len(netsettings.slaves_blacklist) > 0): + netsettings.slaves_blacklist.remove(0) + + netsettings.active_job_index = 0 + while(len(netsettings.jobs) > 0): + netsettings.jobs.remove(0) + +def verify_address(netsettings): + global LAST_ADDRESS_TEST + init_file() + + if LAST_ADDRESS_TEST + 30 < time.time(): + LAST_ADDRESS_TEST = time.time() + + try: + conn = clientConnection(netsettings.server_address, netsettings.server_port, scan = False, timeout = 1) + except: + conn = None + + if conn: + netrender.valid_address = True + conn.close() + else: + netrender.valid_address = False + + return netrender.valid_address + +class NeedValidAddress(): + @classmethod + def poll(cls, context): + return super().poll(context) and verify_address(context.scene.network_render) + +class NetRenderButtonsPanel(): + bl_space_type = 
"PROPERTIES" + bl_region_type = "WINDOW" + bl_context = "render" + # COMPAT_ENGINES must be defined in each subclass, external engines can add themselves here + + @classmethod + def poll(cls, context): + rd = context.scene.render + return rd.engine == 'NET_RENDER' and rd.use_game_engine == False + +# Setting panel, use in the scene for now. +class RENDER_PT_network_settings(NetRenderButtonsPanel, bpy.types.Panel): + bl_label = "Network Settings" + COMPAT_ENGINES = {'NET_RENDER'} + + @classmethod + def poll(cls, context): + return super().poll(context) + + def draw(self, context): + layout = self.layout + + netsettings = context.scene.network_render + + verify_address(netsettings) + + layout.prop(netsettings, "mode", expand=True) + + if netsettings.mode in ("RENDER_MASTER", "RENDER_SLAVE"): + layout.operator("render.netclientstart", icon='PLAY') + + layout.prop(netsettings, "path") + + split = layout.split(percentage=0.7) + + col = split.column() + col.label(text="Server Address:") + col.prop(netsettings, "server_address", text="") + + col = split.column() + col.label(text="Port:") + col.prop(netsettings, "server_port", text="") + + if netsettings.mode != "RENDER_MASTER": + layout.operator("render.netclientscan", icon='FILE_REFRESH', text="") + + if not netrender.valid_address: + layout.label(text="No master at specified address") + + layout.operator("render.netclientweb", icon='QUESTION') + +class RENDER_PT_network_slave_settings(NetRenderButtonsPanel, bpy.types.Panel): + bl_label = "Slave Settings" + COMPAT_ENGINES = {'NET_RENDER'} + + @classmethod + def poll(cls, context): + scene = context.scene + return super().poll(context) and scene.network_render.mode == "RENDER_SLAVE" + + def draw(self, context): + layout = self.layout + + rd = context.scene.render + netsettings = context.scene.network_render + + layout.prop(netsettings, "use_slave_clear") + layout.prop(netsettings, "use_slave_thumb") + layout.prop(netsettings, "use_slave_output_log") + layout.label(text="Threads:") + layout.prop(rd, "threads_mode", expand=True) + + col = layout.column() + col.enabled = rd.threads_mode == 'FIXED' + col.prop(rd, "threads") + +class RENDER_PT_network_master_settings(NetRenderButtonsPanel, bpy.types.Panel): + bl_label = "Master Settings" + COMPAT_ENGINES = {'NET_RENDER'} + + @classmethod + def poll(cls, context): + scene = context.scene + return super().poll(context) and scene.network_render.mode == "RENDER_MASTER" + + def draw(self, context): + layout = self.layout + + netsettings = context.scene.network_render + + layout.prop(netsettings, "use_master_broadcast") + layout.prop(netsettings, "use_master_clear") + +class RENDER_PT_network_job(NetRenderButtonsPanel, bpy.types.Panel): + bl_label = "Job Settings" + COMPAT_ENGINES = {'NET_RENDER'} + + @classmethod + def poll(cls, context): + scene = context.scene + return super().poll(context) and scene.network_render.mode == "RENDER_CLIENT" + + def draw(self, context): + layout = self.layout + + netsettings = context.scene.network_render + + verify_address(netsettings) + + if netsettings.server_address != "[default]": + layout.operator("render.netclientanim", icon='RENDER_ANIMATION') + layout.operator("render.netclientsend", icon='FILE_BLEND') + layout.operator("render.netclientsendframe", icon='RENDER_STILL') + if netsettings.job_id: + row = layout.row() + row.operator("render.render", text="Get Image", icon='RENDER_STILL') + row.operator("render.render", text="Get Animation", icon='RENDER_ANIMATION').animation = True + + split = 
layout.split(percentage=0.3) + + col = split.column() + col.label(text="Type:") + col.label(text="Name:") + col.label(text="Category:") + + col = split.column() + col.prop(netsettings, "job_type", text="") + col.prop(netsettings, "job_name", text="") + col.prop(netsettings, "job_category", text="") + + row = layout.row() + row.prop(netsettings, "priority") + row.prop(netsettings, "chunks") + +class RENDER_PT_network_job_vcs(NetRenderButtonsPanel, bpy.types.Panel): + bl_label = "VCS Job Settings" + COMPAT_ENGINES = {'NET_RENDER'} + + @classmethod + def poll(cls, context): + scene = context.scene + return (super().poll(context) + and scene.network_render.mode == "RENDER_CLIENT" + and scene.network_render.job_type == "JOB_VCS") + + def draw(self, context): + layout = self.layout + + netsettings = context.scene.network_render + + layout.operator("render.netclientvcsguess", icon='FILE_REFRESH', text="") + + layout.prop(netsettings, "vcs_system") + layout.prop(netsettings, "vcs_revision") + layout.prop(netsettings, "vcs_rpath") + layout.prop(netsettings, "vcs_wpath") + +class RENDER_PT_network_slaves(NeedValidAddress, NetRenderButtonsPanel, bpy.types.Panel): + bl_label = "Slaves Status" + COMPAT_ENGINES = {'NET_RENDER'} + + @classmethod + def poll(cls, context): + netsettings = context.scene.network_render + return super().poll(context) and netsettings.mode == "RENDER_CLIENT" + + def draw(self, context): + layout = self.layout + + netsettings = context.scene.network_render + + row = layout.row() + row.template_list(netsettings, "slaves", netsettings, "active_slave_index", rows=2) + + sub = row.column(align=True) + sub.operator("render.netclientslaves", icon='FILE_REFRESH', text="") + sub.operator("render.netclientblacklistslave", icon='ZOOMOUT', text="") + + if len(netrender.slaves) > netsettings.active_slave_index >= 0: + layout.separator() + + slave = netrender.slaves[netsettings.active_slave_index] + + layout.label(text="Name: " + slave.name) + layout.label(text="Address: " + slave.address[0]) + layout.label(text="Seen: " + time.ctime(slave.last_seen)) + layout.label(text="Stats: " + slave.stats) + +class RENDER_PT_network_slaves_blacklist(NeedValidAddress, NetRenderButtonsPanel, bpy.types.Panel): + bl_label = "Slaves Blacklist" + COMPAT_ENGINES = {'NET_RENDER'} + + @classmethod + def poll(cls, context): + netsettings = context.scene.network_render + return super().poll(context) and netsettings.mode == "RENDER_CLIENT" + + def draw(self, context): + layout = self.layout + + netsettings = context.scene.network_render + + row = layout.row() + row.template_list(netsettings, "slaves_blacklist", netsettings, "active_blacklisted_slave_index", rows=2) + + sub = row.column(align=True) + sub.operator("render.netclientwhitelistslave", icon='ZOOMOUT', text="") + + if len(netrender.blacklist) > netsettings.active_blacklisted_slave_index >= 0: + layout.separator() + + slave = netrender.blacklist[netsettings.active_blacklisted_slave_index] + + layout.label(text="Name: " + slave.name) + layout.label(text="Address: " + slave.address[0]) + layout.label(text="Seen: " + time.ctime(slave.last_seen)) + layout.label(text="Stats: " + slave.stats) + +class RENDER_PT_network_jobs(NeedValidAddress, NetRenderButtonsPanel, bpy.types.Panel): + bl_label = "Jobs" + COMPAT_ENGINES = {'NET_RENDER'} + + @classmethod + def poll(cls, context): + netsettings = context.scene.network_render + return super().poll(context) and netsettings.mode == "RENDER_CLIENT" + + def draw(self, context): + layout = self.layout + + netsettings = 
context.scene.network_render + + row = layout.row() + row.template_list(netsettings, "jobs", netsettings, "active_job_index", rows=2) + + sub = row.column(align=True) + sub.operator("render.netclientstatus", icon='FILE_REFRESH', text="") + sub.operator("render.netclientcancel", icon='ZOOMOUT', text="") + sub.operator("render.netclientcancelall", icon='PANEL_CLOSE', text="") + sub.operator("render.netclientdownload", icon='RENDER_ANIMATION', text="") + + if len(netrender.jobs) > netsettings.active_job_index >= 0: + layout.separator() + + job = netrender.jobs[netsettings.active_job_index] + + layout.label(text="Name: %s" % job.name) + layout.label(text="Length: %04i" % len(job)) + layout.label(text="Done: %04i" % job.results[DONE]) + layout.label(text="Error: %04i" % job.results[ERROR]) + +import bl_ui.properties_render as properties_render +class RENDER_PT_network_output(NeedValidAddress, NetRenderButtonsPanel, bpy.types.Panel): + bl_label = "Output" + COMPAT_ENGINES = {'NET_RENDER'} + + @classmethod + def poll(cls, context): + netsettings = context.scene.network_render + return super().poll(context) and netsettings.mode == "RENDER_CLIENT" + + draw = properties_render.RENDER_PT_output.draw + + +class NetRenderSlave(bpy.types.PropertyGroup): + @classmethod + def register(NetRenderSlave): + from bpy.props import PointerProperty, StringProperty, BoolProperty, EnumProperty, IntProperty, CollectionProperty + + NetRenderSlave.name = StringProperty( + name="Name of the slave", + description="", + maxlen = 64, + default = "") + +class NetRenderJob(bpy.types.PropertyGroup): + @classmethod + def register(NetRenderJob): + from bpy.props import PointerProperty, StringProperty, BoolProperty, EnumProperty, IntProperty, CollectionProperty + + NetRenderJob.name = StringProperty( + name="Name of the job", + description="", + maxlen = 128, + default = "") + +class NetRenderSettings(bpy.types.PropertyGroup): + @classmethod + def register(NetRenderSettings): + from bpy.props import PointerProperty, StringProperty, BoolProperty, EnumProperty, IntProperty, CollectionProperty + + NetRenderSettings.server_address = StringProperty( + name="Server address", + description="IP or name of the master render server", + maxlen = 128, + default = "[default]") + + NetRenderSettings.server_port = IntProperty( + name="Server port", + description="port of the master render server", + default = 8000, + min=1, + max=65535) + + NetRenderSettings.use_master_broadcast = BoolProperty( + name="Broadcast", + description="broadcast master server address on local network", + default = True) + + NetRenderSettings.use_slave_clear = BoolProperty( + name="Clear on exit", + description="delete downloaded files on exit", + default = True) + + NetRenderSettings.use_slave_thumb = BoolProperty( + name="Generate thumbnails", + description="Generate thumbnails on slaves instead of master", + default = False) + + NetRenderSettings.use_slave_output_log = BoolProperty( + name="Output render log on console", + description="Output render text log to console as well as sending it to the master", + default = True) + + NetRenderSettings.use_master_clear = BoolProperty( + name="Clear on exit", + description="delete saved files on exit", + default = False) + + default_path = os.environ.get("TEMP") + + if not default_path: + if os.name == 'nt': + default_path = "c:/tmp/" + else: + default_path = "/tmp/" + elif not default_path.endswith(os.sep): + default_path += os.sep + + NetRenderSettings.path = StringProperty( + name="Path", + description="Path for 
temporary files", + maxlen = 128, + default = default_path, + subtype='FILE_PATH') + + NetRenderSettings.job_type = EnumProperty( + items=( + ("JOB_BLENDER", "Blender", "Standard Blender Job"), + ("JOB_PROCESS", "Process", "Custom Process Job"), + ("JOB_VCS", "VCS", "Version Control System Managed Job"), + ), + name="Job Type", + description="Type of render job", + default="JOB_BLENDER") + + NetRenderSettings.job_name = StringProperty( + name="Job name", + description="Name of the job", + maxlen = 128, + default = "[default]") + + NetRenderSettings.job_category = StringProperty( + name="Job category", + description="Category of the job", + maxlen = 128, + default = "") + + NetRenderSettings.chunks = IntProperty( + name="Chunks", + description="Number of frame to dispatch to each slave in one chunk", + default = 5, + min=1, + max=65535) + + NetRenderSettings.priority = IntProperty( + name="Priority", + description="Priority of the job", + default = 1, + min=1, + max=10) + + NetRenderSettings.vcs_wpath = StringProperty( + name="Working Copy", + description="Path of the local working copy", + maxlen = 1024, + default = "") + + NetRenderSettings.vcs_rpath = StringProperty( + name="Remote Path", + description="Path of the server copy (protocol specific)", + maxlen = 1024, + default = "") + + NetRenderSettings.vcs_revision = StringProperty( + name="Revision", + description="Revision for this job", + maxlen = 256, + default = "") + + NetRenderSettings.vcs_system = StringProperty( + name="VCS", + description="Version Control System", + maxlen = 64, + default = "Subversion") + + NetRenderSettings.job_id = StringProperty( + name="Network job id", + description="id of the last sent render job", + maxlen = 64, + default = "") + + NetRenderSettings.active_slave_index = IntProperty( + name="Index of the active slave", + description="", + default = -1, + min= -1, + max=65535) + + NetRenderSettings.active_blacklisted_slave_index = IntProperty( + name="Index of the active slave", + description="", + default = -1, + min= -1, + max=65535) + + NetRenderSettings.active_job_index = IntProperty( + name="Index of the active job", + description="", + default = -1, + min= -1, + max=65535) + + NetRenderSettings.mode = EnumProperty( + items=( + ("RENDER_CLIENT", "Client", "Act as render client"), + ("RENDER_MASTER", "Master", "Act as render master"), + ("RENDER_SLAVE", "Slave", "Act as render slave"), + ), + name="Network mode", + description="Mode of operation of this instance", + default="RENDER_CLIENT") + + NetRenderSettings.slaves = CollectionProperty(type=NetRenderSlave, name="Slaves", description="") + NetRenderSettings.slaves_blacklist = CollectionProperty(type=NetRenderSlave, name="Slaves Blacklist", description="") + NetRenderSettings.jobs = CollectionProperty(type=NetRenderJob, name="Job List", description="") + + bpy.types.Scene.network_render = PointerProperty(type=NetRenderSettings, name="Network Render", description="Network Render Settings") + + @classmethod + def unregister(cls): + del bpy.types.Scene.network_render diff --git a/netrender/utils.py b/netrender/utils.py new file mode 100644 index 00000000..602f6cf3 --- /dev/null +++ b/netrender/utils.py @@ -0,0 +1,314 @@ +# ##### BEGIN GPL LICENSE BLOCK ##### +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. 
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software Foundation,
+# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+#
+# ##### END GPL LICENSE BLOCK #####
+
+import sys, os
+import re
+import http, http.client, http.server, urllib, socket
+import subprocess, shutil, time, hashlib, zlib
+
+import netrender, netrender.model
+
+
+try:
+    import bpy
+except: # bpy is not available when these utilities run outside of Blender
+    bpy = None
+
+VERSION = bytes(".".join((str(n) for n in netrender.bl_info["version"])), encoding='utf8')
+
+# Job status
+JOB_WAITING = 0 # before all data has been entered
+JOB_PAUSED = 1 # paused by user
+JOB_FINISHED = 2 # finished rendering
+JOB_QUEUED = 3 # ready to be dispatched
+
+JOB_STATUS_TEXT = {
+        JOB_WAITING: "Waiting",
+        JOB_PAUSED: "Paused",
+        JOB_FINISHED: "Finished",
+        JOB_QUEUED: "Queued"
+    }
+
+
+# Frame status
+QUEUED = 0
+DISPATCHED = 1
+DONE = 2
+ERROR = 3
+
+FRAME_STATUS_TEXT = {
+        QUEUED: "Queued",
+        DISPATCHED: "Dispatched",
+        DONE: "Done",
+        ERROR: "Error"
+    }
+
+class DirectoryContext:
+    def __init__(self, path):
+        self.path = path
+
+    def __enter__(self):
+        self.curdir = os.path.abspath(os.curdir)
+        os.chdir(self.path)
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        os.chdir(self.curdir)
+
+class BreakableIncrementedSleep:
+    def __init__(self, increment, default_timeout, max_timeout, break_fct):
+        self.increment = increment
+        self.default = default_timeout
+        self.max = max_timeout
+        self.current = self.default
+        self.break_fct = break_fct
+
+    def reset(self):
+        self.current = self.default
+
+    def increase(self):
+        self.current = min(self.current + self.increment, self.max)
+
+    def sleep(self):
+        for i in range(self.current):
+            time.sleep(1)
+            if self.break_fct():
+                break
+
+        self.increase()
+
+def responseStatus(conn):
+    response = conn.getresponse()
+    response.read()
+    return response.status
+
+def reporting(report, message, errorType = None):
+    if errorType:
+        t = 'ERROR'
+    else:
+        t = 'INFO'
+
+    if report:
+        report(t, message)
+        return None
+    elif errorType:
+        raise errorType(message)
+    else:
+        return None
+
+def clientScan(report = None):
+    try:
+        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
+        s.settimeout(30)
+
+        s.bind(('', 8000))
+
+        buf, address = s.recvfrom(64)
+
+        address = address[0]
+        port = int(str(buf, encoding='utf8'))
+
+        reporting(report, "Master server found")
+
+        return (address, port)
+    except socket.timeout:
+        reporting(report, "No master server on network", IOError)
+
+        return ("", 8000) # return default values
+
+def clientConnection(address, port, report = None, scan = True, timeout = 5):
+    if address == "[default]":
+        # Calling the scan operator from Python is unreliable here (the scene is
+        # not guaranteed to be in the operator context), so call clientScan() directly.
+        # if bpy:
+        #     bpy.ops.render.netclientscan()
+        # else:
+        if not scan:
+            return None
+
+        address, port = clientScan()
+        if address == "":
+            return None
+
+    try:
+        conn = http.client.HTTPConnection(address, port, timeout = timeout)
+
+        if conn:
+            if clientVerifyVersion(conn):
+                return conn
+            else:
+                conn.close()
+                reporting(report, "Incorrect master version", ValueError)
+    except BaseException as err:
+        if report:
+            report('ERROR', str(err))
+            return None
+        else:
+            print(err)
+            return None
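(Illustrative sketch, not part of the patch: how the discovery and connection helpers above are typically combined by calling code. The "/status" request path is only an assumed example endpoint; everything else uses functions defined in this file.)

    import http.client
    import netrender.utils as utils

    # Discover the master by UDP broadcast ("[default]") or pass an explicit address,
    # then open an HTTP connection whose protocol version has been verified.
    conn = utils.clientConnection("[default]", 8000, scan=True)

    if conn:
        conn.request("GET", "/status")                    # assumed endpoint, for illustration only
        if utils.responseStatus(conn) == http.client.OK:  # responseStatus() reads and returns the HTTP status
            print("master is reachable and compatible")
        conn.close()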
+def clientVerifyVersion(conn):
+    conn.request("GET", "/version")
+    response = conn.getresponse()
+
+    if response.status != http.client.OK:
+        conn.close()
+        return False
+
+    server_version = response.read()
+
+    if server_version != VERSION:
+        print("Incorrect server version!")
+        print("expected", str(VERSION, encoding='utf8'), "received", str(server_version, encoding='utf8'))
+        return False
+
+    return True
+
+def fileURL(job_id, file_index):
+    return "/file_%s_%i" % (job_id, file_index)
+
+def logURL(job_id, frame_number):
+    return "/log_%s_%i.log" % (job_id, frame_number)
+
+def renderURL(job_id, frame_number):
+    return "/render_%s_%i.exr" % (job_id, frame_number)
+
+def cancelURL(job_id):
+    return "/cancel_%s" % (job_id)
+
+def hashFile(path):
+    f = open(path, "rb")
+    value = hashData(f.read())
+    f.close()
+    return value
+
+def hashData(data):
+    m = hashlib.md5()
+    m.update(data)
+    return m.hexdigest()
+
+
+def prefixPath(prefix_directory, file_path, prefix_path, force = False):
+    if (os.path.isabs(file_path) or
+        len(file_path) >= 3 and (file_path[1:3] == ":/" or file_path[1:3] == ":\\") or # Windows absolute paths don't count as absolute on unix, so handle them manually
+        file_path[0] == "/" or file_path[0] == "\\"): # and vice versa
+
+        # if it is an absolute path, make sure the path exists; if it doesn't, use a relative local path
+        full_path = file_path
+        if force or not os.path.exists(full_path):
+            p, n = os.path.split(os.path.normpath(full_path))
+
+            if prefix_path and p.startswith(prefix_path):
+                if len(prefix_path) < len(p):
+                    directory = os.path.join(prefix_directory, p[len(prefix_path)+1:]) # +1 to remove the separator
+                    if not os.path.exists(directory):
+                        os.mkdir(directory)
+                else:
+                    directory = prefix_directory
+                full_path = os.path.join(directory, n)
+            else:
+                full_path = os.path.join(prefix_directory, n)
+    else:
+        full_path = os.path.join(prefix_directory, file_path)
+
+    return full_path
+
+def getResults(server_address, server_port, job_id, resolution_x, resolution_y, resolution_percentage, frame_ranges):
+    if bpy.app.debug:
+        print("=============================================")
+        print("============= FETCHING RESULTS ==============")
+
+    frame_arguments = []
+    for r in frame_ranges:
+        if len(r) == 2:
+            frame_arguments.extend(["-s", str(r[0]), "-e", str(r[1]), "-a"])
+        else:
+            frame_arguments.extend(["-f", str(r[0])])
+
+    filepath = os.path.join(bpy.app.tempdir, "netrender_temp.blend")
+    bpy.ops.wm.save_as_mainfile(filepath=filepath, copy=True, check_existing=False)
+
+    arguments = [sys.argv[0], "-b", "-noaudio", filepath, "-o", bpy.path.abspath(bpy.context.scene.render.filepath), "-P", __file__] + frame_arguments + ["--", "GetResults", server_address, str(server_port), job_id, str(resolution_x), str(resolution_y), str(resolution_percentage)]
+    if bpy.app.debug:
+        print("Starting subprocess:")
+        print(" ".join(arguments))
+
+    process = subprocess.Popen(arguments, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    while process.poll() is None:
+        stdout = process.stdout.read(1024)
+        if bpy.app.debug:
+            print(str(stdout, encoding='utf-8'), end="")
+
+    # read leftovers if needed
+    stdout = process.stdout.read()
+    if bpy.app.debug:
+        print(str(stdout, encoding='utf-8'))
+
+    os.remove(filepath)
+
+    if bpy.app.debug:
+        print("=============================================")
+    return
+
+def _getResults(server_address, server_port, job_id, resolution_x, resolution_y, resolution_percentage):
+    render = bpy.context.scene.render
+
+    netsettings = bpy.context.scene.network_render
+
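+    # _getResults() runs in the background Blender instance launched by getResults()
+    # above (see the __main__ block at the end of this file). The assignments below
+    # mirror the command-line arguments passed by getResults(), pointing the scene at
+    # the master's job and switching to the NET_RENDER engine so the frames requested
+    # on the command line are fetched from the master instead of being re-rendered.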
+    netsettings.server_address = server_address
+    netsettings.server_port = int(server_port)
+    netsettings.job_id = job_id
+
+    render.engine = 'NET_RENDER'
+    render.resolution_x = int(resolution_x)
+    render.resolution_y = int(resolution_y)
+    render.resolution_percentage = int(resolution_percentage)
+
+    render.use_full_sample = False
+    render.use_compositing = False
+    render.use_border = False
+
+
+def getFileInfo(filepath, infos):
+    process = subprocess.Popen([sys.argv[0], "-b", "-noaudio", filepath, "-P", __file__, "--", "FileInfo"] + infos, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    stdout = bytes()
+    while process.poll() is None:
+        stdout += process.stdout.read(1024)
+
+    # read leftovers if needed
+    stdout += process.stdout.read()
+
+    stdout = str(stdout, encoding="utf8")
+
+    values = [eval(v[1:].strip()) for v in stdout.split("\n") if v.startswith("$")]
+
+    return values
+
+
+if __name__ == "__main__":
+    try:
+        start = sys.argv.index("--") + 1
+    except ValueError:
+        start = 0
+    action, *args = sys.argv[start:]
+
+    if action == "FileInfo":
+        for info in args:
+            print("$", eval(info))
+    elif action == "GetResults":
+        _getResults(args[0], args[1], args[2], args[3], args[4], args[5])
diff --git a/netrender/versioning.py b/netrender/versioning.py
new file mode 100644
index 00000000..d4892e3e
--- /dev/null
+++ b/netrender/versioning.py
@@ -0,0 +1,108 @@
+import sys, os
+import re
+import subprocess
+
+from netrender.utils import *
+
+class AbstractVCS:
+    name = "ABSTRACT VCS"
+    def __init__(self):
+        pass
+
+    def update(self, info):
+        """update(info)
+        Update a working copy to the specified revision.
+        If the working copy doesn't exist, do a full checkout from the server to create it.
+        [info] model.VersioningInfo instance, specifies the working path, remote path and version number."""
+        pass
+
+    def revision(self, path):
+        """revision(path)
+        Return the current revision of the specified working copy path."""
+        pass
+
+    def path(self, path):
+        """path(path)
+        Return the remote path of the specified working copy path."""
+        pass
+
+class Subversion(AbstractVCS):
+    name = "Subversion"
+    def __init__(self):
+        super().__init__()
+        self.version_exp = re.compile("([0-9]*)")
+        self.path_exp = re.compile("URL: (.*)")
+
+    def update(self, info):
+        if not os.path.exists(info.wpath):
+            base, folder = os.path.split(info.wpath)
+
+            with DirectoryContext(base):
+                subprocess.call(["svn", "co", "%s@%s" % (info.rpath, str(info.revision)), folder])
+        else:
+            with DirectoryContext(info.wpath):
+                subprocess.call(["svn", "up", "--accept", "theirs-full", "-r", str(info.revision)])
+
+    def revision(self, path):
+        if not os.path.exists(path):
+            return
+
+        with DirectoryContext(path):
+            stdout = subprocess.check_output(["svnversion"])
+
+            match = self.version_exp.match(str(stdout, encoding="utf-8"))
+
+            if match:
+                return match.group(1)
+
+    def path(self, path):
+        if not os.path.exists(path):
+            return
+
+        with DirectoryContext(path):
+            stdout = subprocess.check_output(["svn", "info"])
+
+            match = self.path_exp.search(str(stdout, encoding="utf-8"))
+
+            if match:
+                return match.group(1)
+
+class Git(AbstractVCS):
+    name = "Git"
+    def __init__(self):
+        super().__init__()
+        self.version_exp = re.compile("^commit (.*)")
+
+    def update(self, info):
+        if not os.path.exists(info.wpath):
+            base, folder = os.path.split(info.wpath)
+
+            with DirectoryContext(base):
+                subprocess.call(["git", "clone", "%s" % (info.rpath), folder])
+
+        with DirectoryContext(info.wpath):
+            subprocess.call(["git", "checkout", str(info.revision)])
+
+    def revision(self, path):
+        if not os.path.exists(path):
+            return
+
+        with DirectoryContext(path):
+            stdout = subprocess.check_output(["git", "show"])
+
+            match = self.version_exp.search(str(stdout, encoding="utf-8"))
+
+            if match:
+                return match.group(1)
+
+    def path(self, path):
+        if not os.path.exists(path):
+            return
+
+        # TODO: find a meaningful remote path equivalent for git; for now return the local path
+        return path
+
+SYSTEMS = {
+        Subversion.name: Subversion(),
+        Git.name: Git()
+    }
--
cgit v1.2.3
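(Illustrative sketch, not part of the patch: how the SYSTEMS registry above is meant to be used together with the VCS job settings defined in ui.py. The attribute names wpath/rpath/revision follow the update() docstring; VersioningInfo itself lives in netrender/model.py and its no-argument constructor is assumed here.)

    from netrender import model, versioning

    vcs = versioning.SYSTEMS["Subversion"]      # keyed by the vcs_system setting ("Subversion" or "Git")

    info = model.VersioningInfo()               # assumed constructor, see netrender/model.py
    info.wpath = "/data/myproject_wc"           # local working copy (vcs_wpath)
    info.rpath = vcs.path(info.wpath)           # remote path recorded with the job (vcs_rpath)
    info.revision = vcs.revision(info.wpath)    # revision recorded with the job (vcs_revision)

    # A slave can later reproduce the same source state from that information:
    vcs.update(info)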