diff options
Diffstat (limited to 'netrender/slave.py')
-rw-r--r-- | netrender/slave.py | 359 |
1 files changed, 359 insertions, 0 deletions
# ##### BEGIN GPL LICENSE BLOCK #####
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License
#  as published by the Free Software Foundation; either version 2
#  of the License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software Foundation,
#  Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# ##### END GPL LICENSE BLOCK #####

import sys, os, platform, shutil
import http, http.client, http.server, urllib
import subprocess, time
import json

import bpy

from netrender.utils import *
import netrender.model
import netrender.repath
import netrender.thumbnail as thumbnail

# Executable used to spawn render subprocesses (the running program itself).
BLENDER_PATH = sys.argv[0]

CANCEL_POLL_SPEED = 2   # seconds between log uploads / cancel checks while rendering
MAX_TIMEOUT = 10        # upper bound handed to BreakableIncrementedSleep
INCREMENT_TIMEOUT = 1   # increment handed to BreakableIncrementedSleep
MAX_CONNECT_TRY = 10    # reconnection attempts before giving up

try:
    system = platform.system()
except UnicodeDecodeError:
    # Fall back to the interpreter's own platform tag when platform.system()
    # cannot decode its result.
    system = sys.platform


def _windows_supports_error_mode():
    """Return True when running on Windows with a major version of 5 or more.

    The major version is compared numerically: comparing the version strings
    directly would rank '10.0.x' below '5' and wrongly disable the feature on
    recent Windows releases.
    """
    if system not in ('Windows', 'win32'):
        return False

    major = platform.version().split('.')[0]
    try:
        return int(major) >= 5
    except ValueError:
        # Unparsable version string: behave as if the feature is unavailable.
        return False


if _windows_supports_error_mode():  # Error mode is only available on Win2k or higher, that's version 5
    import ctypes

    def SetErrorMode():
        """Add flag 0x0002 to the process error mode and return the previous mode."""
        # SetErrorMode returns the previous mode, so call it once to read the
        # current value and once more to OR the flag into it.
        val = ctypes.windll.kernel32.SetErrorMode(0x0002)
        ctypes.windll.kernel32.SetErrorMode(val | 0x0002)
        return val

    def RestoreErrorMode(val):
        """Restore the process error mode previously returned by SetErrorMode()."""
        ctypes.windll.kernel32.SetErrorMode(val)
else:
    def SetErrorMode():
        """No-op stand-in for platforms without an error mode; always returns 0."""
        return 0

    def RestoreErrorMode(val):
        """No-op stand-in for platforms without an error mode."""
        pass


def clearSlave(path):
    """Recursively delete the slave working directory at `path`."""
    shutil.rmtree(path)


def slave_Info():
    """Build a RenderSlave describing this machine from platform.uname()."""
    sysname, nodename, release, version, machine, processor = platform.uname()
    slave = netrender.model.RenderSlave()
    slave.name = nodename
    slave.stats = sysname + " " + release + " " + machine + " " + processor
    return slave


def testCancel(conn, job_id, frame_number):
    """Ask the master for the status of a frame; True means the job is gone.

    Sends a HEAD request to /status and treats a NO_CONTENT answer as
    "cancelled" (the job isn't found anymore).
    """
    conn.request("HEAD", "/status", headers={"job-id": job_id, "job-frame": str(frame_number)})

    # canceled if job isn't found anymore
    return responseStatus(conn) == http.client.NO_CONTENT


def testFile(conn, job_id, slave_id, rfile, JOB_PREFIX, main_path=None):
    """Make sure a job file is available locally, downloading it if needed.

    A local copy is reused when it exists and, if `rfile.signature` is set,
    its hash matches. Otherwise the file is fetched from the master into a
    temporary file under JOB_PREFIX and then moved into place.

    On success `rfile.filepath` is updated to the local path, which is also
    returned. Returns None when the master does not answer with OK.
    """
    job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path)

    found = os.path.exists(job_full_path)

    if found and rfile.signature is not None:
        found = hashFile(job_full_path) == rfile.signature

        if not found:
            print("Found file %s at %s but signature mismatch!" % (rfile.filepath, job_full_path))
            os.remove(job_full_path)

    if not found:
        # Force prefix path if not found
        job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force=True)
        temp_path = os.path.join(JOB_PREFIX, "slave.temp")
        conn.request("GET", fileURL(job_id, rfile.index), headers={"slave-id": slave_id})
        response = conn.getresponse()

        if response.status != http.client.OK:
            # Consume the body so the connection can carry the next request.
            response.read()
            return None  # file for job not returned by server, need to return an error code to server

        # Download to a temporary name first so that an interrupted transfer
        # never leaves a partial file at the final path.
        with open(temp_path, "wb") as f:
            buf = response.read(1024)
            while buf:
                f.write(buf)
                buf = response.read(1024)

        os.renames(temp_path, job_full_path)

    rfile.filepath = job_full_path

    return job_full_path


def breakable_timeout(timeout, test_break=None):
    """Sleep for up to `timeout` seconds, one second at a time.

    `test_break` is an optional zero-argument callable (for example
    `engine.test_break`); the wait ends early as soon as it returns true.
    Without it the function simply sleeps for the full duration.
    """
    for _ in range(timeout):
        time.sleep(1)
        if test_break is not None and test_break():
            break


def _connect(engine, netsettings, bisleep):
    """Open a connection to the master, retrying up to MAX_CONNECT_TRY times.

    Returns whatever clientConnection() last returned, which is falsy when
    every attempt failed or the user interrupted the wait.
    """
    conn = clientConnection(netsettings.server_address, netsettings.server_port)
    if conn:
        return conn

    print("Connection failed, will try connecting again at most %i times" % MAX_CONNECT_TRY)
    bisleep.reset()

    for attempt in range(MAX_CONNECT_TRY):
        bisleep.sleep()

        conn = clientConnection(netsettings.server_address, netsettings.server_port)

        if conn or engine.test_break():
            break

        print("Retry %i failed, waiting %is before retrying" % (attempt + 1, bisleep.current))

    return conn


def _put_file(conn, url, path, headers):
    """PUT the content of the file at `path` to `url`; return the response status."""
    with open(path, 'rb') as f:
        conn.request("PUT", url, f, headers=headers)
    return responseStatus(conn)


def render_slave(engine, netsettings, threads):
    """Run the render-node loop: fetch jobs from the master, render, report.

    engine      -- object providing test_break() and update_stats()
    netsettings -- network render settings (working path, server address and
                   port, use_slave_* options)
    threads     -- thread count passed to the render subprocess via "-t"

    Returns None. Returns early when the working path is missing or not
    writable, or when no connection to the master could be established.
    """
    bisleep = BreakableIncrementedSleep(INCREMENT_TIMEOUT, 1, MAX_TIMEOUT, engine.test_break)

    engine.update_stats("", "Network render node initiation")

    slave_path = bpy.path.abspath(netsettings.path)

    if not os.path.exists(slave_path):
        print("Slave working path ( %s ) doesn't exist" % netsettings.path)
        return

    if not os.access(slave_path, os.W_OK):
        print("Slave working path ( %s ) is not writable" % netsettings.path)
        return

    conn = _connect(engine, netsettings, bisleep)
    if not conn:
        return

    # Register with the master; it answers with the id assigned to this node.
    conn.request("POST", "/slave", bytes(json.dumps(slave_Info().serialize()), encoding='utf8'))
    response = conn.getresponse()
    response.read()

    slave_id = response.getheader("slave-id")

    NODE_PREFIX = os.path.join(slave_path, "slave_" + slave_id)
    if not os.path.exists(NODE_PREFIX):
        os.mkdir(NODE_PREFIX)

    engine.update_stats("", "Network render connected to master, waiting for jobs")

    while not engine.test_break():
        conn.request("GET", "/job", headers={"slave-id": slave_id})
        response = conn.getresponse()

        if response.status != http.client.OK:
            # Nothing to do right now: wait before polling again.
            bisleep.sleep()
            continue

        bisleep.reset()

        job = netrender.model.RenderJob.materialize(json.loads(str(response.read(), encoding='utf8')))
        engine.update_stats("", "Network render processing job from master")

        JOB_PREFIX = os.path.join(NODE_PREFIX, "job_" + job.id)
        if not os.path.exists(JOB_PREFIX):
            os.mkdir(JOB_PREFIX)

        # set tempdir for fsaa temp files
        # have to set environ var because render is done in a subprocess and that's the easiest way to propagate the setting
        os.environ["TMP"] = JOB_PREFIX

        if job.type == netrender.model.JOB_BLENDER:
            job_path = job.files[0].filepath  # path of main file
            main_path, main_file = os.path.split(job_path)

            # NOTE(review): testFile() returns None when the master does not
            # provide the file; that case is not reported back to the server.
            job_full_path = testFile(conn, job.id, slave_id, job.files[0], JOB_PREFIX)
            print("Fullpath", job_full_path)
            print("File:", main_file, "and %i other files" % (len(job.files) - 1,))

            for rfile in job.files[1:]:
                testFile(conn, job.id, slave_id, rfile, JOB_PREFIX, main_path)
                print("\t", rfile.filepath)

            netrender.repath.update(job)

            engine.update_stats("", "Render File " + main_file + " for job " + job.id)
        elif job.type == netrender.model.JOB_VCS:
            if not job.version_info:
                # Need to return an error to server, incorrect job type
                # NOTE(review): not implemented; execution continues below.
                pass

            job_path = job.files[0].filepath  # path of main file
            main_path, main_file = os.path.split(job_path)

            job.version_info.update()

            # For VCS jobs, file path is relative to the working copy path
            job_full_path = os.path.join(job.version_info.wpath, job_path)

            engine.update_stats("", "Render File " + main_file + " for job " + job.id)

        # announce log to master
        logfile = netrender.model.LogFile(job.id, slave_id, [frame.number for frame in job.frames])
        conn.request("POST", "/log", bytes(json.dumps(logfile.serialize()), encoding='utf8'))
        response = conn.getresponse()
        response.read()

        first_frame = job.frames[0].number

        # start render
        start_t = time.time()

        # NOTE(review): `process` stays unbound if the job matches neither
        # branch below -- confirm that every job type is covered.
        if job.rendersWithBlender():
            frame_args = []

            for frame in job.frames:
                print("frame", frame.number)
                frame_args += ["-f", str(frame.number)]

            val = SetErrorMode()
            process = subprocess.Popen(
                [BLENDER_PATH, "-b", "-noaudio", job_full_path,
                 "-t", str(threads),
                 "-o", os.path.join(JOB_PREFIX, "######"),
                 "-E", "BLENDER_RENDER",
                 "-F", "MULTILAYER"] + frame_args,
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            RestoreErrorMode(val)
        elif job.type == netrender.model.JOB_PROCESS:
            command = job.frames[0].command
            val = SetErrorMode()
            process = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            RestoreErrorMode(val)

        headers = {"slave-id": slave_id}

        cancelled = False
        stdout = bytes()
        run_t = time.time()
        while not cancelled and process.poll() is None:
            stdout += process.stdout.read(1024)
            current_t = time.time()
            cancelled = engine.test_break()
            if current_t - run_t > CANCEL_POLL_SPEED:

                # update logs if needed
                if stdout:
                    # (only need to update on one frame, they are linked)
                    conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers)
                    responseStatus(conn)

                    # Also output on console
                    if netsettings.use_slave_output_log:
                        print(str(stdout, encoding='utf8'), end="")

                    stdout = bytes()

                run_t = current_t
                if testCancel(conn, job.id, first_frame):
                    cancelled = True

        if job.type == netrender.model.JOB_BLENDER:
            netrender.repath.reset(job)

        # read leftovers if needed
        stdout += process.stdout.read()

        if cancelled:
            # kill process if needed
            if process.poll() is None:
                try:
                    process.terminate()
                except OSError:
                    # best effort: ignore failure to terminate the process
                    pass
            continue  # nothing to report, go back to asking for a job

        # flush the rest of the logs
        if stdout:
            # Also output on console (same option as the in-progress echo above)
            if netsettings.use_slave_output_log:
                print(str(stdout, encoding='utf8'), end="")

            # (only need to update on one frame, they are linked)
            conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers)
            if responseStatus(conn) == http.client.NO_CONTENT:
                # the master no longer knows the job: skip sending results
                continue

        total_t = time.time() - start_t

        avg_t = total_t / len(job.frames)

        status = process.returncode

        print("status", status)

        headers = {"job-id": job.id, "slave-id": slave_id, "job-time": str(avg_t)}

        if status == 0:  # non zero status is error
            headers["job-result"] = str(DONE)
            for frame in job.frames:
                headers["job-frame"] = str(frame.number)
                if job.hasRenderResult():
                    # send image back to server
                    filename = os.path.join(JOB_PREFIX, "%06d.exr" % frame.number)

                    # thumbnail first
                    if netsettings.use_slave_thumb:
                        thumbname = thumbnail.generate(filename)

                        if thumbname:
                            _put_file(conn, "/thumb", thumbname, headers)

                    _put_file(conn, "/render", filename, headers)
                elif job.type == netrender.model.JOB_PROCESS:
                    conn.request("PUT", "/render", headers=headers)
                    responseStatus(conn)
        else:
            headers["job-result"] = str(ERROR)
            for frame in job.frames:
                headers["job-frame"] = str(frame.number)
                # send error result back to server
                conn.request("PUT", "/render", headers=headers)
                responseStatus(conn)

        engine.update_stats("", "Network render connected to master, waiting for jobs")

    conn.close()

    if netsettings.use_slave_clear:
        clearSlave(NODE_PREFIX)


if __name__ == "__main__":
    pass