diff options
Diffstat (limited to 'netrender/master.py')
-rw-r--r-- | netrender/master.py | 1078 |
1 files changed, 1078 insertions, 0 deletions
diff --git a/netrender/master.py b/netrender/master.py new file mode 100644 index 00000000..b34509e3 --- /dev/null +++ b/netrender/master.py @@ -0,0 +1,1078 @@ +# ##### BEGIN GPL LICENSE BLOCK ##### +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# ##### END GPL LICENSE BLOCK ##### + +import sys, os +import http, http.client, http.server, urllib, socket, socketserver, threading +import subprocess, shutil, time, hashlib +import pickle +import select # for select.error +import json + +from netrender.utils import * +import netrender.model +import netrender.balancing +import netrender.master_html +import netrender.thumbnail as thumbnail + +class MRenderFile(netrender.model.RenderFile): + def __init__(self, filepath, index, start, end, signature): + super().__init__(filepath, index, start, end, signature) + self.found = False + + def test(self): + self.found = os.path.exists(self.filepath) + if self.found and self.signature != None: + found_signature = hashFile(self.filepath) + self.found = self.signature == found_signature + + return self.found + + +class MRenderSlave(netrender.model.RenderSlave): + def __init__(self, name, address, stats): + super().__init__() + self.id = hashlib.md5(bytes(repr(name) + repr(address), encoding='utf8')).hexdigest() + self.name = name + self.address = address + self.stats = stats + self.last_seen = time.time() + + self.job = None + self.job_frames = [] + + netrender.model.RenderSlave._slave_map[self.id] = self + + def seen(self): + self.last_seen = time.time() + + def finishedFrame(self, frame_number): + self.job_frames.remove(frame_number) + if not self.job_frames: + self.job = None + +class MRenderJob(netrender.model.RenderJob): + def __init__(self, job_id, job_info): + super().__init__(job_info) + self.id = job_id + self.last_dispatched = time.time() + + # force one chunk for process jobs + if self.type == netrender.model.JOB_PROCESS: + self.chunks = 1 + + # Force WAITING status on creation + self.status = JOB_WAITING + + # special server properties + self.last_update = 0 + self.save_path = "" + self.files = [MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end, rfile.signature) for rfile in job_info.files] + + def initInfo(self): + if not self.resolution: + self.resolution = tuple(getFileInfo(self.files[0].filepath, ["bpy.context.scene.render.resolution_x", "bpy.context.scene.render.resolution_y", "bpy.context.scene.render.resolution_percentage"])) + + def save(self): + if self.save_path: + f = open(os.path.join(self.save_path, "job.txt"), "w") + f.write(json.dumps(self.serialize())) + f.close() + + def edit(self, info_map): + if "status" in info_map: + self.status = info_map["status"] + + if "priority" in info_map: + self.priority = info_map["priority"] + + if "chunks" in info_map: + self.chunks = info_map["chunks"] + + def testStart(self): + # Don't test files for versionned jobs + if not self.version_info: + for f in self.files: + if not f.test(): + return False + + self.start() + self.initInfo() + return True + + def testFinished(self): + for f in self.frames: + if f.status == QUEUED or f.status == DISPATCHED: + break + else: + self.status = JOB_FINISHED + + def pause(self, status = None): + if self.status not in {JOB_PAUSED, JOB_QUEUED}: + return + + if status is None: + self.status = JOB_PAUSED if self.status == JOB_QUEUED else JOB_QUEUED + elif status: + self.status = JOB_QUEUED + else: + self.status = JOB_PAUSED + + def start(self): + self.status = JOB_QUEUED + + def addLog(self, frames): + log_name = "_".join(("%06d" % f for f in frames)) + ".log" + log_path = os.path.join(self.save_path, log_name) + + for number in frames: + frame = self[number] + if frame: + frame.log_path = log_path + + def addFrame(self, frame_number, command): + frame = MRenderFrame(frame_number, command) + self.frames.append(frame) + return frame + + def reset(self, all): + for f in self.frames: + f.reset(all) + + def getFrames(self): + frames = [] + for f in self.frames: + if f.status == QUEUED: + self.last_dispatched = time.time() + frames.append(f) + if len(frames) >= self.chunks: + break + + return frames + +class MRenderFrame(netrender.model.RenderFrame): + def __init__(self, frame, command): + super().__init__() + self.number = frame + self.slave = None + self.time = 0 + self.status = QUEUED + self.command = command + + self.log_path = None + + def reset(self, all): + if all or self.status == ERROR: + self.log_path = None + self.slave = None + self.time = 0 + self.status = QUEUED + + +# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= +# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- +# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= +# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- +file_pattern = re.compile("/file_([a-zA-Z0-9]+)_([0-9]+)") +render_pattern = re.compile("/render_([a-zA-Z0-9]+)_([0-9]+).exr") +thumb_pattern = re.compile("/thumb_([a-zA-Z0-9]+)_([0-9]+).jpg") +log_pattern = re.compile("/log_([a-zA-Z0-9]+)_([0-9]+).log") +reset_pattern = re.compile("/reset(all|)_([a-zA-Z0-9]+)_([0-9]+)") +cancel_pattern = re.compile("/cancel_([a-zA-Z0-9]+)") +pause_pattern = re.compile("/pause_([a-zA-Z0-9]+)") +edit_pattern = re.compile("/edit_([a-zA-Z0-9]+)") + +class RenderHandler(http.server.BaseHTTPRequestHandler): + def log_message(self, format, *args): + # override because the original calls self.address_string(), which + # is extremely slow due to some timeout.. + sys.stderr.write("[%s] %s\n" % (self.log_date_time_string(), format%args)) + + def getInfoMap(self): + length = int(self.headers['content-length']) + + if length > 0: + msg = str(self.rfile.read(length), encoding='utf8') + return json.loads(msg) + else: + return {} + + def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"): + self.send_response(code) + self.send_header("Content-type", content) + + for key, value in headers.items(): + self.send_header(key, value) + + self.end_headers() + + def do_HEAD(self): + + if self.path == "/status": + job_id = self.headers.get('job-id', "") + job_frame = int(self.headers.get('job-frame', -1)) + + job = self.server.getJobID(job_id) + if job: + frame = job[job_frame] + + + if frame: + self.send_head(http.client.OK) + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + + def do_GET(self): + + if self.path == "/version": + self.send_head() + self.server.stats("", "Version check") + self.wfile.write(VERSION) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/render"): + match = render_pattern.match(self.path) + + if match: + job_id = match.groups()[0] + frame_number = int(match.groups()[1]) + + job = self.server.getJobID(job_id) + + if job: + frame = job[frame_number] + + if frame: + if frame.status in (QUEUED, DISPATCHED): + self.send_head(http.client.ACCEPTED) + elif frame.status == DONE: + self.server.stats("", "Sending result to client") + + filename = os.path.join(job.save_path, "%06d.exr" % frame_number) + + f = open(filename, 'rb') + self.send_head(content = "image/x-exr") + shutil.copyfileobj(f, self.wfile) + f.close() + elif frame.status == ERROR: + self.send_head(http.client.PARTIAL_CONTENT) + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/thumb"): + match = thumb_pattern.match(self.path) + + if match: + job_id = match.groups()[0] + frame_number = int(match.groups()[1]) + + job = self.server.getJobID(job_id) + + if job: + frame = job[frame_number] + + if frame: + if frame.status in (QUEUED, DISPATCHED): + self.send_head(http.client.ACCEPTED) + elif frame.status == DONE: + filename = os.path.join(job.save_path, "%06d.exr" % frame_number) + + thumbname = thumbnail.generate(filename) + + if thumbname: + f = open(thumbname, 'rb') + self.send_head(content = "image/jpeg") + shutil.copyfileobj(f, self.wfile) + f.close() + else: # thumbnail couldn't be generated + self.send_head(http.client.PARTIAL_CONTENT) + return + elif frame.status == ERROR: + self.send_head(http.client.PARTIAL_CONTENT) + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/log"): + match = log_pattern.match(self.path) + + if match: + job_id = match.groups()[0] + frame_number = int(match.groups()[1]) + + job = self.server.getJobID(job_id) + + if job: + frame = job[frame_number] + + if frame: + if not frame.log_path or frame.status in (QUEUED, DISPATCHED): + self.send_head(http.client.PROCESSING) + else: + self.server.stats("", "Sending log to client") + f = open(frame.log_path, 'rb') + + self.send_head(content = "text/plain") + + shutil.copyfileobj(f, self.wfile) + + f.close() + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid URL + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/status": + job_id = self.headers.get('job-id', "") + job_frame = int(self.headers.get('job-frame', -1)) + + if job_id: + + job = self.server.getJobID(job_id) + if job: + if job_frame != -1: + frame = job[frame] + + if frame: + message = frame.serialize() + else: + # no such frame + self.send_heat(http.client.NO_CONTENT) + return + else: + message = job.serialize() + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + return + else: # status of all jobs + message = [] + + for job in self.server: + message.append(job.serialize()) + + + self.server.stats("", "Sending status") + self.send_head() + self.wfile.write(bytes(json.dumps(message), encoding='utf8')) + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/job": + self.server.balance() + + slave_id = self.headers['slave-id'] + + slave = self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + job, frames = self.server.newDispatch(slave_id) + + if job and frames: + for f in frames: + print("dispatch", f.number) + f.status = DISPATCHED + f.slave = slave + + slave.job = job + slave.job_frames = [f.number for f in frames] + + self.send_head(headers={"job-id": job.id}) + + message = job.serialize(frames) + + self.wfile.write(bytes(json.dumps(message), encoding='utf8')) + + self.server.stats("", "Sending job to slave") + else: + # no job available, return error code + slave.job = None + slave.job_frames = [] + + self.send_head(http.client.ACCEPTED) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/file"): + match = file_pattern.match(self.path) + + if match: + slave_id = self.headers['slave-id'] + slave = self.server.getSeenSlave(slave_id) + + if not slave: + # invalid slave id + print("invalid slave id") + + job_id = match.groups()[0] + file_index = int(match.groups()[1]) + + job = self.server.getJobID(job_id) + + if job: + render_file = job.files[file_index] + + if render_file: + self.server.stats("", "Sending file to slave") + f = open(render_file.filepath, 'rb') + + self.send_head() + shutil.copyfileobj(f, self.wfile) + + f.close() + else: + # no such file + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/slaves": + message = [] + + self.server.stats("", "Sending slaves status") + + for slave in self.server.slaves: + message.append(slave.serialize()) + + self.send_head() + + self.wfile.write(bytes(json.dumps(message), encoding='utf8')) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + else: + # hand over the rest to the html section + netrender.master_html.get(self) + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + def do_POST(self): + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + if self.path == "/job": + + length = int(self.headers['content-length']) + + job_info = netrender.model.RenderJob.materialize(json.loads(str(self.rfile.read(length), encoding='utf8'))) + + job_id = self.server.nextJobID() + + job = MRenderJob(job_id, job_info) + + for frame in job_info.frames: + frame = job.addFrame(frame.number, frame.command) + + self.server.addJob(job) + + headers={"job-id": job_id} + + if job.testStart(): + self.server.stats("", "New job, started") + self.send_head(headers=headers) + else: + self.server.stats("", "New job, missing files (%i total)" % len(job.files)) + self.send_head(http.client.ACCEPTED, headers=headers) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/edit"): + match = edit_pattern.match(self.path) + + if match: + job_id = match.groups()[0] + + job = self.server.getJobID(job_id) + + if job: + info_map = self.getInfoMap() + + job.edit(info_map) + self.send_head() + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/balance_limit": + info_map = self.getInfoMap() + for rule_id, limit in info_map.items(): + try: + rule = self.server.balancer.ruleByID(rule_id) + if rule: + rule.setLimit(limit) + except: + pass # invalid type + + self.send_head() + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/balance_enable": + info_map = self.getInfoMap() + for rule_id, enabled in info_map.items(): + rule = self.server.balancer.ruleByID(rule_id) + if rule: + rule.enabled = enabled + + self.send_head() + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/cancel"): + match = cancel_pattern.match(self.path) + + if match: + info_map = self.getInfoMap() + clear = info_map.get("clear", False) + + job_id = match.groups()[0] + + job = self.server.getJobID(job_id) + + if job: + self.server.stats("", "Cancelling job") + self.server.removeJob(job, clear) + self.send_head() + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/pause"): + match = pause_pattern.match(self.path) + + if match: + info_map = self.getInfoMap() + status = info_map.get("status", None) + + job_id = match.groups()[0] + + job = self.server.getJobID(job_id) + + if job: + self.server.stats("", "Pausing job") + job.pause(status) + self.send_head() + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: + # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/clear": + # cancel all jobs + info_map = self.getInfoMap() + clear = info_map.get("clear", False) + + self.server.stats("", "Clearing jobs") + self.server.clear(clear) + + self.send_head() + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/reset"): + match = reset_pattern.match(self.path) + + if match: + all = match.groups()[0] == 'all' + job_id = match.groups()[1] + job_frame = int(match.groups()[2]) + + job = self.server.getJobID(job_id) + + if job: + if job_frame != 0: + + frame = job[job_frame] + if frame: + self.server.stats("", "Reset job frame") + frame.reset(all) + self.send_head() + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + + else: + self.server.stats("", "Reset job") + job.reset(all) + self.send_head() + + else: # job not found + self.send_head(http.client.NO_CONTENT) + else: # invalid url + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/slave": + length = int(self.headers['content-length']) + job_frame_string = self.headers['job-frame'] + + self.server.stats("", "New slave connected") + + slave_info = netrender.model.RenderSlave.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')), cache = False) + + slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats) + + self.send_head(headers = {"slave-id": slave_id}) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/log": + length = int(self.headers['content-length']) + + log_info = netrender.model.LogFile.materialize(json.loads(str(self.rfile.read(length), encoding='utf8'))) + + slave_id = log_info.slave_id + + slave = self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + job = self.server.getJobID(log_info.job_id) + + if job: + self.server.stats("", "Log announcement") + job.addLog(log_info.frames) + self.send_head(http.client.OK) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + def do_PUT(self): + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + if self.path.startswith("/file"): + match = file_pattern.match(self.path) + + if match: + self.server.stats("", "Receiving job") + + length = int(self.headers['content-length']) + job_id = match.groups()[0] + file_index = int(match.groups()[1]) + + job = self.server.getJobID(job_id) + + if job: + + render_file = job.files[file_index] + + if render_file: + main_file = job.files[0].filepath # filename of the first file + + main_path, main_name = os.path.split(main_file) + + if file_index > 0: + file_path = prefixPath(job.save_path, render_file.filepath, main_path) + else: + file_path = os.path.join(job.save_path, main_name) + + buf = self.rfile.read(length) + + # add same temp file + renames as slave + + f = open(file_path, "wb") + f.write(buf) + f.close() + del buf + + render_file.filepath = file_path # set the new path + + if job.testStart(): + self.server.stats("", "File upload, starting job") + self.send_head(http.client.OK) + else: + self.server.stats("", "File upload, file missings") + self.send_head(http.client.ACCEPTED) + else: # invalid file + print("file not found", job_id, file_index) + self.send_head(http.client.NO_CONTENT) + else: # job not found + print("job not found", job_id, file_index) + self.send_head(http.client.NO_CONTENT) + else: # invalid url + print("no match") + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/render": + self.server.stats("", "Receiving render result") + + slave_id = self.headers['slave-id'] + + slave = self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + job_id = self.headers['job-id'] + + job = self.server.getJobID(job_id) + + if job: + job_frame = int(self.headers['job-frame']) + job_result = int(self.headers['job-result']) + job_time = float(self.headers['job-time']) + + frame = job[job_frame] + + if frame: + if job.hasRenderResult(): + if job_result == DONE: + length = int(self.headers['content-length']) + buf = self.rfile.read(length) + f = open(os.path.join(job.save_path, "%06d.exr" % job_frame), 'wb') + f.write(buf) + f.close() + + del buf + elif job_result == ERROR: + # blacklist slave on this job on error + # slaves might already be in blacklist if errors on the whole chunk + if not slave.id in job.blacklist: + job.blacklist.append(slave.id) + + slave.finishedFrame(job_frame) + + frame.status = job_result + frame.time = job_time + + job.testFinished() + + self.send_head() + # need some message content here or the slave doesn't like it + self.wfile.write(bytes("foo", encoding='utf8')) + else: # frame not found + self.send_head(http.client.NO_CONTENT) + else: # job not found + self.send_head(http.client.NO_CONTENT) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/thumb": + self.server.stats("", "Receiving thumbnail result") + + slave_id = self.headers['slave-id'] + + slave = self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + job_id = self.headers['job-id'] + + job = self.server.getJobID(job_id) + + if job: + job_frame = int(self.headers['job-frame']) + + frame = job[job_frame] + + if frame: + if job.hasRenderResult(): + length = int(self.headers['content-length']) + buf = self.rfile.read(length) + f = open(os.path.join(job.save_path, "%06d.jpg" % job_frame), 'wb') + f.write(buf) + f.close() + + del buf + + self.send_head() + # need some message content here or the slave doesn't like it + self.wfile.write(bytes("foo", encoding='utf8')) + else: # frame not found + self.send_head(http.client.NO_CONTENT) + else: # job not found + self.send_head(http.client.NO_CONTENT) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path.startswith("/log"): + self.server.stats("", "Receiving log file") + + match = log_pattern.match(self.path) + + if match: + job_id = match.groups()[0] + + job = self.server.getJobID(job_id) + + if job: + job_frame = int(match.groups()[1]) + + frame = job[job_frame] + + if frame and frame.log_path: + length = int(self.headers['content-length']) + buf = self.rfile.read(length) + f = open(frame.log_path, 'ab') + f.write(buf) + f.close() + + del buf + + self.server.getSeenSlave(self.headers['slave-id']) + + self.send_head() + else: # frame not found + self.send_head(http.client.NO_CONTENT) + else: # job not found + self.send_head(http.client.NO_CONTENT) + else: # invalid url + self.send_head(http.client.NO_CONTENT) + +class RenderMasterServer(socketserver.ThreadingMixIn, http.server.HTTPServer): + def __init__(self, address, handler_class, path, subdir=True): + super().__init__(address, handler_class) + self.jobs = [] + self.jobs_map = {} + self.slaves = [] + self.slaves_map = {} + self.job_id = 0 + + if subdir: + self.path = os.path.join(path, "master_" + str(os.getpid())) + else: + self.path = path + + self.slave_timeout = 5 # 5 mins: need a parameter for that + + self.balancer = netrender.balancing.Balancer() + self.balancer.addRule(netrender.balancing.RatingUsageByCategory(self.getJobs)) + self.balancer.addRule(netrender.balancing.RatingUsage()) + self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob()) + self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves, limit = 0.9)) + self.balancer.addPriority(netrender.balancing.NewJobPriority()) + self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2)) + + if not os.path.exists(self.path): + os.mkdir(self.path) + + def restore(self, jobs, slaves, balancer = None): + self.jobs = jobs + self.jobs_map = {} + + for job in self.jobs: + self.jobs_map[job.id] = job + self.job_id = max(self.job_id, int(job.id)) + + self.slaves = slaves + for slave in self.slaves: + self.slaves_map[slave.id] = slave + + if balancer: + self.balancer = balancer + + + def nextJobID(self): + self.job_id += 1 + return str(self.job_id) + + def addSlave(self, name, address, stats): + slave = MRenderSlave(name, address, stats) + self.slaves.append(slave) + self.slaves_map[slave.id] = slave + + return slave.id + + def removeSlave(self, slave): + self.slaves.remove(slave) + self.slaves_map.pop(slave.id) + + def getSlave(self, slave_id): + return self.slaves_map.get(slave_id) + + def getSeenSlave(self, slave_id): + slave = self.getSlave(slave_id) + if slave: + slave.seen() + + return slave + + def timeoutSlaves(self): + removed = [] + + t = time.time() + + for slave in self.slaves: + if (t - slave.last_seen) / 60 > self.slave_timeout: + removed.append(slave) + + if slave.job: + for f in slave.job_frames: + slave.job[f].status = ERROR + + for slave in removed: + self.removeSlave(slave) + + def updateUsage(self): + blend = 0.5 + for job in self.jobs: + job.usage *= (1 - blend) + + if self.slaves: + slave_usage = blend / self.countSlaves() + + for slave in self.slaves: + if slave.job: + slave.job.usage += slave_usage + + + def clear(self, clear_files = False): + removed = self.jobs[:] + + for job in removed: + self.removeJob(job, clear_files) + + def balance(self): + self.balancer.balance(self.jobs) + + def getJobs(self): + return self.jobs + + def countJobs(self, status = JOB_QUEUED): + total = 0 + for j in self.jobs: + if j.status == status: + total += 1 + + return total + + def countSlaves(self): + return len(self.slaves) + + def removeJob(self, job, clear_files = False): + self.jobs.remove(job) + self.jobs_map.pop(job.id) + + if clear_files: + shutil.rmtree(job.save_path) + + for slave in self.slaves: + if slave.job == job: + slave.job = None + slave.job_frames = [] + + def addJob(self, job): + self.jobs.append(job) + self.jobs_map[job.id] = job + + # create job directory + job.save_path = os.path.join(self.path, "job_" + job.id) + if not os.path.exists(job.save_path): + os.mkdir(job.save_path) + + job.save() + + def getJobID(self, id): + return self.jobs_map.get(id) + + def __iter__(self): + for job in self.jobs: + yield job + + def newDispatch(self, slave_id): + if self.jobs: + for job in self.jobs: + if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist: + return job, job.getFrames() + + return None, None + +def clearMaster(path): + shutil.rmtree(path) + +def createMaster(address, clear, path): + filepath = os.path.join(path, "blender_master.data") + + if not clear and os.path.exists(filepath): + print("loading saved master:", filepath) + with open(filepath, 'rb') as f: + path, jobs, slaves = pickle.load(f) + + httpd = RenderMasterServer(address, RenderHandler, path, subdir=False) + httpd.restore(jobs, slaves) + + return httpd + + return RenderMasterServer(address, RenderHandler, path) + +def saveMaster(path, httpd): + filepath = os.path.join(path, "blender_master.data") + + with open(filepath, 'wb') as f: + pickle.dump((httpd.path, httpd.jobs, httpd.slaves), f, pickle.HIGHEST_PROTOCOL) + +def runMaster(address, broadcast, clear, path, update_stats, test_break): + httpd = createMaster(address, clear, path) + httpd.timeout = 1 + httpd.stats = update_stats + + if broadcast: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + start_time = time.time() - 2 + + while not test_break(): + try: + httpd.handle_request() + except select.error: + pass + + if time.time() - start_time >= 2: # need constant here + httpd.timeoutSlaves() + + httpd.updateUsage() + + if broadcast: + print("broadcasting address") + s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', 8000)) + start_time = time.time() + + httpd.server_close() + if clear: + clearMaster(httpd.path) + else: + saveMaster(path, httpd) + |