Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'release/scripts/io/netrender/master.py')
-rw-r--r--release/scripts/io/netrender/master.py1079
1 files changed, 0 insertions, 1079 deletions
diff --git a/release/scripts/io/netrender/master.py b/release/scripts/io/netrender/master.py
deleted file mode 100644
index 793e3bb51bf..00000000000
--- a/release/scripts/io/netrender/master.py
+++ /dev/null
@@ -1,1079 +0,0 @@
-# ##### BEGIN GPL LICENSE BLOCK #####
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software Foundation,
-# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-#
-# ##### END GPL LICENSE BLOCK #####
-
-import sys, os
-import http, http.client, http.server, urllib, socket, socketserver, threading
-import subprocess, shutil, time, hashlib
-import pickle
-import select # for select.error
-import json
-
-from netrender.utils import *
-import netrender.model
-import netrender.balancing
-import netrender.master_html
-import netrender.thumbnail as thumbnail
-
-class MRenderFile(netrender.model.RenderFile):
- def __init__(self, filepath, index, start, end, signature):
- super().__init__(filepath, index, start, end, signature)
- self.found = False
-
- def test(self):
- self.found = os.path.exists(self.filepath)
- if self.found and self.signature != None:
- found_signature = hashFile(self.filepath)
- self.found = self.signature == found_signature
-
- return self.found
-
-
-class MRenderSlave(netrender.model.RenderSlave):
- def __init__(self, name, address, stats):
- super().__init__()
- self.id = hashlib.md5(bytes(repr(name) + repr(address), encoding='utf8')).hexdigest()
- self.name = name
- self.address = address
- self.stats = stats
- self.last_seen = time.time()
-
- self.job = None
- self.job_frames = []
-
- netrender.model.RenderSlave._slave_map[self.id] = self
-
- def seen(self):
- self.last_seen = time.time()
-
- def finishedFrame(self, frame_number):
- self.job_frames.remove(frame_number)
- if not self.job_frames:
- self.job = None
-
-class MRenderJob(netrender.model.RenderJob):
- def __init__(self, job_id, job_info):
- super().__init__(job_info)
- self.id = job_id
- self.last_dispatched = time.time()
-
- # force one chunk for process jobs
- if self.type == netrender.model.JOB_PROCESS:
- self.chunks = 1
-
- # Force WAITING status on creation
- self.status = JOB_WAITING
-
- # special server properties
- self.last_update = 0
- self.save_path = ""
- self.files = [MRenderFile(rfile.filepath, rfile.index, rfile.start, rfile.end, rfile.signature) for rfile in job_info.files]
-
- def initInfo(self):
- if not self.resolution:
- self.resolution = tuple(getFileInfo(self.files[0].filepath, ["bpy.context.scene.render.resolution_x", "bpy.context.scene.render.resolution_y", "bpy.context.scene.render.resolution_percentage"]))
-
- def save(self):
- if self.save_path:
- f = open(os.path.join(self.save_path, "job.txt"), "w")
- f.write(json.dumps(self.serialize()))
- f.close()
-
- def edit(self, info_map):
- if "status" in info_map:
- self.status = info_map["status"]
-
- if "priority" in info_map:
- self.priority = info_map["priority"]
-
- if "chunks" in info_map:
- self.chunks = info_map["chunks"]
-
- def testStart(self):
- # Don't test files for versionned jobs
- if not self.version_info:
- for f in self.files:
- if not f.test():
- return False
-
- self.start()
- self.initInfo()
- return True
-
- def testFinished(self):
- for f in self.frames:
- if f.status == QUEUED or f.status == DISPATCHED:
- break
- else:
- self.status = JOB_FINISHED
-
- def pause(self, status = None):
- if self.status not in {JOB_PAUSED, JOB_QUEUED}:
- return
-
- if status is None:
- self.status = JOB_PAUSED if self.status == JOB_QUEUED else JOB_QUEUED
- elif status:
- self.status = JOB_QUEUED
- else:
- self.status = JOB_PAUSED
-
- def start(self):
- self.status = JOB_QUEUED
-
- def addLog(self, frames):
- log_name = "_".join(("%06d" % f for f in frames)) + ".log"
- log_path = os.path.join(self.save_path, log_name)
-
- for number in frames:
- frame = self[number]
- if frame:
- frame.log_path = log_path
-
- def addFrame(self, frame_number, command):
- frame = MRenderFrame(frame_number, command)
- self.frames.append(frame)
- return frame
-
- def reset(self, all):
- for f in self.frames:
- f.reset(all)
-
- def getFrames(self):
- frames = []
- for f in self.frames:
- if f.status == QUEUED:
- self.last_dispatched = time.time()
- frames.append(f)
- if len(frames) >= self.chunks:
- break
-
- return frames
-
-class MRenderFrame(netrender.model.RenderFrame):
- def __init__(self, frame, command):
- super().__init__()
- self.number = frame
- self.slave = None
- self.time = 0
- self.status = QUEUED
- self.command = command
-
- self.log_path = None
-
- def reset(self, all):
- if all or self.status == ERROR:
- self.log_path = None
- self.slave = None
- self.time = 0
- self.status = QUEUED
-
-
-# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
-# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
-# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
-# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
-file_pattern = re.compile("/file_([a-zA-Z0-9]+)_([0-9]+)")
-render_pattern = re.compile("/render_([a-zA-Z0-9]+)_([0-9]+).exr")
-thumb_pattern = re.compile("/thumb_([a-zA-Z0-9]+)_([0-9]+).jpg")
-log_pattern = re.compile("/log_([a-zA-Z0-9]+)_([0-9]+).log")
-reset_pattern = re.compile("/reset(all|)_([a-zA-Z0-9]+)_([0-9]+)")
-cancel_pattern = re.compile("/cancel_([a-zA-Z0-9]+)")
-pause_pattern = re.compile("/pause_([a-zA-Z0-9]+)")
-edit_pattern = re.compile("/edit_([a-zA-Z0-9]+)")
-
-class RenderHandler(http.server.BaseHTTPRequestHandler):
- def log_message(self, format, *args):
- # override because the original calls self.address_string(), which
- # is extremely slow due to some timeout..
- sys.stderr.write("[%s] %s\n" % (self.log_date_time_string(), format%args))
-
- def getInfoMap(self):
- length = int(self.headers['content-length'])
-
- if length > 0:
- msg = str(self.rfile.read(length), encoding='utf8')
- return json.loads(msg)
- else:
- return {}
-
- def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"):
- self.send_response(code)
- self.send_header("Content-type", content)
-
- for key, value in headers.items():
- self.send_header(key, value)
-
- self.end_headers()
-
- def do_HEAD(self):
-
- if self.path == "/status":
- job_id = self.headers.get('job-id', "")
- job_frame = int(self.headers.get('job-frame', -1))
-
- job = self.server.getJobID(job_id)
- if job:
- frame = job[job_frame]
-
-
- if frame:
- self.send_head(http.client.OK)
- else:
- # no such frame
- self.send_head(http.client.NO_CONTENT)
- else:
- # no such job id
- self.send_head(http.client.NO_CONTENT)
-
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
-
- def do_GET(self):
-
- if self.path == "/version":
- self.send_head()
- self.server.stats("", "Version check")
- self.wfile.write(VERSION)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path.startswith("/render"):
- match = render_pattern.match(self.path)
-
- if match:
- job_id = match.groups()[0]
- frame_number = int(match.groups()[1])
-
- job = self.server.getJobID(job_id)
-
- if job:
- frame = job[frame_number]
-
- if frame:
- if frame.status in (QUEUED, DISPATCHED):
- self.send_head(http.client.ACCEPTED)
- elif frame.status == DONE:
- self.server.stats("", "Sending result to client")
-
- filename = os.path.join(job.save_path, "%06d.exr" % frame_number)
-
- f = open(filename, 'rb')
- self.send_head(content = "image/x-exr")
- shutil.copyfileobj(f, self.wfile)
- f.close()
- elif frame.status == ERROR:
- self.send_head(http.client.PARTIAL_CONTENT)
- else:
- # no such frame
- self.send_head(http.client.NO_CONTENT)
- else:
- # no such job id
- self.send_head(http.client.NO_CONTENT)
- else:
- # invalid url
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path.startswith("/thumb"):
- match = thumb_pattern.match(self.path)
-
- if match:
- job_id = match.groups()[0]
- frame_number = int(match.groups()[1])
-
- job = self.server.getJobID(job_id)
-
- if job:
- frame = job[frame_number]
-
- if frame:
- if frame.status in (QUEUED, DISPATCHED):
- self.send_head(http.client.ACCEPTED)
- elif frame.status == DONE:
- filename = os.path.join(job.save_path, "%06d.exr" % frame_number)
-
- thumbname = thumbnail.generate(filename)
-
- if thumbname:
- f = open(thumbname, 'rb')
- self.send_head(content = "image/jpeg")
- shutil.copyfileobj(f, self.wfile)
- f.close()
- else: # thumbnail couldn't be generated
- self.send_head(http.client.PARTIAL_CONTENT)
- return
- elif frame.status == ERROR:
- self.send_head(http.client.PARTIAL_CONTENT)
- else:
- # no such frame
- self.send_head(http.client.NO_CONTENT)
- else:
- # no such job id
- self.send_head(http.client.NO_CONTENT)
- else:
- # invalid url
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path.startswith("/log"):
- match = log_pattern.match(self.path)
-
- if match:
- job_id = match.groups()[0]
- frame_number = int(match.groups()[1])
-
- job = self.server.getJobID(job_id)
-
- if job:
- frame = job[frame_number]
-
- if frame:
- if not frame.log_path or frame.status in (QUEUED, DISPATCHED):
- self.send_head(http.client.PROCESSING)
- else:
- self.server.stats("", "Sending log to client")
- f = open(frame.log_path, 'rb')
-
- self.send_head(content = "text/plain")
-
- shutil.copyfileobj(f, self.wfile)
-
- f.close()
- else:
- # no such frame
- self.send_head(http.client.NO_CONTENT)
- else:
- # no such job id
- self.send_head(http.client.NO_CONTENT)
- else:
- # invalid URL
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path == "/status":
- job_id = self.headers.get('job-id', "")
- job_frame = int(self.headers.get('job-frame', -1))
-
- if job_id:
-
- job = self.server.getJobID(job_id)
- if job:
- if job_frame != -1:
- frame = job[frame]
-
- if frame:
- message = frame.serialize()
- else:
- # no such frame
- self.send_heat(http.client.NO_CONTENT)
- return
- else:
- message = job.serialize()
- else:
- # no such job id
- self.send_head(http.client.NO_CONTENT)
- return
- else: # status of all jobs
- message = []
-
- for job in self.server:
- message.append(job.serialize())
-
-
- self.server.stats("", "Sending status")
- self.send_head()
- self.wfile.write(bytes(json.dumps(message), encoding='utf8'))
-
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path == "/job":
- self.server.balance()
-
- slave_id = self.headers['slave-id']
-
- slave = self.server.getSeenSlave(slave_id)
-
- if slave: # only if slave id is valid
- job, frames = self.server.newDispatch(slave_id)
-
- if job and frames:
- for f in frames:
- print("dispatch", f.number)
- f.status = DISPATCHED
- f.slave = slave
-
- slave.job = job
- slave.job_frames = [f.number for f in frames]
-
- self.send_head(headers={"job-id": job.id})
-
- message = job.serialize(frames)
-
- self.wfile.write(bytes(json.dumps(message), encoding='utf8'))
-
- self.server.stats("", "Sending job to slave")
- else:
- # no job available, return error code
- slave.job = None
- slave.job_frames = []
-
- self.send_head(http.client.ACCEPTED)
- else: # invalid slave id
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path.startswith("/file"):
- match = file_pattern.match(self.path)
-
- if match:
- slave_id = self.headers['slave-id']
- slave = self.server.getSeenSlave(slave_id)
-
- if not slave:
- # invalid slave id
- print("invalid slave id")
-
- job_id = match.groups()[0]
- file_index = int(match.groups()[1])
-
- job = self.server.getJobID(job_id)
-
- if job:
- render_file = job.files[file_index]
-
- if render_file:
- self.server.stats("", "Sending file to slave")
- f = open(render_file.filepath, 'rb')
-
- self.send_head()
- shutil.copyfileobj(f, self.wfile)
-
- f.close()
- else:
- # no such file
- self.send_head(http.client.NO_CONTENT)
- else:
- # no such job id
- self.send_head(http.client.NO_CONTENT)
- else: # invalid url
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path == "/slaves":
- message = []
-
- self.server.stats("", "Sending slaves status")
-
- for slave in self.server.slaves:
- message.append(slave.serialize())
-
- self.send_head()
-
- self.wfile.write(bytes(json.dumps(message), encoding='utf8'))
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- else:
- # hand over the rest to the html section
- netrender.master_html.get(self)
-
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- def do_POST(self):
-
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- if self.path == "/job":
-
- length = int(self.headers['content-length'])
-
- job_info = netrender.model.RenderJob.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')))
-
- job_id = self.server.nextJobID()
-
- job = MRenderJob(job_id, job_info)
-
- for frame in job_info.frames:
- frame = job.addFrame(frame.number, frame.command)
-
- self.server.addJob(job)
-
- headers={"job-id": job_id}
-
- if job.testStart():
- self.server.stats("", "New job, started")
- self.send_head(headers=headers)
- else:
- self.server.stats("", "New job, missing files (%i total)" % len(job.files))
- self.send_head(http.client.ACCEPTED, headers=headers)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path.startswith("/edit"):
- match = edit_pattern.match(self.path)
-
- if match:
- job_id = match.groups()[0]
-
- job = self.server.getJobID(job_id)
-
- if job:
- info_map = self.getInfoMap()
-
- job.edit(info_map)
- self.send_head()
- else:
- # no such job id
- self.send_head(http.client.NO_CONTENT)
- else:
- # invalid url
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path == "/balance_limit":
- info_map = self.getInfoMap()
- for rule_id, limit in info_map.items():
- try:
- rule = self.server.balancer.ruleByID(rule_id)
- if rule:
- rule.setLimit(limit)
- except:
- pass # invalid type
-
- self.send_head()
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path == "/balance_enable":
- info_map = self.getInfoMap()
- for rule_id, enabled in info_map.items():
- rule = self.server.balancer.ruleByID(rule_id)
- if rule:
- rule.enabled = enabled
-
- self.send_head()
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path.startswith("/cancel"):
- match = cancel_pattern.match(self.path)
-
- if match:
- info_map = self.getInfoMap()
- clear = info_map.get("clear", False)
-
- job_id = match.groups()[0]
-
- job = self.server.getJobID(job_id)
-
- if job:
- self.server.stats("", "Cancelling job")
- self.server.removeJob(job, clear)
- self.send_head()
- else:
- # no such job id
- self.send_head(http.client.NO_CONTENT)
- else:
- # invalid url
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path.startswith("/pause"):
- match = pause_pattern.match(self.path)
-
- if match:
- info_map = self.getInfoMap()
- status = info_map.get("status", None)
-
- job_id = match.groups()[0]
-
- job = self.server.getJobID(job_id)
-
- if job:
- self.server.stats("", "Pausing job")
- job.pause(status)
- self.send_head()
- else:
- # no such job id
- self.send_head(http.client.NO_CONTENT)
- else:
- # invalid url
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path == "/clear":
- # cancel all jobs
- info_map = self.getInfoMap()
- clear = info_map.get("clear", False)
-
- self.server.stats("", "Clearing jobs")
- self.server.clear(clear)
-
- self.send_head()
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path.startswith("/reset"):
- match = reset_pattern.match(self.path)
-
- if match:
- all = match.groups()[0] == 'all'
- job_id = match.groups()[1]
- job_frame = int(match.groups()[2])
-
- job = self.server.getJobID(job_id)
-
- if job:
- if job_frame != 0:
-
- frame = job[job_frame]
- if frame:
- self.server.stats("", "Reset job frame")
- frame.reset(all)
- self.send_head()
- else:
- # no such frame
- self.send_head(http.client.NO_CONTENT)
-
- else:
- self.server.stats("", "Reset job")
- job.reset(all)
- self.send_head()
-
- else: # job not found
- self.send_head(http.client.NO_CONTENT)
- else: # invalid url
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path == "/slave":
- length = int(self.headers['content-length'])
- job_frame_string = self.headers['job-frame']
-
- self.server.stats("", "New slave connected")
-
- slave_info = netrender.model.RenderSlave.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')), cache = False)
-
- slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats)
-
- self.send_head(headers = {"slave-id": slave_id})
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path == "/log":
- length = int(self.headers['content-length'])
-
- log_info = netrender.model.LogFile.materialize(json.loads(str(self.rfile.read(length), encoding='utf8')))
-
- slave_id = log_info.slave_id
-
- slave = self.server.getSeenSlave(slave_id)
-
- if slave: # only if slave id is valid
- job = self.server.getJobID(log_info.job_id)
-
- if job:
- self.server.stats("", "Log announcement")
- job.addLog(log_info.frames)
- self.send_head(http.client.OK)
- else:
- # no such job id
- self.send_head(http.client.NO_CONTENT)
- else: # invalid slave id
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- def do_PUT(self):
-
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- if self.path.startswith("/file"):
- match = file_pattern.match(self.path)
-
- if match:
- self.server.stats("", "Receiving job")
-
- length = int(self.headers['content-length'])
- job_id = match.groups()[0]
- file_index = int(match.groups()[1])
-
- job = self.server.getJobID(job_id)
-
- if job:
-
- render_file = job.files[file_index]
-
- if render_file:
- main_file = job.files[0].filepath # filename of the first file
-
- main_path, main_name = os.path.split(main_file)
-
- if file_index > 0:
- file_path = prefixPath(job.save_path, render_file.filepath, main_path)
- else:
- file_path = os.path.join(job.save_path, main_name)
-
- buf = self.rfile.read(length)
-
- # add same temp file + renames as slave
-
- f = open(file_path, "wb")
- f.write(buf)
- f.close()
- del buf
-
- render_file.filepath = file_path # set the new path
-
- if job.testStart():
- self.server.stats("", "File upload, starting job")
- self.send_head(http.client.OK)
- else:
- self.server.stats("", "File upload, file missings")
- self.send_head(http.client.ACCEPTED)
- else: # invalid file
- print("file not found", job_id, file_index)
- self.send_head(http.client.NO_CONTENT)
- else: # job not found
- print("job not found", job_id, file_index)
- self.send_head(http.client.NO_CONTENT)
- else: # invalid url
- print("no match")
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path == "/render":
- self.server.stats("", "Receiving render result")
-
- # need some message content here or the slave doesn't like it
- self.wfile.write(bytes("foo", encoding='utf8'))
-
- slave_id = self.headers['slave-id']
-
- slave = self.server.getSeenSlave(slave_id)
-
- if slave: # only if slave id is valid
- job_id = self.headers['job-id']
-
- job = self.server.getJobID(job_id)
-
- if job:
- job_frame = int(self.headers['job-frame'])
- job_result = int(self.headers['job-result'])
- job_time = float(self.headers['job-time'])
-
- frame = job[job_frame]
-
- if frame:
- if job.hasRenderResult():
- if job_result == DONE:
- length = int(self.headers['content-length'])
- buf = self.rfile.read(length)
- f = open(os.path.join(job.save_path, "%06d.exr" % job_frame), 'wb')
- f.write(buf)
- f.close()
-
- del buf
- elif job_result == ERROR:
- # blacklist slave on this job on error
- # slaves might already be in blacklist if errors on the whole chunk
- if not slave.id in job.blacklist:
- job.blacklist.append(slave.id)
-
- slave.finishedFrame(job_frame)
-
- frame.status = job_result
- frame.time = job_time
-
- job.testFinished()
-
- self.send_head()
- else: # frame not found
- self.send_head(http.client.NO_CONTENT)
- else: # job not found
- self.send_head(http.client.NO_CONTENT)
- else: # invalid slave id
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path == "/thumb":
- self.server.stats("", "Receiving thumbnail result")
-
- # need some message content here or the slave doesn't like it
- self.wfile.write(bytes("foo", encoding='utf8'))
-
- slave_id = self.headers['slave-id']
-
- slave = self.server.getSeenSlave(slave_id)
-
- if slave: # only if slave id is valid
- job_id = self.headers['job-id']
-
- job = self.server.getJobID(job_id)
-
- if job:
- job_frame = int(self.headers['job-frame'])
-
- frame = job[job_frame]
-
- if frame:
- if job.hasRenderResult():
- length = int(self.headers['content-length'])
- buf = self.rfile.read(length)
- f = open(os.path.join(job.save_path, "%06d.jpg" % job_frame), 'wb')
- f.write(buf)
- f.close()
-
- del buf
-
- else: # frame not found
- self.send_head(http.client.NO_CONTENT)
- else: # job not found
- self.send_head(http.client.NO_CONTENT)
- else: # invalid slave id
- self.send_head(http.client.NO_CONTENT)
- # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
- elif self.path.startswith("/log"):
- self.server.stats("", "Receiving log file")
-
- match = log_pattern.match(self.path)
-
- if match:
- job_id = match.groups()[0]
-
- job = self.server.getJobID(job_id)
-
- if job:
- job_frame = int(match.groups()[1])
-
- frame = job[job_frame]
-
- if frame and frame.log_path:
- length = int(self.headers['content-length'])
- buf = self.rfile.read(length)
- f = open(frame.log_path, 'ab')
- f.write(buf)
- f.close()
-
- del buf
-
- self.server.getSeenSlave(self.headers['slave-id'])
-
- self.send_head()
- else: # frame not found
- self.send_head(http.client.NO_CONTENT)
- else: # job not found
- self.send_head(http.client.NO_CONTENT)
- else: # invalid url
- self.send_head(http.client.NO_CONTENT)
-
-class RenderMasterServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
- def __init__(self, address, handler_class, path, subdir=True):
- super().__init__(address, handler_class)
- self.jobs = []
- self.jobs_map = {}
- self.slaves = []
- self.slaves_map = {}
- self.job_id = 0
-
- if subdir:
- self.path = os.path.join(path, "master_" + str(os.getpid()))
- else:
- self.path = path
-
- self.slave_timeout = 5 # 5 mins: need a parameter for that
-
- self.balancer = netrender.balancing.Balancer()
- self.balancer.addRule(netrender.balancing.RatingUsageByCategory(self.getJobs))
- self.balancer.addRule(netrender.balancing.RatingUsage())
- self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob())
- self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves, limit = 0.9))
- self.balancer.addPriority(netrender.balancing.NewJobPriority())
- self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2))
-
- if not os.path.exists(self.path):
- os.mkdir(self.path)
-
- def restore(self, jobs, slaves, balancer = None):
- self.jobs = jobs
- self.jobs_map = {}
-
- for job in self.jobs:
- self.jobs_map[job.id] = job
- self.job_id = max(self.job_id, int(job.id))
-
- self.slaves = slaves
- for slave in self.slaves:
- self.slaves_map[slave.id] = slave
-
- if balancer:
- self.balancer = balancer
-
-
- def nextJobID(self):
- self.job_id += 1
- return str(self.job_id)
-
- def addSlave(self, name, address, stats):
- slave = MRenderSlave(name, address, stats)
- self.slaves.append(slave)
- self.slaves_map[slave.id] = slave
-
- return slave.id
-
- def removeSlave(self, slave):
- self.slaves.remove(slave)
- self.slaves_map.pop(slave.id)
-
- def getSlave(self, slave_id):
- return self.slaves_map.get(slave_id)
-
- def getSeenSlave(self, slave_id):
- slave = self.getSlave(slave_id)
- if slave:
- slave.seen()
-
- return slave
-
- def timeoutSlaves(self):
- removed = []
-
- t = time.time()
-
- for slave in self.slaves:
- if (t - slave.last_seen) / 60 > self.slave_timeout:
- removed.append(slave)
-
- if slave.job:
- for f in slave.job_frames:
- slave.job[f].status = ERROR
-
- for slave in removed:
- self.removeSlave(slave)
-
- def updateUsage(self):
- blend = 0.5
- for job in self.jobs:
- job.usage *= (1 - blend)
-
- if self.slaves:
- slave_usage = blend / self.countSlaves()
-
- for slave in self.slaves:
- if slave.job:
- slave.job.usage += slave_usage
-
-
- def clear(self, clear_files = False):
- removed = self.jobs[:]
-
- for job in removed:
- self.removeJob(job, clear_files)
-
- def balance(self):
- self.balancer.balance(self.jobs)
-
- def getJobs(self):
- return self.jobs
-
- def countJobs(self, status = JOB_QUEUED):
- total = 0
- for j in self.jobs:
- if j.status == status:
- total += 1
-
- return total
-
- def countSlaves(self):
- return len(self.slaves)
-
- def removeJob(self, job, clear_files = False):
- self.jobs.remove(job)
- self.jobs_map.pop(job.id)
-
- if clear_files:
- shutil.rmtree(job.save_path)
-
- for slave in self.slaves:
- if slave.job == job:
- slave.job = None
- slave.job_frames = []
-
- def addJob(self, job):
- self.jobs.append(job)
- self.jobs_map[job.id] = job
-
- # create job directory
- job.save_path = os.path.join(self.path, "job_" + job.id)
- if not os.path.exists(job.save_path):
- os.mkdir(job.save_path)
-
- job.save()
-
- def getJobID(self, id):
- return self.jobs_map.get(id)
-
- def __iter__(self):
- for job in self.jobs:
- yield job
-
- def newDispatch(self, slave_id):
- if self.jobs:
- for job in self.jobs:
- if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist:
- return job, job.getFrames()
-
- return None, None
-
-def clearMaster(path):
- shutil.rmtree(path)
-
-def createMaster(address, clear, path):
- filepath = os.path.join(path, "blender_master.data")
-
- if not clear and os.path.exists(filepath):
- print("loading saved master:", filepath)
- with open(filepath, 'rb') as f:
- path, jobs, slaves = pickle.load(f)
-
- httpd = RenderMasterServer(address, RenderHandler, path, subdir=False)
- httpd.restore(jobs, slaves)
-
- return httpd
-
- return RenderMasterServer(address, RenderHandler, path)
-
-def saveMaster(path, httpd):
- filepath = os.path.join(path, "blender_master.data")
-
- with open(filepath, 'wb') as f:
- pickle.dump((httpd.path, httpd.jobs, httpd.slaves), f, pickle.HIGHEST_PROTOCOL)
-
-def runMaster(address, broadcast, clear, path, update_stats, test_break):
- httpd = createMaster(address, clear, path)
- httpd.timeout = 1
- httpd.stats = update_stats
-
- if broadcast:
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
-
- start_time = time.time() - 2
-
- while not test_break():
- try:
- httpd.handle_request()
- except select.error:
- pass
-
- if time.time() - start_time >= 2: # need constant here
- httpd.timeoutSlaves()
-
- httpd.updateUsage()
-
- if broadcast:
- print("broadcasting address")
- s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', 8000))
- start_time = time.time()
-
- httpd.server_close()
- if clear:
- clearMaster(httpd.path)
- else:
- saveMaster(path, httpd)
-