Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender-addons.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'netrender/slave.py')
-rw-r--r--netrender/slave.py359
1 files changed, 359 insertions, 0 deletions
diff --git a/netrender/slave.py b/netrender/slave.py
new file mode 100644
index 00000000..7f72530e
--- /dev/null
+++ b/netrender/slave.py
@@ -0,0 +1,359 @@
+# ##### BEGIN GPL LICENSE BLOCK #####
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software Foundation,
+# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+#
+# ##### END GPL LICENSE BLOCK #####
+
+import sys, os, platform, shutil
+import http, http.client, http.server, urllib
+import subprocess, time
+import json
+
+import bpy
+
+from netrender.utils import *
+import netrender.model
+import netrender.repath
+import netrender.thumbnail as thumbnail
+
+BLENDER_PATH = sys.argv[0]
+
+CANCEL_POLL_SPEED = 2
+MAX_TIMEOUT = 10
+INCREMENT_TIMEOUT = 1
+MAX_CONNECT_TRY = 10
+try:
+ system = platform.system()
+except UnicodeDecodeError:
+ import sys
+ system = sys.platform
+
+if system in ('Windows', 'win32') and platform.version() >= '5': # Error mode is only available on Win2k or higher, that's version 5
+ import ctypes
+ def SetErrorMode():
+ val = ctypes.windll.kernel32.SetErrorMode(0x0002)
+ ctypes.windll.kernel32.SetErrorMode(val | 0x0002)
+ return val
+
+ def RestoreErrorMode(val):
+ ctypes.windll.kernel32.SetErrorMode(val)
+else:
+ def SetErrorMode():
+ return 0
+
+ def RestoreErrorMode(val):
+ pass
+
+def clearSlave(path):
+ shutil.rmtree(path)
+
+def slave_Info():
+ sysname, nodename, release, version, machine, processor = platform.uname()
+ slave = netrender.model.RenderSlave()
+ slave.name = nodename
+ slave.stats = sysname + " " + release + " " + machine + " " + processor
+ return slave
+
+def testCancel(conn, job_id, frame_number):
+ conn.request("HEAD", "/status", headers={"job-id":job_id, "job-frame": str(frame_number)})
+
+ # canceled if job isn't found anymore
+ if responseStatus(conn) == http.client.NO_CONTENT:
+ return True
+ else:
+ return False
+
+def testFile(conn, job_id, slave_id, rfile, JOB_PREFIX, main_path = None):
+ job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path)
+
+ found = os.path.exists(job_full_path)
+
+ if found and rfile.signature != None:
+ found_signature = hashFile(job_full_path)
+ found = found_signature == rfile.signature
+
+ if not found:
+ print("Found file %s at %s but signature mismatch!" % (rfile.filepath, job_full_path))
+ os.remove(job_full_path)
+ job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force = True)
+
+ if not found:
+ # Force prefix path if not found
+ job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force = True)
+ temp_path = os.path.join(JOB_PREFIX, "slave.temp")
+ conn.request("GET", fileURL(job_id, rfile.index), headers={"slave-id":slave_id})
+ response = conn.getresponse()
+
+ if response.status != http.client.OK:
+ return None # file for job not returned by server, need to return an error code to server
+
+ f = open(temp_path, "wb")
+ buf = response.read(1024)
+
+ while buf:
+ f.write(buf)
+ buf = response.read(1024)
+
+ f.close()
+
+ os.renames(temp_path, job_full_path)
+
+ rfile.filepath = job_full_path
+
+ return job_full_path
+
+def breakable_timeout(timeout):
+ for i in range(timeout):
+ time.sleep(1)
+ if engine.test_break():
+ break
+
+def render_slave(engine, netsettings, threads):
+ timeout = 1
+
+ bisleep = BreakableIncrementedSleep(INCREMENT_TIMEOUT, 1, MAX_TIMEOUT, engine.test_break)
+
+ engine.update_stats("", "Network render node initiation")
+
+ slave_path = bpy.path.abspath(netsettings.path)
+
+ if not os.path.exists(slave_path):
+ print("Slave working path ( %s ) doesn't exist" % netsettings.path)
+ return
+
+ if not os.access(slave_path, os.W_OK):
+ print("Slave working path ( %s ) is not writable" % netsettings.path)
+ return
+
+ conn = clientConnection(netsettings.server_address, netsettings.server_port)
+
+ if not conn:
+ timeout = 1
+ print("Connection failed, will try connecting again at most %i times" % MAX_CONNECT_TRY)
+ bisleep.reset()
+
+ for i in range(MAX_CONNECT_TRY):
+ bisleep.sleep()
+
+ conn = clientConnection(netsettings.server_address, netsettings.server_port)
+
+ if conn or engine.test_break():
+ break
+
+ print("Retry %i failed, waiting %is before retrying" % (i + 1, bisleep.current))
+
+ if conn:
+ conn.request("POST", "/slave", json.dumps(slave_Info().serialize()))
+ response = conn.getresponse()
+ response.read()
+
+ slave_id = response.getheader("slave-id")
+
+ NODE_PREFIX = os.path.join(slave_path, "slave_" + slave_id)
+ if not os.path.exists(NODE_PREFIX):
+ os.mkdir(NODE_PREFIX)
+
+ engine.update_stats("", "Network render connected to master, waiting for jobs")
+
+ while not engine.test_break():
+ conn.request("GET", "/job", headers={"slave-id":slave_id})
+ response = conn.getresponse()
+
+ if response.status == http.client.OK:
+ bisleep.reset()
+
+ job = netrender.model.RenderJob.materialize(json.loads(str(response.read(), encoding='utf8')))
+ engine.update_stats("", "Network render processing job from master")
+
+ JOB_PREFIX = os.path.join(NODE_PREFIX, "job_" + job.id)
+ if not os.path.exists(JOB_PREFIX):
+ os.mkdir(JOB_PREFIX)
+
+ # set tempdir for fsaa temp files
+ # have to set environ var because render is done in a subprocess and that's the easiest way to propagate the setting
+ os.environ["TMP"] = JOB_PREFIX
+
+
+ if job.type == netrender.model.JOB_BLENDER:
+ job_path = job.files[0].filepath # path of main file
+ main_path, main_file = os.path.split(job_path)
+
+ job_full_path = testFile(conn, job.id, slave_id, job.files[0], JOB_PREFIX)
+ print("Fullpath", job_full_path)
+ print("File:", main_file, "and %i other files" % (len(job.files) - 1,))
+
+ for rfile in job.files[1:]:
+ testFile(conn, job.id, slave_id, rfile, JOB_PREFIX, main_path)
+ print("\t", rfile.filepath)
+
+ netrender.repath.update(job)
+
+ engine.update_stats("", "Render File "+ main_file+ " for job "+ job.id)
+ elif job.type == netrender.model.JOB_VCS:
+ if not job.version_info:
+ # Need to return an error to server, incorrect job type
+ pass
+
+ job_path = job.files[0].filepath # path of main file
+ main_path, main_file = os.path.split(job_path)
+
+ job.version_info.update()
+
+ # For VCS jobs, file path is relative to the working copy path
+ job_full_path = os.path.join(job.version_info.wpath, job_path)
+
+ engine.update_stats("", "Render File "+ main_file+ " for job "+ job.id)
+
+ # announce log to master
+ logfile = netrender.model.LogFile(job.id, slave_id, [frame.number for frame in job.frames])
+ conn.request("POST", "/log", bytes(json.dumps(logfile.serialize()), encoding='utf8'))
+ response = conn.getresponse()
+ response.read()
+
+
+ first_frame = job.frames[0].number
+
+ # start render
+ start_t = time.time()
+
+ if job.rendersWithBlender():
+ frame_args = []
+
+ for frame in job.frames:
+ print("frame", frame.number)
+ frame_args += ["-f", str(frame.number)]
+
+ val = SetErrorMode()
+ process = subprocess.Popen([BLENDER_PATH, "-b", "-noaudio", job_full_path, "-t", str(threads), "-o", os.path.join(JOB_PREFIX, "######"), "-E", "BLENDER_RENDER", "-F", "MULTILAYER"] + frame_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ RestoreErrorMode(val)
+ elif job.type == netrender.model.JOB_PROCESS:
+ command = job.frames[0].command
+ val = SetErrorMode()
+ process = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ RestoreErrorMode(val)
+
+ headers = {"slave-id":slave_id}
+
+ cancelled = False
+ stdout = bytes()
+ run_t = time.time()
+ while not cancelled and process.poll() is None:
+ stdout += process.stdout.read(1024)
+ current_t = time.time()
+ cancelled = engine.test_break()
+ if current_t - run_t > CANCEL_POLL_SPEED:
+
+ # update logs if needed
+ if stdout:
+ # (only need to update on one frame, they are linked
+ conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers)
+ responseStatus(conn)
+
+ # Also output on console
+ if netsettings.use_slave_output_log:
+ print(str(stdout, encoding='utf8'), end="")
+
+ stdout = bytes()
+
+ run_t = current_t
+ if testCancel(conn, job.id, first_frame):
+ cancelled = True
+
+ if job.type == netrender.model.JOB_BLENDER:
+ netrender.repath.reset(job)
+
+ # read leftovers if needed
+ stdout += process.stdout.read()
+
+ if cancelled:
+ # kill process if needed
+ if process.poll() is None:
+ try:
+ process.terminate()
+ except OSError:
+ pass
+ continue # to next frame
+
+ # flush the rest of the logs
+ if stdout:
+ # Also output on console
+ if netsettings.use_slave_thumb:
+ print(str(stdout, encoding='utf8'), end="")
+
+ # (only need to update on one frame, they are linked
+ conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers)
+ if responseStatus(conn) == http.client.NO_CONTENT:
+ continue
+
+ total_t = time.time() - start_t
+
+ avg_t = total_t / len(job.frames)
+
+ status = process.returncode
+
+ print("status", status)
+
+ headers = {"job-id":job.id, "slave-id":slave_id, "job-time":str(avg_t)}
+
+
+ if status == 0: # non zero status is error
+ headers["job-result"] = str(DONE)
+ for frame in job.frames:
+ headers["job-frame"] = str(frame.number)
+ if job.hasRenderResult():
+ # send image back to server
+
+ filename = os.path.join(JOB_PREFIX, "%06d.exr" % frame.number)
+
+ # thumbnail first
+ if netsettings.use_slave_thumb:
+ thumbname = thumbnail.generate(filename)
+
+ if thumbname:
+ f = open(thumbname, 'rb')
+ conn.request("PUT", "/thumb", f, headers=headers)
+ f.close()
+ responseStatus(conn)
+
+ f = open(filename, 'rb')
+ conn.request("PUT", "/render", f, headers=headers)
+ f.close()
+ if responseStatus(conn) == http.client.NO_CONTENT:
+ continue
+
+ elif job.type == netrender.model.JOB_PROCESS:
+ conn.request("PUT", "/render", headers=headers)
+ if responseStatus(conn) == http.client.NO_CONTENT:
+ continue
+ else:
+ headers["job-result"] = str(ERROR)
+ for frame in job.frames:
+ headers["job-frame"] = str(frame.number)
+ # send error result back to server
+ conn.request("PUT", "/render", headers=headers)
+ if responseStatus(conn) == http.client.NO_CONTENT:
+ continue
+
+ engine.update_stats("", "Network render connected to master, waiting for jobs")
+ else:
+ bisleep.sleep()
+
+ conn.close()
+
+ if netsettings.use_slave_clear:
+ clearSlave(NODE_PREFIX)
+
+if __name__ == "__main__":
+ pass