# ##### BEGIN GPL LICENSE BLOCK #####
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU General Public License
#  as published by the Free Software Foundation; either version 2
#  of the License, or (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.
#
#  You should have received a copy of the GNU General Public License
#  along with this program; if not, write to the Free Software Foundation,
#  Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# ##### END GPL LICENSE BLOCK #####

import sys, os, platform, shutil
import http, http.client, http.server, urllib
import subprocess, time
import json

import bpy

from netrender.utils import *
import netrender.model
import netrender.repath
import netrender.thumbnail as thumbnail

BLENDER_PATH = sys.argv[0]

# Minimum number of seconds between two log uploads / cancel polls while a
# job's subprocess is running.
CANCEL_POLL_SPEED = 2
# Arguments handed to BreakableIncrementedSleep and the connection retry loop.
MAX_TIMEOUT = 10
INCREMENT_TIMEOUT = 1
MAX_CONNECT_TRY = 10

try:
    system = platform.system()
except UnicodeDecodeError:
    import sys
    system = sys.platform


def _error_mode_supported():
    """Return True when running on Windows 2000 (version 5) or later.

    The major version is compared numerically: a plain string comparison
    against '5' wrongly rejects two-digit versions such as '10.0.19041'.
    """
    if system not in ('Windows', 'win32'):
        return False

    try:
        major = int(platform.version().split(".")[0])
    except (ValueError, UnicodeDecodeError):
        # Unparseable or undecodable version string: ask the interpreter.
        major = sys.getwindowsversion()[0]

    return major >= 5


if _error_mode_supported():
    # Error mode is only available on Win2k or higher, that's version 5
    import ctypes

    def SetErrorMode():
        """Add flag 0x0002 to the process error mode and return the previous mode.

        0x0002 is SEM_NOGPFAULTERRORBOX in the Win32 API (no crash dialog).
        """
        # The first call only serves to fetch the current mode, the second
        # one applies the flag on top of it.
        val = ctypes.windll.kernel32.SetErrorMode(0x0002)
        ctypes.windll.kernel32.SetErrorMode(val | 0x0002)
        return val

    def RestoreErrorMode(val):
        """Restore the error mode previously returned by SetErrorMode()."""
        ctypes.windll.kernel32.SetErrorMode(val)
else:
    def SetErrorMode():
        """No-op outside of Windows; returns a dummy previous mode."""
        return 0

    def RestoreErrorMode(val):
        """No-op outside of Windows."""
        pass


def clearSlave(path):
    """Recursively delete the slave's working directory `path`."""
    shutil.rmtree(path)


def slave_Info():
    """Build a RenderSlave describing this machine from platform.uname()."""
    sysname, nodename, release, version, machine, processor = platform.uname()
    slave = netrender.model.RenderSlave()
    slave.name = nodename
    slave.stats = sysname + " " + release + " " + machine + " " + processor
    return slave


def testCancel(conn, job_id, frame_number):
    """Ask the master for the status of a frame; return True if it was cancelled.

    A NO_CONTENT answer means the job isn't found anymore on the master.
    """
    conn.request("HEAD", "/status", headers={"job-id":job_id, "job-frame": str(frame_number)})

    # canceled if job isn't found anymore
    return responseStatus(conn) == http.client.NO_CONTENT


def testFile(conn, job_id, slave_id, rfile, JOB_PREFIX, main_path = None):
    """Make sure a job file is available locally, downloading it if needed.

    The file is looked up at the path computed by prefixPath(). It is fetched
    from the master when it is missing or when its hash does not match
    `rfile.signature`. On success `rfile.filepath` is updated to the local
    path, which is also returned. Returns None when the master does not
    return the file.
    """
    job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path)

    found = os.path.exists(job_full_path)

    if found and rfile.signature is not None:
        found_signature = hashFile(job_full_path)
        found = found_signature == rfile.signature

        if not found:
            print("Found file %s at %s but signature mismatch!" % (rfile.filepath, job_full_path))
            job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force = True)

    if not found:
        # Force prefix path if not found
        job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force = True)
        temp_path = os.path.join(JOB_PREFIX, "slave.temp")
        conn.request("GET", fileURL(job_id, rfile.index), headers={"slave-id":slave_id})
        response = conn.getresponse()

        if response.status != http.client.OK:
            # Drain the body so the persistent connection can be reused for
            # the next request.
            response.read()
            return None # file for job not returned by server, need to return an error code to server

        # Download to a temporary file first, then move it into place.
        with open(temp_path, "wb") as f:
            buf = response.read(1024)

            while buf:
                f.write(buf)
                buf = response.read(1024)

        os.renames(temp_path, job_full_path)

    rfile.filepath = job_full_path

    return job_full_path


def breakable_timeout(timeout, test_break=None):
    """Sleep for `timeout` seconds, one second at a time.

    `test_break` is an optional callable checked after every second; the
    wait stops early as soon as it returns a true value.
    """
    for _ in range(timeout):
        time.sleep(1)
        if test_break is not None and test_break():
            break


def render_slave(engine, netsettings, threads):
    """Run the render node main loop until `engine.test_break()` is true.

    Connects to the master (retrying at most MAX_CONNECT_TRY times),
    registers this machine, then repeatedly requests a job, runs it in a
    subprocess while forwarding its output as log to the master, and finally
    sends the results (or an error status) back.
    """
    bisleep = BreakableIncrementedSleep(INCREMENT_TIMEOUT, 1, MAX_TIMEOUT, engine.test_break)

    engine.update_stats("", "Network render node initiation")

    conn = clientConnection(netsettings.server_address, netsettings.server_port)

    if not conn:
        print("Connection failed, will try connecting again at most %i times" % MAX_CONNECT_TRY)
        bisleep.reset()

        for i in range(MAX_CONNECT_TRY):
            bisleep.sleep()

            conn = clientConnection(netsettings.server_address, netsettings.server_port)

            if conn or engine.test_break():
                break

            print("Retry %i failed, waiting %is before retrying" % (i + 1, bisleep.current))

    if conn:
        conn.request("POST", "/slave", json.dumps(slave_Info().serialize()))
        response = conn.getresponse()
        response.read()

        slave_id = response.getheader("slave-id")

        NODE_PREFIX = os.path.join(bpy.path.abspath(netsettings.path), "slave_" + slave_id)
        if not os.path.exists(NODE_PREFIX):
            os.mkdir(NODE_PREFIX)

        engine.update_stats("", "Network render connected to master, waiting for jobs")

        while not engine.test_break():
            conn.request("GET", "/job", headers={"slave-id":slave_id})
            response = conn.getresponse()

            if response.status == http.client.OK:
                bisleep.reset()

                job = netrender.model.RenderJob.materialize(json.loads(str(response.read(), encoding='utf8')))
                engine.update_stats("", "Network render processing job from master")

                JOB_PREFIX = os.path.join(NODE_PREFIX, "job_" + job.id)
                if not os.path.exists(JOB_PREFIX):
                    os.mkdir(JOB_PREFIX)

                # set tempdir for fsaa temp files
                # have to set environ var because render is done in a subprocess and that's the easiest way to propagate the setting
                os.environ["TMP"] = JOB_PREFIX

                if job.type == netrender.model.JOB_BLENDER:
                    job_path = job.files[0].filepath # path of main file
                    main_path, main_file = os.path.split(job_path)

                    job_full_path = testFile(conn, job.id, slave_id, job.files[0], JOB_PREFIX)
                    print("Fullpath", job_full_path)
                    print("File:", main_file, "and %i other files" % (len(job.files) - 1,))

                    for rfile in job.files[1:]:
                        testFile(conn, job.id, slave_id, rfile, JOB_PREFIX, main_path)
                        print("\t", rfile.filepath)

                    netrender.repath.update(job)

                    engine.update_stats("", "Render File "+ main_file+ " for job "+ job.id)
                elif job.type == netrender.model.JOB_VCS:
                    if not job.version_info:
                        # Need to return an error to server, incorrect job type
                        # TODO: not handled yet, the code below still
                        # dereferences job.version_info.
                        pass

                    job_path = job.files[0].filepath # path of main file
                    main_path, main_file = os.path.split(job_path)

                    job.version_info.update()

                    # For VCS jobs, file path is relative to the working copy path
                    job_full_path = os.path.join(job.version_info.wpath, job_path)

                    engine.update_stats("", "Render File "+ main_file+ " for job "+ job.id)

                # announce log to master
                logfile = netrender.model.LogFile(job.id, slave_id, [frame.number for frame in job.frames])
                conn.request("POST", "/log", bytes(json.dumps(logfile.serialize()), encoding='utf8'))
                response = conn.getresponse()
                response.read()

                first_frame = job.frames[0].number

                # start render
                start_t = time.time()

                if job.rendersWithBlender():
                    frame_args = []

                    for frame in job.frames:
                        print("frame", frame.number)
                        frame_args += ["-f", str(frame.number)]

                    val = SetErrorMode()
                    try:
                        process = subprocess.Popen([BLENDER_PATH, "-b", "-noaudio", job_full_path, "-t", str(threads), "-o", os.path.join(JOB_PREFIX, "######"), "-E", "BLENDER_RENDER", "-F", "MULTILAYER"] + frame_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
                    finally:
                        # Restore the error mode even if the launch fails.
                        RestoreErrorMode(val)
                elif job.type == netrender.model.JOB_PROCESS:
                    command = job.frames[0].command
                    val = SetErrorMode()
                    try:
                        process = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
                    finally:
                        # Restore the error mode even if the launch fails.
                        RestoreErrorMode(val)

                headers = {"slave-id":slave_id}

                cancelled = False
                stdout = bytes()
                run_t = time.time()
                while not cancelled and process.poll() is None:
                    stdout += process.stdout.read(1024)
                    current_t = time.time()
                    cancelled = engine.test_break()
                    if current_t - run_t > CANCEL_POLL_SPEED:

                        # update logs if needed
                        if stdout:
                            # (only need to update on one frame, they are linked
                            conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers)
                            response = conn.getresponse()
                            response.read()

                            # Also output on console
                            if netsettings.use_slave_output_log:
                                # A 1024 byte chunk can end in the middle of a
                                # multi-byte character, never fail on that.
                                print(str(stdout, encoding='utf8', errors='replace'), end="")

                            stdout = bytes()

                        run_t = current_t
                        if testCancel(conn, job.id, first_frame):
                            cancelled = True

                if job.type == netrender.model.JOB_BLENDER:
                    netrender.repath.reset(job)

                if cancelled:
                    # kill process if needed
                    # This is done before reading the remaining output:
                    # reading to EOF would block until the process exits by
                    # itself, which defeats the cancellation.
                    if process.poll() is None:
                        try:
                            process.terminate()
                        except OSError:
                            pass
                    continue # to next frame

                # read leftovers if needed
                stdout += process.stdout.read()

                # flush the rest of the logs
                if stdout:
                    # Also output on console
                    if netsettings.use_slave_output_log:
                        print(str(stdout, encoding='utf8', errors='replace'), end="")

                    # (only need to update on one frame, they are linked
                    conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers)
                    if responseStatus(conn) == http.client.NO_CONTENT:
                        continue

                total_t = time.time() - start_t

                avg_t = total_t / len(job.frames)

                status = process.returncode

                print("status", status)

                headers = {"job-id":job.id, "slave-id":slave_id, "job-time":str(avg_t)}

                if status == 0: # non zero status is error
                    headers["job-result"] = str(DONE)
                    for frame in job.frames:
                        headers["job-frame"] = str(frame.number)
                        if job.hasRenderResult():
                            # send image back to server

                            filename = os.path.join(JOB_PREFIX, "%06d.exr" % frame.number)

                            # thumbnail first
                            if netsettings.use_slave_thumb:
                                thumbname = thumbnail.generate(filename)

                                if thumbname:
                                    with open(thumbname, 'rb') as f:
                                        conn.request("PUT", "/thumb", f, headers=headers)
                                    responseStatus(conn)

                            with open(filename, 'rb') as f:
                                conn.request("PUT", "/render", f, headers=headers)
                            if responseStatus(conn) == http.client.NO_CONTENT:
                                continue

                        elif job.type == netrender.model.JOB_PROCESS:
                            conn.request("PUT", "/render", headers=headers)
                            if responseStatus(conn) == http.client.NO_CONTENT:
                                continue
                else:
                    headers["job-result"] = str(ERROR)
                    for frame in job.frames:
                        headers["job-frame"] = str(frame.number)
                        # send error result back to server
                        conn.request("PUT", "/render", headers=headers)
                        if responseStatus(conn) == http.client.NO_CONTENT:
                            continue

                engine.update_stats("", "Network render connected to master, waiting for jobs")
            else:
                bisleep.sleep()

        conn.close()

        if netsettings.use_slave_clear:
            clearSlave(NODE_PREFIX)


if __name__ == "__main__":
    pass