diff options
Diffstat (limited to 'release/scripts/io/netrender')
-rw-r--r-- | release/scripts/io/netrender/__init__.py | 19 | ||||
-rw-r--r-- | release/scripts/io/netrender/balancing.py | 94 | ||||
-rw-r--r-- | release/scripts/io/netrender/client.py | 203 | ||||
-rw-r--r-- | release/scripts/io/netrender/master.py | 760 | ||||
-rw-r--r-- | release/scripts/io/netrender/master_html.py | 135 | ||||
-rw-r--r-- | release/scripts/io/netrender/model.py | 212 | ||||
-rw-r--r-- | release/scripts/io/netrender/operators.py | 423 | ||||
-rw-r--r-- | release/scripts/io/netrender/slave.py | 224 | ||||
-rw-r--r-- | release/scripts/io/netrender/ui.py | 321 | ||||
-rw-r--r-- | release/scripts/io/netrender/utils.py | 86 |
10 files changed, 2477 insertions, 0 deletions
diff --git a/release/scripts/io/netrender/__init__.py b/release/scripts/io/netrender/__init__.py new file mode 100644 index 00000000000..4a1dd2238e3 --- /dev/null +++ b/release/scripts/io/netrender/__init__.py @@ -0,0 +1,19 @@ +# This directory is a Python package. + +import model +import operators +import client +import slave +import master +import master_html +import utils +import balancing +import ui + +# store temp data in bpy module + +import bpy + +bpy.data.netrender_jobs = [] +bpy.data.netrender_slaves = [] +bpy.data.netrender_blacklist = []
\ No newline at end of file diff --git a/release/scripts/io/netrender/balancing.py b/release/scripts/io/netrender/balancing.py new file mode 100644 index 00000000000..637dd5ff92e --- /dev/null +++ b/release/scripts/io/netrender/balancing.py @@ -0,0 +1,94 @@ +import time + +from netrender.utils import * +import netrender.model + +class RatingRule: + def rate(self, job): + return 0 + +class ExclusionRule: + def test(self, job): + return False + +class PriorityRule: + def test(self, job): + return False + +class Balancer: + def __init__(self): + self.rules = [] + self.priorities = [] + self.exceptions = [] + + def addRule(self, rule): + self.rules.append(rule) + + def addPriority(self, priority): + self.priorities.append(priority) + + def addException(self, exception): + self.exceptions.append(exception) + + def applyRules(self, job): + return sum((rule.rate(job) for rule in self.rules)) + + def applyPriorities(self, job): + for priority in self.priorities: + if priority.test(job): + return True # priorities are first + + return False + + def applyExceptions(self, job): + for exception in self.exceptions: + if exception.test(job): + return True # exceptions are last + + return False + + def sortKey(self, job): + return (1 if self.applyExceptions(job) else 0, # exceptions after + 0 if self.applyPriorities(job) else 1, # priorities first + self.applyRules(job)) + + def balance(self, jobs): + if jobs: + jobs.sort(key=self.sortKey) + return jobs[0] + else: + return None + +# ========================== + +class RatingUsage(RatingRule): + def rate(self, job): + # less usage is better + return job.usage / job.priority + +class NewJobPriority(PriorityRule): + def __init__(self, limit = 1): + self.limit = limit + + def test(self, job): + return job.countFrames(status = DONE) < self.limit + +class MinimumTimeBetweenDispatchPriority(PriorityRule): + def __init__(self, limit = 10): + self.limit = limit + + def test(self, job): + return job.countFrames(status = DISPATCHED) == 0 and (time.time() - job.last_dispatched) / 60 > self.limit + +class ExcludeQueuedEmptyJob(ExclusionRule): + def test(self, job): + return job.status != JOB_QUEUED or job.countFrames(status = QUEUED) == 0 + +class ExcludeSlavesLimit(ExclusionRule): + def __init__(self, count_jobs, count_slaves, limit = 0.75): + self.count_jobs = count_jobs + self.count_slaves = count_slaves + self.limit = limit + + def test(self, job): + return not ( self.count_jobs() == 1 or self.count_slaves() <= 1 or float(job.countSlaves() + 1) / self.count_slaves() <= self.limit ) diff --git a/release/scripts/io/netrender/client.py b/release/scripts/io/netrender/client.py new file mode 100644 index 00000000000..1897d1fd949 --- /dev/null +++ b/release/scripts/io/netrender/client.py @@ -0,0 +1,203 @@ +import bpy +import sys, os, re +import http, http.client, http.server, urllib +import subprocess, shutil, time, hashlib + +import netrender.model +import netrender.slave as slave +import netrender.master as master +from netrender.utils import * + +def clientSendJob(conn, scene, anim = False): + netsettings = scene.network_render + job = netrender.model.RenderJob() + + if anim: + for f in range(scene.start_frame, scene.end_frame + 1): + job.addFrame(f) + else: + job.addFrame(scene.current_frame) + + filename = bpy.data.filename + job.addFile(filename) + + job_name = netsettings.job_name + path, name = os.path.split(filename) + if job_name == "[default]": + job_name = name + + ########################### + # LIBRARIES + ########################### + for lib in bpy.data.libraries: + lib_path = lib.filename + + if lib_path.startswith("//"): + lib_path = path + os.sep + lib_path[2:] + + job.addFile(lib_path) + + ########################### + # POINT CACHES + ########################### + + root, ext = os.path.splitext(name) + cache_path = path + os.sep + "blendcache_" + root + os.sep # need an API call for that + + if os.path.exists(cache_path): + caches = {} + pattern = re.compile("([a-zA-Z0-9]+)_([0-9]+)_[0-9]+\.bphys") + for cache_file in sorted(os.listdir(cache_path)): + match = pattern.match(cache_file) + + if match: + cache_id = match.groups()[0] + cache_frame = int(match.groups()[1]) + + cache_files = caches.get(cache_id, []) + cache_files.append((cache_frame, cache_file)) + caches[cache_id] = cache_files + + for cache in caches.values(): + cache.sort() + + if len(cache) == 1: + cache_frame, cache_file = cache[0] + job.addFile(cache_path + cache_file, cache_frame, cache_frame) + else: + for i in range(len(cache)): + current_item = cache[i] + next_item = cache[i+1] if i + 1 < len(cache) else None + previous_item = cache[i - 1] if i > 0 else None + + current_frame, current_file = current_item + + if not next_item and not previous_item: + job.addFile(cache_path + current_file, current_frame, current_frame) + elif next_item and not previous_item: + next_frame = next_item[0] + job.addFile(cache_path + current_file, current_frame, next_frame - 1) + elif not next_item and previous_item: + previous_frame = previous_item[0] + job.addFile(cache_path + current_file, previous_frame + 1, current_frame) + else: + next_frame = next_item[0] + previous_frame = previous_item[0] + job.addFile(cache_path + current_file, previous_frame + 1, next_frame - 1) + + ########################### + # IMAGES + ########################### + for image in bpy.data.images: + if image.source == "FILE" and not image.packed_file: + job.addFile(image.filename) + + # print(job.files) + + job.name = job_name + + for slave in scene.network_render.slaves_blacklist: + job.blacklist.append(slave.id) + + job.chunks = netsettings.chunks + job.priority = netsettings.priority + + # try to send path first + conn.request("POST", "/job", repr(job.serialize())) + response = conn.getresponse() + + job_id = response.getheader("job-id") + + # if not ACCEPTED (but not processed), send files + if response.status == http.client.ACCEPTED: + for filepath, start, end in job.files: + f = open(filepath, "rb") + conn.request("PUT", "/file", f, headers={"job-id": job_id, "job-file": filepath}) + f.close() + response = conn.getresponse() + + # server will reply with NOT_FOUD until all files are found + + return job_id + +def requestResult(conn, job_id, frame): + conn.request("GET", "/render", headers={"job-id": job_id, "job-frame":str(frame)}) + +@rnaType +class NetworkRenderEngine(bpy.types.RenderEngine): + __idname__ = 'NET_RENDER' + __label__ = "Network Render" + def render(self, scene): + if scene.network_render.mode == "RENDER_CLIENT": + self.render_client(scene) + elif scene.network_render.mode == "RENDER_SLAVE": + self.render_slave(scene) + elif scene.network_render.mode == "RENDER_MASTER": + self.render_master(scene) + else: + print("UNKNOWN OPERATION MODE") + + def render_master(self, scene): + netsettings = scene.network_render + + address = "" if netsettings.server_address == "[default]" else netsettings.server_address + + master.runMaster((address, netsettings.server_port), netsettings.server_broadcast, netsettings.path, self.update_stats, self.test_break) + + + def render_slave(self, scene): + slave.render_slave(self, scene) + + def render_client(self, scene): + netsettings = scene.network_render + self.update_stats("", "Network render client initiation") + + + conn = clientConnection(scene) + + if conn: + # Sending file + + self.update_stats("", "Network render exporting") + + job_id = netsettings.job_id + + # reading back result + + self.update_stats("", "Network render waiting for results") + + requestResult(conn, job_id, scene.current_frame) + response = conn.getresponse() + + if response.status == http.client.NO_CONTENT: + netsettings.job_id = clientSendJob(conn, scene) + requestResult(conn, job_id, scene.current_frame) + + while response.status == http.client.ACCEPTED and not self.test_break(): + time.sleep(1) + requestResult(conn, job_id, scene.current_frame) + response = conn.getresponse() + + if response.status != http.client.OK: + conn.close() + return + + r = scene.render_data + x= int(r.resolution_x*r.resolution_percentage*0.01) + y= int(r.resolution_y*r.resolution_percentage*0.01) + + f = open(netsettings.path + "output.exr", "wb") + buf = response.read(1024) + + while buf: + f.write(buf) + buf = response.read(1024) + + f.close() + + result = self.begin_result(0, 0, x, y) + result.load_from_file(netsettings.path + "output.exr", 0, 0) + self.end_result(result) + + conn.close() + diff --git a/release/scripts/io/netrender/master.py b/release/scripts/io/netrender/master.py new file mode 100644 index 00000000000..be23fda7a91 --- /dev/null +++ b/release/scripts/io/netrender/master.py @@ -0,0 +1,760 @@ +import sys, os +import http, http.client, http.server, urllib, socket +import subprocess, shutil, time, hashlib + +from netrender.utils import * +import netrender.model +import netrender.balancing +import netrender.master_html + +class MRenderFile: + def __init__(self, filepath, start, end): + self.filepath = filepath + self.start = start + self.end = end + self.found = False + + def test(self): + self.found = os.path.exists(self.filepath) + return self.found + + +class MRenderSlave(netrender.model.RenderSlave): + def __init__(self, name, address, stats): + super().__init__() + self.id = hashlib.md5(bytes(repr(name) + repr(address), encoding='utf8')).hexdigest() + self.name = name + self.address = address + self.stats = stats + self.last_seen = time.time() + + self.job = None + self.job_frames = [] + + netrender.model.RenderSlave._slave_map[self.id] = self + + def seen(self): + self.last_seen = time.time() + + def finishedFrame(self, frame_number): + self.job_frames.remove(frame_number) + if not self.job_frames: + self.job = None + +class MRenderJob(netrender.model.RenderJob): + def __init__(self, job_id, job_type, name, files, chunks = 1, priority = 1, blacklist = []): + super().__init__() + self.id = job_id + self.type = job_type + self.name = name + self.files = files + self.frames = [] + self.chunks = chunks + self.priority = priority + self.usage = 0.0 + self.blacklist = blacklist + self.last_dispatched = time.time() + + # force one chunk for process jobs + if self.type == netrender.model.JOB_PROCESS: + self.chunks = 1 + + # special server properties + self.last_update = 0 + self.save_path = "" + self.files_map = {path: MRenderFile(path, start, end) for path, start, end in files} + self.status = JOB_WAITING + + def save(self): + if self.save_path: + f = open(self.save_path + "job.txt", "w") + f.write(repr(self.serialize())) + f.close() + + def testStart(self): + for f in self.files_map.values(): + if not f.test(): + return False + + self.start() + return True + + def testFinished(self): + for f in self.frames: + if f.status == QUEUED or f.status == DISPATCHED: + break + else: + self.status = JOB_FINISHED + + def start(self): + self.status = JOB_QUEUED + + def addLog(self, frames): + log_name = "_".join(("%04d" % f for f in frames)) + ".log" + log_path = self.save_path + log_name + + for number in frames: + frame = self[number] + if frame: + frame.log_path = log_path + + def addFrame(self, frame_number, command): + frame = MRenderFrame(frame_number, command) + self.frames.append(frame) + return frame + + def reset(self, all): + for f in self.frames: + f.reset(all) + + def getFrames(self): + frames = [] + for f in self.frames: + if f.status == QUEUED: + self.last_dispatched = time.time() + frames.append(f) + if len(frames) >= self.chunks: + break + + return frames + +class MRenderFrame(netrender.model.RenderFrame): + def __init__(self, frame, command): + super().__init__() + self.number = frame + self.slave = None + self.time = 0 + self.status = QUEUED + self.command = command + + self.log_path = None + + def reset(self, all): + if all or self.status == ERROR: + self.slave = None + self.time = 0 + self.status = QUEUED + + +# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= +# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- +# -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= +# =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + +class RenderHandler(http.server.BaseHTTPRequestHandler): + def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"): + self.send_response(code) + self.send_header("Content-type", content) + + for key, value in headers.items(): + self.send_header(key, value) + + self.end_headers() + + def do_HEAD(self): + + if self.path == "/status": + job_id = self.headers.get('job-id', "") + job_frame = int(self.headers.get('job-frame', -1)) + + job = self.server.getJobID(job_id) + if job: + frame = job[job_frame] + + + if frame: + self.send_head(http.client.OK) + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + + def do_GET(self): + + if self.path == "/version": + self.send_head() + self.server.stats("", "Version check") + self.wfile.write(VERSION) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/render": + job_id = self.headers['job-id'] + job_frame = int(self.headers['job-frame']) + + job = self.server.getJobID(job_id) + + if job: + frame = job[job_frame] + + if frame: + if frame.status in (QUEUED, DISPATCHED): + self.send_head(http.client.ACCEPTED) + elif frame.status == DONE: + self.server.stats("", "Sending result to client") + f = open(job.save_path + "%04d" % job_frame + ".exr", 'rb') + + self.send_head() + + shutil.copyfileobj(f, self.wfile) + + f.close() + elif frame.status == ERROR: + self.send_head(http.client.PARTIAL_CONTENT) + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/log": + job_id = self.headers['job-id'] + job_frame = int(self.headers['job-frame']) + + job = self.server.getJobID(job_id) + + if job: + frame = job[job_frame] + + if frame: + if not frame.log_path or frame.status in (QUEUED, DISPATCHED): + self.send_head(http.client.PROCESSING) + else: + self.server.stats("", "Sending log to client") + f = open(frame.log_path, 'rb') + + self.send_head() + + shutil.copyfileobj(f, self.wfile) + + f.close() + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/status": + job_id = self.headers.get('job-id', "") + job_frame = int(self.headers.get('job-frame', -1)) + + if job_id: + + job = self.server.getJobID(job_id) + if job: + if job_frame != -1: + frame = job[frame] + + if frame: + message = frame.serialize() + else: + # no such frame + self.send_heat(http.client.NO_CONTENT) + return + else: + message = job.serialize() + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + return + else: # status of all jobs + message = [] + + for job in self.server: + message.append(job.serialize()) + + + self.server.stats("", "Sending status") + self.send_head() + self.wfile.write(bytes(repr(message), encoding='utf8')) + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/job": + self.server.balance() + + slave_id = self.headers['slave-id'] + + slave = self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + job, frames = self.server.newDispatch(slave_id) + + if job and frames: + for f in frames: + print("dispatch", f.number) + f.status = DISPATCHED + f.slave = slave + + slave.job = job + slave.job_frames = [f.number for f in frames] + + self.send_head(headers={"job-id": job.id}) + + message = job.serialize(frames) + + self.wfile.write(bytes(repr(message), encoding='utf8')) + + self.server.stats("", "Sending job to slave") + else: + # no job available, return error code + slave.job = None + slave.job_frames = [] + + self.send_head(http.client.ACCEPTED) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/file": + slave_id = self.headers['slave-id'] + + slave = self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + job_id = self.headers['job-id'] + job_file = self.headers['job-file'] + + job = self.server.getJobID(job_id) + + if job: + render_file = job.files_map.get(job_file, None) + + if render_file: + self.server.stats("", "Sending file to slave") + f = open(render_file.filepath, 'rb') + + self.send_head() + shutil.copyfileobj(f, self.wfile) + + f.close() + else: + # no such file + self.send_head(http.client.NO_CONTENT) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/slaves": + message = [] + + self.server.stats("", "Sending slaves status") + + for slave in self.server.slaves: + message.append(slave.serialize()) + + self.send_head() + + self.wfile.write(bytes(repr(message), encoding='utf8')) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + else: + # hand over the rest to the html section + netrender.master_html.get(self) + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + def do_POST(self): + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + if self.path == "/job": + + length = int(self.headers['content-length']) + + job_info = netrender.model.RenderJob.materialize(eval(str(self.rfile.read(length), encoding='utf8'))) + + job_id = self.server.nextJobID() + + job = MRenderJob(job_id, job_info.type, job_info.name, job_info.files, chunks = job_info.chunks, priority = job_info.priority, blacklist = job_info.blacklist) + + for frame in job_info.frames: + frame = job.addFrame(frame.number, frame.command) + + self.server.addJob(job) + + headers={"job-id": job_id} + + if job.testStart(): + self.server.stats("", "New job, missing files") + self.send_head(headers=headers) + else: + self.server.stats("", "New job, started") + self.send_head(http.client.ACCEPTED, headers=headers) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/cancel": + job_id = self.headers.get('job-id', "") + + job = self.server.getJobID(job_id) + + if job: + self.server.stats("", "Cancelling job") + self.server.removeJob(job) + self.send_head() + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/clear": + # cancel all jobs + self.server.stats("", "Clearing jobs") + self.server.clear() + + self.send_head() + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/reset": + job_id = self.headers.get('job-id', "") + job_frame = int(self.headers.get('job-frame', "-1")) + all = bool(self.headers.get('reset-all', "False")) + + job = self.server.getJobID(job_id) + + if job: + if job_frame != -1: + + frame = job[job_frame] + if frame: + self.server.stats("", "Reset job frame") + frame.reset(all) + self.send_head() + else: + # no such frame + self.send_head(http.client.NO_CONTENT) + + else: + self.server.stats("", "Reset job") + job.reset(all) + self.send_head() + + else: # job not found + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/slave": + length = int(self.headers['content-length']) + job_frame_string = self.headers['job-frame'] + + self.server.stats("", "New slave connected") + + slave_info = netrender.model.RenderSlave.materialize(eval(str(self.rfile.read(length), encoding='utf8'))) + + slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats) + + self.send_head(headers = {"slave-id": slave_id}) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/log": + slave_id = self.headers['slave-id'] + + slave = self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + length = int(self.headers['content-length']) + + log_info = netrender.model.LogFile.materialize(eval(str(self.rfile.read(length), encoding='utf8'))) + + job = self.server.getJobID(log_info.job_id) + + if job: + self.server.stats("", "Log announcement") + job.addLog(log_info.frames) + self.send_head(http.client.OK) + else: + # no such job id + self.send_head(http.client.NO_CONTENT) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + # -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-= + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + def do_PUT(self): + + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + if self.path == "/file": + self.server.stats("", "Receiving job") + + length = int(self.headers['content-length']) + job_id = self.headers['job-id'] + job_file = self.headers['job-file'] + + job = self.server.getJobID(job_id) + + if job: + + render_file = job.files_map.get(job_file, None) + + if render_file: + main_file = job.files[0][0] # filename of the first file + + main_path, main_name = os.path.split(main_file) + + if job_file != main_file: + file_path = prefixPath(job.save_path, job_file, main_path) + else: + file_path = job.save_path + main_name + + buf = self.rfile.read(length) + + # add same temp file + renames as slave + + f = open(file_path, "wb") + f.write(buf) + f.close() + del buf + + render_file.filepath = file_path # set the new path + + if job.testStart(): + self.server.stats("", "File upload, starting job") + self.send_head(http.client.OK) + else: + self.server.stats("", "File upload, file missings") + self.send_head(http.client.ACCEPTED) + else: # invalid file + self.send_head(http.client.NO_CONTENT) + else: # job not found + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/render": + self.server.stats("", "Receiving render result") + + slave_id = self.headers['slave-id'] + + slave = self.server.getSeenSlave(slave_id) + + if slave: # only if slave id is valid + job_id = self.headers['job-id'] + + job = self.server.getJobID(job_id) + + if job: + job_frame = int(self.headers['job-frame']) + job_result = int(self.headers['job-result']) + job_time = float(self.headers['job-time']) + + frame = job[job_frame] + + if frame: + if job.type == netrender.model.JOB_BLENDER: + if job_result == DONE: + length = int(self.headers['content-length']) + buf = self.rfile.read(length) + f = open(job.save_path + "%04d" % job_frame + ".exr", 'wb') + f.write(buf) + f.close() + + del buf + elif job_result == ERROR: + # blacklist slave on this job on error + job.blacklist.append(slave.id) + + self.server.stats("", "Receiving result") + + slave.finishedFrame(job_frame) + + frame.status = job_result + frame.time = job_time + + job.testFinished() + + self.send_head() + else: # frame not found + self.send_head(http.client.NO_CONTENT) + else: # job not found + self.send_head(http.client.NO_CONTENT) + else: # invalid slave id + self.send_head(http.client.NO_CONTENT) + # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- + elif self.path == "/log": + self.server.stats("", "Receiving log file") + + job_id = self.headers['job-id'] + + job = self.server.getJobID(job_id) + + if job: + job_frame = int(self.headers['job-frame']) + + frame = job[job_frame] + + if frame and frame.log_path: + length = int(self.headers['content-length']) + buf = self.rfile.read(length) + f = open(frame.log_path, 'ab') + f.write(buf) + f.close() + + del buf + + self.server.getSeenSlave(self.headers['slave-id']) + + self.send_head() + else: # frame not found + self.send_head(http.client.NO_CONTENT) + else: # job not found + self.send_head(http.client.NO_CONTENT) + +class RenderMasterServer(http.server.HTTPServer): + def __init__(self, address, handler_class, path): + super().__init__(address, handler_class) + self.jobs = [] + self.jobs_map = {} + self.slaves = [] + self.slaves_map = {} + self.job_id = 0 + self.path = path + "master_" + str(os.getpid()) + os.sep + + self.slave_timeout = 2 + + self.balancer = netrender.balancing.Balancer() + self.balancer.addRule(netrender.balancing.RatingUsage()) + self.balancer.addException(netrender.balancing.ExcludeQueuedEmptyJob()) + self.balancer.addException(netrender.balancing.ExcludeSlavesLimit(self.countJobs, self.countSlaves, limit = 0.9)) + self.balancer.addPriority(netrender.balancing.NewJobPriority()) + self.balancer.addPriority(netrender.balancing.MinimumTimeBetweenDispatchPriority(limit = 2)) + + if not os.path.exists(self.path): + os.mkdir(self.path) + + def nextJobID(self): + self.job_id += 1 + return str(self.job_id) + + def addSlave(self, name, address, stats): + slave = MRenderSlave(name, address, stats) + self.slaves.append(slave) + self.slaves_map[slave.id] = slave + + return slave.id + + def removeSlave(self, slave): + self.slaves.remove(slave) + self.slaves_map.pop(slave.id) + + def getSlave(self, slave_id): + return self.slaves_map.get(slave_id, None) + + def getSeenSlave(self, slave_id): + slave = self.getSlave(slave_id) + if slave: + slave.seen() + + return slave + + def timeoutSlaves(self): + removed = [] + + t = time.time() + + for slave in self.slaves: + if (t - slave.last_seen) / 60 > self.slave_timeout: + removed.append(slave) + + if slave.job: + for f in slave.job_frames: + slave.job[f].status = ERROR + + for slave in removed: + self.removeSlave(slave) + + def updateUsage(self): + blend = 0.5 + for job in self.jobs: + job.usage *= (1 - blend) + + if self.slaves: + slave_usage = blend / self.countSlaves() + + for slave in self.slaves: + if slave.job: + slave.job.usage += slave_usage + + + def clear(self): + removed = self.jobs[:] + + for job in removed: + self.removeJob(job) + + def balance(self): + self.balancer.balance(self.jobs) + + def countJobs(self, status = JOB_QUEUED): + total = 0 + for j in self.jobs: + if j.status == status: + total += 1 + + return total + + def countSlaves(self): + return len(self.slaves) + + def removeJob(self, job): + self.jobs.remove(job) + self.jobs_map.pop(job.id) + + for slave in self.slaves: + if slave.job == job: + slave.job = None + slave.job_frames = [] + + def addJob(self, job): + self.jobs.append(job) + self.jobs_map[job.id] = job + + # create job directory + job.save_path = self.path + "job_" + job.id + os.sep + if not os.path.exists(job.save_path): + os.mkdir(job.save_path) + + job.save() + + def getJobID(self, id): + return self.jobs_map.get(id, None) + + def __iter__(self): + for job in self.jobs: + yield job + + def newDispatch(self, slave_id): + if self.jobs: + for job in self.jobs: + if not self.balancer.applyExceptions(job) and slave_id not in job.blacklist: + return job, job.getFrames() + + return None, None + +def runMaster(address, broadcast, path, update_stats, test_break): + httpd = RenderMasterServer(address, RenderHandler, path) + httpd.timeout = 1 + httpd.stats = update_stats + + if broadcast: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + start_time = time.time() + + while not test_break(): + httpd.handle_request() + + if time.time() - start_time >= 10: # need constant here + httpd.timeoutSlaves() + + httpd.updateUsage() + + if broadcast: + print("broadcasting address") + s.sendto(bytes("%i" % address[1], encoding='utf8'), 0, ('<broadcast>', 8000)) + start_time = time.time() diff --git a/release/scripts/io/netrender/master_html.py b/release/scripts/io/netrender/master_html.py new file mode 100644 index 00000000000..545659e8dc4 --- /dev/null +++ b/release/scripts/io/netrender/master_html.py @@ -0,0 +1,135 @@ +import re + +from netrender.utils import * + + +def get(handler): + def output(text): + handler.wfile.write(bytes(text, encoding='utf8')) + + def link(text, url): + return "<a href='%s'>%s</a>" % (url, text) + + def startTable(border=1): + output("<table border='%i'>" % border) + + def headerTable(*headers): + output("<thead><tr>") + + for c in headers: + output("<td>" + c + "</td>") + + output("</tr></thead>") + + def rowTable(*data): + output("<tr>") + + for c in data: + output("<td>" + str(c) + "</td>") + + output("</tr>") + + def endTable(): + output("</table>") + + if handler.path == "/html" or handler.path == "/": + handler.send_head(content = "text/html") + output("<html><head><title>NetRender</title></head><body>") + + output("<h2>Master</h2>") + + output("<h2>Slaves</h2>") + + startTable() + headerTable("name", "address", "last seen", "stats", "job") + + for slave in handler.server.slaves: + rowTable(slave.name, slave.address[0], time.ctime(slave.last_seen), slave.stats, link(slave.job.name, "/html/job" + slave.job.id) if slave.job else "None") + + endTable() + + output("<h2>Jobs</h2>") + + startTable() + headerTable( + "name", + "priority", + "usage", + "wait", + "length", + "done", + "dispatched", + "error", + "first", + "exception" + ) + + handler.server.balance() + + for job in handler.server.jobs: + results = job.framesStatus() + rowTable( + link(job.name, "/html/job" + job.id), + job.priority, + "%0.1f%%" % (job.usage * 100), + "%is" % int(time.time() - job.last_dispatched), + len(job), + results[DONE], + results[DISPATCHED], + results[ERROR], + handler.server.balancer.applyPriorities(job), handler.server.balancer.applyExceptions(job) + ) + + endTable() + + output("</body></html>") + + elif handler.path.startswith("/html/job"): + handler.send_head(content = "text/html") + job_id = handler.path[9:] + + output("<html><head><title>NetRender</title></head><body>") + + job = handler.server.getJobID(job_id) + + if job: + output("<h2>Frames</h2>") + + startTable() + headerTable("no", "status", "render time", "slave", "log") + + for frame in job.frames: + rowTable(frame.number, frame.statusText(), "%.1fs" % frame.time, frame.slave.name if frame.slave else " ", link("view log", "/html/log%s_%i" % (job_id, frame.number)) if frame.log_path else " ") + + endTable() + else: + output("no such job") + + output("</body></html>") + + elif handler.path.startswith("/html/log"): + handler.send_head(content = "text/plain") + pattern = re.compile("([a-zA-Z0-9]+)_([0-9]+)") + + match = pattern.match(handler.path[9:]) + if match: + job_id = match.groups()[0] + frame_number = int(match.groups()[1]) + + job = handler.server.getJobID(job_id) + + if job: + frame = job[frame_number] + + if frame: + f = open(frame.log_path, 'rb') + + shutil.copyfileobj(f, handler.wfile) + + f.close() + else: + output("no such frame") + else: + output("no such job") + else: + output("malformed url") diff --git a/release/scripts/io/netrender/model.py b/release/scripts/io/netrender/model.py new file mode 100644 index 00000000000..ca2a42d87f6 --- /dev/null +++ b/release/scripts/io/netrender/model.py @@ -0,0 +1,212 @@ +import sys, os +import http, http.client, http.server, urllib +import subprocess, shutil, time, hashlib + +from netrender.utils import * + +class LogFile: + def __init__(self, job_id = 0, frames = []): + self.job_id = job_id + self.frames = frames + + def serialize(self): + return { + "job_id": self.job_id, + "frames": self.frames + } + + @staticmethod + def materialize(data): + if not data: + return None + + logfile = LogFile() + logfile.job_id = data["job_id"] + logfile.frames = data["frames"] + + return logfile + +class RenderSlave: + _slave_map = {} + + def __init__(self): + self.id = "" + self.name = "" + self.address = ("",0) + self.stats = "" + self.total_done = 0 + self.total_error = 0 + self.last_seen = 0.0 + + def serialize(self): + return { + "id": self.id, + "name": self.name, + "address": self.address, + "stats": self.stats, + "total_done": self.total_done, + "total_error": self.total_error, + "last_seen": self.last_seen + } + + @staticmethod + def materialize(data): + if not data: + return None + + slave_id = data["id"] + + if slave_id in RenderSlave._slave_map: + return RenderSlave._slave_map[slave_id] + else: + slave = RenderSlave() + slave.id = slave_id + slave.name = data["name"] + slave.address = data["address"] + slave.stats = data["stats"] + slave.total_done = data["total_done"] + slave.total_error = data["total_error"] + slave.last_seen = data["last_seen"] + + RenderSlave._slave_map[slave_id] = slave + + return slave + +JOB_BLENDER = 1 +JOB_PROCESS = 2 + +JOB_TYPES = { + JOB_BLENDER: "Blender", + JOB_PROCESS: "Process" + } + +class RenderJob: + def __init__(self): + self.id = "" + self.type = JOB_BLENDER + self.name = "" + self.files = [] + self.frames = [] + self.chunks = 0 + self.priority = 0 + self.usage = 0.0 + self.blacklist = [] + self.last_dispatched = 0.0 + + def addFile(self, file_path, start=-1, end=-1): + self.files.append((file_path, start, end)) + + def addFrame(self, frame_number, command = ""): + frame = RenderFrame(frame_number, command) + self.frames.append(frame) + return frame + + def __len__(self): + return len(self.frames) + + def countFrames(self, status=QUEUED): + total = 0 + for f in self.frames: + if f.status == status: + total += 1 + + return total + + def countSlaves(self): + return len(set((frame.slave for frame in self.frames if frame.status == DISPATCHED))) + + def framesStatus(self): + results = { + QUEUED: 0, + DISPATCHED: 0, + DONE: 0, + ERROR: 0 + } + + for frame in self.frames: + results[frame.status] += 1 + + return results + + def __contains__(self, frame_number): + for f in self.frames: + if f.number == frame_number: + return True + else: + return False + + def __getitem__(self, frame_number): + for f in self.frames: + if f.number == frame_number: + return f + else: + return None + + def serialize(self, frames = None): + min_frame = min((f.number for f in frames)) if frames else -1 + max_frame = max((f.number for f in frames)) if frames else -1 + return { + "id": self.id, + "type": self.type, + "name": self.name, + "files": [f for f in self.files if f[1] == -1 or not frames or (f[1] <= min_frame <= f[2] or f[1] <= max_frame <= f[2])], + "frames": [f.serialize() for f in self.frames if not frames or f in frames], + "chunks": self.chunks, + "priority": self.priority, + "usage": self.usage, + "blacklist": self.blacklist, + "last_dispatched": self.last_dispatched + } + + @staticmethod + def materialize(data): + if not data: + return None + + job = RenderJob() + job.id = data["id"] + job.type = data["type"] + job.name = data["name"] + job.files = data["files"] + job.frames = [RenderFrame.materialize(f) for f in data["frames"]] + job.chunks = data["chunks"] + job.priority = data["priority"] + job.usage = data["usage"] + job.blacklist = data["blacklist"] + job.last_dispatched = data["last_dispatched"] + + return job + +class RenderFrame: + def __init__(self, number = 0, command = ""): + self.number = number + self.time = 0 + self.status = QUEUED + self.slave = None + self.command = command + + def statusText(self): + return STATUS_TEXT[self.status] + + def serialize(self): + return { + "number": self.number, + "time": self.time, + "status": self.status, + "slave": None if not self.slave else self.slave.serialize(), + "command": self.command + } + + @staticmethod + def materialize(data): + if not data: + return None + + frame = RenderFrame() + frame.number = data["number"] + frame.time = data["time"] + frame.status = data["status"] + frame.slave = RenderSlave.materialize(data["slave"]) + frame.command = data["command"] + + return frame diff --git a/release/scripts/io/netrender/operators.py b/release/scripts/io/netrender/operators.py new file mode 100644 index 00000000000..42d1f6a0b86 --- /dev/null +++ b/release/scripts/io/netrender/operators.py @@ -0,0 +1,423 @@ +import bpy +import sys, os +import http, http.client, http.server, urllib, socket +import webbrowser + +from netrender.utils import * +import netrender.client as client +import netrender.model + +@rnaOperator +class RENDER_OT_netclientanim(bpy.types.Operator): + ''' + Operator documentation text, will be used for the operator tooltip and python docs. + ''' + __idname__ = "render.netclientanim" + __label__ = "Net Render Client Anim" + + # List of operator properties, the attributes will be assigned + # to the class instance from the operator settings before calling. + + __props__ = [] + + def poll(self, context): + return True + + def execute(self, context): + scene = context.scene + + conn = clientConnection(scene) + + if conn: + # Sending file + scene.network_render.job_id = client.clientSendJob(conn, scene, True) + conn.close() + + bpy.ops.screen.render('INVOKE_AREA', animation=True) + + return ('FINISHED',) + + def invoke(self, context, event): + return self.execute(context) + +@rnaOperator +class RENDER_OT_netclientsend(bpy.types.Operator): + ''' + Operator documentation text, will be used for the operator tooltip and python docs. + ''' + __idname__ = "render.netclientsend" + __label__ = "Net Render Client Send" + + # List of operator properties, the attributes will be assigned + # to the class instance from the operator settings before calling. + + __props__ = [] + + def poll(self, context): + return True + + def execute(self, context): + scene = context.scene + + conn = clientConnection(scene) + + if conn: + # Sending file + scene.network_render.job_id = client.clientSendJob(conn, scene, True) + conn.close() + + return ('FINISHED',) + + def invoke(self, context, event): + return self.execute(context) + +@rnaOperator +class RENDER_OT_netclientstatus(bpy.types.Operator): + '''Operator documentation text, will be used for the operator tooltip and python docs.''' + __idname__ = "render.netclientstatus" + __label__ = "Net Render Client Status" + + # List of operator properties, the attributes will be assigned + # to the class instance from the operator settings before calling. + + __props__ = [] + + def poll(self, context): + return True + + def execute(self, context): + netsettings = context.scene.network_render + conn = clientConnection(context.scene) + + if conn: + conn.request("GET", "/status") + + response = conn.getresponse() + print( response.status, response.reason ) + + jobs = (netrender.model.RenderJob.materialize(j) for j in eval(str(response.read(), encoding='utf8'))) + + while(len(netsettings.jobs) > 0): + netsettings.jobs.remove(0) + + bpy.data.netrender_jobs = [] + + for j in jobs: + bpy.data.netrender_jobs.append(j) + netsettings.jobs.add() + job = netsettings.jobs[-1] + + j.results = j.framesStatus() # cache frame status + + job.name = j.name + + return ('FINISHED',) + + def invoke(self, context, event): + return self.execute(context) + +@rnaOperator +class RENDER_OT_netclientblacklistslave(bpy.types.Operator): + '''Operator documentation text, will be used for the operator tooltip and python docs.''' + __idname__ = "render.netclientblacklistslave" + __label__ = "Net Render Client Blacklist Slave" + + # List of operator properties, the attributes will be assigned + # to the class instance from the operator settings before calling. + + __props__ = [] + + def poll(self, context): + return True + + def execute(self, context): + netsettings = context.scene.network_render + + if netsettings.active_slave_index >= 0: + + # deal with data + slave = bpy.data.netrender_slaves.pop(netsettings.active_slave_index) + bpy.data.netrender_blacklist.append(slave) + + # deal with rna + netsettings.slaves_blacklist.add() + netsettings.slaves_blacklist[-1].name = slave.name + + netsettings.slaves.remove(netsettings.active_slave_index) + netsettings.active_slave_index = -1 + + return ('FINISHED',) + + def invoke(self, context, event): + return self.execute(context) + +@rnaOperator +class RENDER_OT_netclientwhitelistslave(bpy.types.Operator): + '''Operator documentation text, will be used for the operator tooltip and python docs.''' + __idname__ = "render.netclientwhitelistslave" + __label__ = "Net Render Client Whitelist Slave" + + # List of operator properties, the attributes will be assigned + # to the class instance from the operator settings before calling. + + __props__ = [] + + def poll(self, context): + return True + + def execute(self, context): + netsettings = context.scene.network_render + + if netsettings.active_blacklisted_slave_index >= 0: + + # deal with data + slave = bpy.data.netrender_blacklist.pop(netsettings.active_blacklisted_slave_index) + bpy.data.netrender_slaves.append(slave) + + # deal with rna + netsettings.slaves.add() + netsettings.slaves[-1].name = slave.name + + netsettings.slaves_blacklist.remove(netsettings.active_blacklisted_slave_index) + netsettings.active_blacklisted_slave_index = -1 + + return ('FINISHED',) + + def invoke(self, context, event): + return self.execute(context) + + +@rnaOperator +class RENDER_OT_netclientslaves(bpy.types.Operator): + '''Operator documentation text, will be used for the operator tooltip and python docs.''' + __idname__ = "render.netclientslaves" + __label__ = "Net Render Client Slaves" + + # List of operator properties, the attributes will be assigned + # to the class instance from the operator settings before calling. + + __props__ = [] + + def poll(self, context): + return True + + def execute(self, context): + netsettings = context.scene.network_render + conn = clientConnection(context.scene) + + if conn: + conn.request("GET", "/slaves") + + response = conn.getresponse() + print( response.status, response.reason ) + + slaves = (netrender.model.RenderSlave.materialize(s) for s in eval(str(response.read(), encoding='utf8'))) + + while(len(netsettings.slaves) > 0): + netsettings.slaves.remove(0) + + bpy.data.netrender_slaves = [] + + for s in slaves: + for i in range(len(bpy.data.netrender_blacklist)): + slave = bpy.data.netrender_blacklist[i] + if slave.id == s.id: + bpy.data.netrender_blacklist[i] = s + netsettings.slaves_blacklist[i].name = s.name + break + else: + bpy.data.netrender_slaves.append(s) + + netsettings.slaves.add() + slave = netsettings.slaves[-1] + slave.name = s.name + + return ('FINISHED',) + + def invoke(self, context, event): + return self.execute(context) + +@rnaOperator +class RENDER_OT_netclientcancel(bpy.types.Operator): + '''Operator documentation text, will be used for the operator tooltip and python docs.''' + __idname__ = "render.netclientcancel" + __label__ = "Net Render Client Cancel" + + # List of operator properties, the attributes will be assigned + # to the class instance from the operator settings before calling. + + __props__ = [] + + def poll(self, context): + netsettings = context.scene.network_render + return netsettings.active_job_index >= 0 and len(netsettings.jobs) > 0 + + def execute(self, context): + netsettings = context.scene.network_render + conn = clientConnection(context.scene) + + if conn: + job = bpy.data.netrender_jobs[netsettings.active_job_index] + + conn.request("POST", "/cancel", headers={"job-id":job.id}) + + response = conn.getresponse() + print( response.status, response.reason ) + + netsettings.jobs.remove(netsettings.active_job_index) + + return ('FINISHED',) + + def invoke(self, context, event): + return self.execute(context) + +@rnaOperator +class RENDER_OT_netclientcancelall(bpy.types.Operator): + '''Operator documentation text, will be used for the operator tooltip and python docs.''' + __idname__ = "render.netclientcancelall" + __label__ = "Net Render Client Cancel All" + + # List of operator properties, the attributes will be assigned + # to the class instance from the operator settings before calling. + + __props__ = [] + + def poll(self, context): + return True + + def execute(self, context): + netsettings = context.scene.network_render + conn = clientConnection(context.scene) + + if conn: + conn.request("POST", "/clear") + + response = conn.getresponse() + print( response.status, response.reason ) + + while(len(netsettings.jobs) > 0): + netsettings.jobs.remove(0) + + return ('FINISHED',) + + def invoke(self, context, event): + return self.execute(context) + +@rnaOperator +class netclientdownload(bpy.types.Operator): + '''Operator documentation text, will be used for the operator tooltip and python docs.''' + __idname__ = "render.netclientdownload" + __label__ = "Net Render Client Download" + + # List of operator properties, the attributes will be assigned + # to the class instance from the operator settings before calling. + + __props__ = [] + + def poll(self, context): + netsettings = context.scene.network_render + return netsettings.active_job_index >= 0 and len(netsettings.jobs) > 0 + + def execute(self, context): + netsettings = context.scene.network_render + rd = context.scene.render_data + + conn = clientConnection(context.scene) + + if conn: + job = bpy.data.netrender_jobs[netsettings.active_job_index] + + for frame in job.frames: + client.requestResult(conn, job.id, frame.number) + response = conn.getresponse() + + if response.status != http.client.OK: + print("missing", frame.number) + continue + + print("got back", frame.number) + + f = open(netsettings.path + "%06d" % frame.number + ".exr", "wb") + buf = response.read(1024) + + while buf: + f.write(buf) + buf = response.read(1024) + + f.close() + + conn.close() + + return ('FINISHED',) + + def invoke(self, context, event): + return self.execute(context) + +@rnaOperator +class netclientscan(bpy.types.Operator): + '''Operator documentation text, will be used for the operator tooltip and python docs.''' + __idname__ = "render.netclientscan" + __label__ = "Net Render Client Scan" + + # List of operator properties, the attributes will be assigned + # to the class instance from the operator settings before calling. + + __props__ = [] + + def poll(self, context): + return True + + def execute(self, context): + netsettings = context.scene.network_render + + try: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + s.settimeout(30) + + s.bind(('', 8000)) + + buf, address = s.recvfrom(64) + + print("received:", buf) + + netsettings.server_address = address[0] + netsettings.server_port = int(str(buf, encoding='utf8')) + except socket.timeout: + print("no server info") + + return ('FINISHED',) + + def invoke(self, context, event): + return self.execute(context) + +@rnaOperator +class netclientweb(bpy.types.Operator): + '''Operator documentation text, will be used for the operator tooltip and python docs.''' + __idname__ = "render.netclientweb" + __label__ = "Net Render Client Web" + + # List of operator properties, the attributes will be assigned + # to the class instance from the operator settings before calling. + + __props__ = [] + + def poll(self, context): + return True + + def execute(self, context): + netsettings = context.scene.network_render + + + # open connection to make sure server exists + conn = clientConnection(context.scene) + + if conn: + conn.close() + + webbrowser.open("http://%s:%i" % (netsettings.server_address, netsettings.server_port)) + + return ('FINISHED',) + + def invoke(self, context, event): + return self.execute(context) diff --git a/release/scripts/io/netrender/slave.py b/release/scripts/io/netrender/slave.py new file mode 100644 index 00000000000..15ca6faf297 --- /dev/null +++ b/release/scripts/io/netrender/slave.py @@ -0,0 +1,224 @@ +import sys, os, platform +import http, http.client, http.server, urllib +import subprocess, time + +from netrender.utils import * +import netrender.model + +CANCEL_POLL_SPEED = 2 +MAX_TIMEOUT = 10 +INCREMENT_TIMEOUT = 1 + +if platform.system() == 'Windows' and platform.version() >= '5': # Error mode is only available on Win2k or higher, that's version 5 + import ctypes + def SetErrorMode(): + val = ctypes.windll.kernel32.SetErrorMode(0x0002) + ctypes.windll.kernel32.SetErrorMode(val | 0x0002) + return val + + def RestoreErrorMode(val): + ctypes.windll.kernel32.SetErrorMode(val) +else: + def SetErrorMode(): + return 0 + + def RestoreErrorMode(val): + pass + +def slave_Info(): + sysname, nodename, release, version, machine, processor = platform.uname() + slave = netrender.model.RenderSlave() + slave.name = nodename + slave.stats = sysname + " " + release + " " + machine + " " + processor + return slave + +def testCancel(conn, job_id, frame_number): + conn.request("HEAD", "/status", headers={"job-id":job_id, "job-frame": str(frame_number)}) + response = conn.getresponse() + + # cancelled if job isn't found anymore + if response.status == http.client.NO_CONTENT: + return True + else: + return False + +def testFile(conn, job_id, slave_id, JOB_PREFIX, file_path, main_path = None): + job_full_path = prefixPath(JOB_PREFIX, file_path, main_path) + + if not os.path.exists(job_full_path): + temp_path = JOB_PREFIX + "slave.temp.blend" + conn.request("GET", "/file", headers={"job-id": job_id, "slave-id":slave_id, "job-file":file_path}) + response = conn.getresponse() + + if response.status != http.client.OK: + return None # file for job not returned by server, need to return an error code to server + + f = open(temp_path, "wb") + buf = response.read(1024) + + while buf: + f.write(buf) + buf = response.read(1024) + + f.close() + + os.renames(temp_path, job_full_path) + + return job_full_path + + +def render_slave(engine, scene): + netsettings = scene.network_render + timeout = 1 + + engine.update_stats("", "Network render node initiation") + + conn = clientConnection(scene) + + if conn: + conn.request("POST", "/slave", repr(slave_Info().serialize())) + response = conn.getresponse() + + slave_id = response.getheader("slave-id") + + NODE_PREFIX = netsettings.path + "slave_" + slave_id + os.sep + if not os.path.exists(NODE_PREFIX): + os.mkdir(NODE_PREFIX) + + while not engine.test_break(): + + conn.request("GET", "/job", headers={"slave-id":slave_id}) + response = conn.getresponse() + + if response.status == http.client.OK: + timeout = 1 # reset timeout on new job + + job = netrender.model.RenderJob.materialize(eval(str(response.read(), encoding='utf8'))) + + JOB_PREFIX = NODE_PREFIX + "job_" + job.id + os.sep + if not os.path.exists(JOB_PREFIX): + os.mkdir(JOB_PREFIX) + + + if job.type == netrender.model.JOB_BLENDER: + job_path = job.files[0][0] # data in files have format (path, start, end) + main_path, main_file = os.path.split(job_path) + + job_full_path = testFile(conn, job.id, slave_id, JOB_PREFIX, job_path) + print("Fullpath", job_full_path) + print("File:", main_file, "and %i other files" % (len(job.files) - 1,)) + engine.update_stats("", "Render File", main_file, "for job", job.id) + + for file_path, start, end in job.files[1:]: + print("\t", file_path) + testFile(conn, job.id, slave_id, JOB_PREFIX, file_path, main_path) + + # announce log to master + logfile = netrender.model.LogFile(job.id, [frame.number for frame in job.frames]) + conn.request("POST", "/log", bytes(repr(logfile.serialize()), encoding='utf8'), headers={"slave-id":slave_id}) + response = conn.getresponse() + + + first_frame = job.frames[0].number + + # start render + start_t = time.time() + + if job.type == netrender.model.JOB_BLENDER: + frame_args = [] + + for frame in job.frames: + print("frame", frame.number) + frame_args += ["-f", str(frame.number)] + + val = SetErrorMode() + process = subprocess.Popen([sys.argv[0], "-b", job_full_path, "-o", JOB_PREFIX + "######", "-E", "BLENDER_RENDER", "-F", "MULTILAYER"] + frame_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + RestoreErrorMode(val) + elif job.type == netrender.model.JOB_PROCESS: + command = job.frames[0].command + val = SetErrorMode() + process = subprocess.Popen(command.split(" "), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + RestoreErrorMode(val) + + headers = {"job-id":job.id, "slave-id":slave_id} + + cancelled = False + stdout = bytes() + run_t = time.time() + while process.poll() == None and not cancelled: + stdout += process.stdout.read(32) + current_t = time.time() + cancelled = engine.test_break() + if current_t - run_t > CANCEL_POLL_SPEED: + + # update logs if needed + if stdout: + # (only need to update on one frame, they are linked + headers["job-frame"] = str(first_frame) + conn.request("PUT", "/log", stdout, headers=headers) + response = conn.getresponse() + + stdout = bytes() + + run_t = current_t + if testCancel(conn, job.id, first_frame): + cancelled = True + + # read leftovers if needed + stdout += process.stdout.read() + + if cancelled: + # kill process if needed + if process.poll() == None: + process.terminate() + continue # to next frame + + total_t = time.time() - start_t + + avg_t = total_t / len(job.frames) + + status = process.returncode + + print("status", status) + + # flush the rest of the logs + if stdout: + # (only need to update on one frame, they are linked + headers["job-frame"] = str(first_frame) + conn.request("PUT", "/log", stdout, headers=headers) + response = conn.getresponse() + + headers = {"job-id":job.id, "slave-id":slave_id, "job-time":str(avg_t)} + + if status == 0: # non zero status is error + headers["job-result"] = str(DONE) + for frame in job.frames: + headers["job-frame"] = str(frame.number) + + if job.type == netrender.model.JOB_BLENDER: + # send image back to server + f = open(JOB_PREFIX + "%06d" % frame.number + ".exr", 'rb') + conn.request("PUT", "/render", f, headers=headers) + f.close() + response = conn.getresponse() + elif job.type == netrender.model.JOB_PROCESS: + conn.request("PUT", "/render", headers=headers) + response = conn.getresponse() + else: + headers["job-result"] = str(ERROR) + for frame in job.frames: + headers["job-frame"] = str(frame.number) + # send error result back to server + conn.request("PUT", "/render", headers=headers) + response = conn.getresponse() + else: + if timeout < MAX_TIMEOUT: + timeout += INCREMENT_TIMEOUT + + for i in range(timeout): + time.sleep(1) + if engine.test_break(): + conn.close() + return + + conn.close() diff --git a/release/scripts/io/netrender/ui.py b/release/scripts/io/netrender/ui.py new file mode 100644 index 00000000000..7681d4865e9 --- /dev/null +++ b/release/scripts/io/netrender/ui.py @@ -0,0 +1,321 @@ +import bpy +import sys, os +import http, http.client, http.server, urllib +import subprocess, shutil, time, hashlib + +import netrender.slave as slave +import netrender.master as master + +from netrender.utils import * + +VERSION = b"0.3" + +PATH_PREFIX = "/tmp/" + +QUEUED = 0 +DISPATCHED = 1 +DONE = 2 +ERROR = 3 + +class RenderButtonsPanel(bpy.types.Panel): + __space_type__ = "PROPERTIES" + __region_type__ = "WINDOW" + __context__ = "scene" + # COMPAT_ENGINES must be defined in each subclass, external engines can add themselves here + + def poll(self, context): + rd = context.scene.render_data + return (rd.use_game_engine==False) and (rd.engine in self.COMPAT_ENGINES) + +# Setting panel, use in the scene for now. +@rnaType +class SCENE_PT_network_settings(RenderButtonsPanel): + __label__ = "Network Settings" + COMPAT_ENGINES = set(['NET_RENDER']) + + def draw_header(self, context): + layout = self.layout + scene = context.scene + + def draw(self, context): + layout = self.layout + + scene = context.scene + rd = scene.render_data + + layout.active = True + + split = layout.split() + + col = split.column() + + col.itemR(scene.network_render, "mode") + col.itemR(scene.network_render, "path") + col.itemR(scene.network_render, "server_address") + col.itemR(scene.network_render, "server_port") + + if scene.network_render.mode == "RENDER_MASTER": + col.itemR(scene.network_render, "server_broadcast") + else: + col.itemO("render.netclientscan", icon="ICON_FILE_REFRESH", text="") + +@rnaType +class SCENE_PT_network_job(RenderButtonsPanel): + __label__ = "Job Settings" + COMPAT_ENGINES = set(['NET_RENDER']) + + def poll(self, context): + scene = context.scene + return super().poll(context) and scene.network_render.mode == "RENDER_CLIENT" + + def draw(self, context): + layout = self.layout + + scene = context.scene + rd = scene.render_data + + layout.active = True + + split = layout.split() + + col = split.column() + + col.itemO("render.netclientanim", icon='ICON_RENDER_ANIMATION', text="Animaton on network") + col.itemO("render.netclientsend", icon="ICON_FILE_BLEND", text="Send job") + col.itemO("render.netclientweb", icon="ICON_QUESTION", text="Open Master Monitor") + col.itemR(scene.network_render, "job_name") + col.itemR(scene.network_render, "priority") + col.itemR(scene.network_render, "chunks") + +@rnaType +class SCENE_PT_network_slaves(RenderButtonsPanel): + __label__ = "Slaves Status" + COMPAT_ENGINES = set(['NET_RENDER']) + + def poll(self, context): + scene = context.scene + return super().poll(context) and scene.network_render.mode == "RENDER_CLIENT" + + def draw(self, context): + layout = self.layout + + scene = context.scene + netsettings = scene.network_render + + row = layout.row() + row.template_list(netsettings, "slaves", netsettings, "active_slave_index", rows=2) + + col = row.column() + + subcol = col.column(align=True) + subcol.itemO("render.netclientslaves", icon="ICON_FILE_REFRESH", text="") + subcol.itemO("render.netclientblacklistslave", icon="ICON_ZOOMOUT", text="") + + if len(bpy.data.netrender_slaves) == 0 and len(netsettings.slaves) > 0: + while(len(netsettings.slaves) > 0): + netsettings.slaves.remove(0) + + if netsettings.active_slave_index >= 0 and len(netsettings.slaves) > 0: + layout.itemS() + + slave = bpy.data.netrender_slaves[netsettings.active_slave_index] + + layout.itemL(text="Name: " + slave.name) + layout.itemL(text="Address: " + slave.address[0]) + layout.itemL(text="Seen: " + time.ctime(slave.last_seen)) + layout.itemL(text="Stats: " + slave.stats) + +@rnaType +class SCENE_PT_network_slaves_blacklist(RenderButtonsPanel): + __label__ = "Slaves Blacklist" + COMPAT_ENGINES = set(['NET_RENDER']) + + def poll(self, context): + scene = context.scene + return super().poll(context) and scene.network_render.mode == "RENDER_CLIENT" + + def draw(self, context): + layout = self.layout + + scene = context.scene + netsettings = scene.network_render + + row = layout.row() + row.template_list(netsettings, "slaves_blacklist", netsettings, "active_blacklisted_slave_index", rows=2) + + col = row.column() + + subcol = col.column(align=True) + subcol.itemO("render.netclientwhitelistslave", icon="ICON_ZOOMOUT", text="") + + if len(bpy.data.netrender_blacklist) == 0 and len(netsettings.slaves_blacklist) > 0: + while(len(netsettings.slaves_blacklist) > 0): + netsettings.slaves_blacklist.remove(0) + + if netsettings.active_blacklisted_slave_index >= 0 and len(netsettings.slaves_blacklist) > 0: + layout.itemS() + + slave = bpy.data.netrender_blacklist[netsettings.active_blacklisted_slave_index] + + layout.itemL(text="Name: " + slave.name) + layout.itemL(text="Address: " + slave.address[0]) + layout.itemL(text="Seen: " + slave.last_seen) + layout.itemL(text="Stats: " + time.ctime(slave.stats)) + +@rnaType +class SCENE_PT_network_jobs(RenderButtonsPanel): + __label__ = "Jobs" + COMPAT_ENGINES = set(['NET_RENDER']) + + def poll(self, context): + scene = context.scene + return super().poll(context) and scene.network_render.mode == "RENDER_CLIENT" + + def draw(self, context): + layout = self.layout + + scene = context.scene + netsettings = scene.network_render + + row = layout.row() + row.template_list(netsettings, "jobs", netsettings, "active_job_index", rows=2) + + col = row.column() + + subcol = col.column(align=True) + subcol.itemO("render.netclientstatus", icon="ICON_FILE_REFRESH", text="") + subcol.itemO("render.netclientcancel", icon="ICON_ZOOMOUT", text="") + subcol.itemO("render.netclientcancelall", icon="ICON_PANEL_CLOSE", text="") + subcol.itemO("render.netclientdownload", icon='ICON_RENDER_ANIMATION', text="") + + if len(bpy.data.netrender_jobs) == 0 and len(netsettings.jobs) > 0: + while(len(netsettings.jobs) > 0): + netsettings.jobs.remove(0) + + if netsettings.active_job_index >= 0 and len(netsettings.jobs) > 0: + layout.itemS() + + job = bpy.data.netrender_jobs[netsettings.active_job_index] + + layout.itemL(text="Name: %s" % job.name) + layout.itemL(text="Length: %04i" % len(job)) + layout.itemL(text="Done: %04i" % job.results[DONE]) + layout.itemL(text="Error: %04i" % job.results[ERROR]) + +@rnaType +class NetRenderSettings(bpy.types.IDPropertyGroup): + pass + +@rnaType +class NetRenderSlave(bpy.types.IDPropertyGroup): + pass + +@rnaType +class NetRenderJob(bpy.types.IDPropertyGroup): + pass + +bpy.types.Scene.PointerProperty(attr="network_render", type=NetRenderSettings, name="Network Render", description="Network Render Settings") + +NetRenderSettings.StringProperty( attr="server_address", + name="Server address", + description="IP or name of the master render server", + maxlen = 128, + default = "[default]") + +NetRenderSettings.IntProperty( attr="server_port", + name="Server port", + description="port of the master render server", + default = 8000, + min=1, + max=65535) + +NetRenderSettings.BoolProperty( attr="server_broadcast", + name="Broadcast server address", + description="broadcast server address on local network", + default = True) + +if os.name == 'nt': + NetRenderSettings.StringProperty( attr="path", + name="Path", + description="Path for temporary files", + maxlen = 128, + default = "C:/tmp/") +else: + NetRenderSettings.StringProperty( attr="path", + name="Path", + description="Path for temporary files", + maxlen = 128, + default = "/tmp/") + +NetRenderSettings.StringProperty( attr="job_name", + name="Job name", + description="Name of the job", + maxlen = 128, + default = "[default]") + +NetRenderSettings.IntProperty( attr="chunks", + name="Chunks", + description="Number of frame to dispatch to each slave in one chunk", + default = 5, + min=1, + max=65535) + +NetRenderSettings.IntProperty( attr="priority", + name="Priority", + description="Priority of the job", + default = 1, + min=1, + max=10) + +NetRenderSettings.StringProperty( attr="job_id", + name="Network job id", + description="id of the last sent render job", + maxlen = 64, + default = "") + +NetRenderSettings.IntProperty( attr="active_slave_index", + name="Index of the active slave", + description="", + default = -1, + min= -1, + max=65535) + +NetRenderSettings.IntProperty( attr="active_blacklisted_slave_index", + name="Index of the active slave", + description="", + default = -1, + min= -1, + max=65535) + +NetRenderSettings.IntProperty( attr="active_job_index", + name="Index of the active job", + description="", + default = -1, + min= -1, + max=65535) + +NetRenderSettings.EnumProperty(attr="mode", + items=( + ("RENDER_CLIENT", "Client", "Act as render client"), + ("RENDER_MASTER", "Master", "Act as render master"), + ("RENDER_SLAVE", "Slave", "Act as render slave"), + ), + name="network mode", + description="mode of operation of this instance", + default="RENDER_CLIENT") + +NetRenderSettings.CollectionProperty(attr="slaves", type=NetRenderSlave, name="Slaves", description="") +NetRenderSettings.CollectionProperty(attr="slaves_blacklist", type=NetRenderSlave, name="Slaves Blacklist", description="") +NetRenderSettings.CollectionProperty(attr="jobs", type=NetRenderJob, name="Job List", description="") + +NetRenderSlave.StringProperty( attr="name", + name="Name of the slave", + description="", + maxlen = 64, + default = "") + +NetRenderJob.StringProperty( attr="name", + name="Name of the job", + description="", + maxlen = 128, + default = "") diff --git a/release/scripts/io/netrender/utils.py b/release/scripts/io/netrender/utils.py new file mode 100644 index 00000000000..06393a738a0 --- /dev/null +++ b/release/scripts/io/netrender/utils.py @@ -0,0 +1,86 @@ +import bpy +import sys, os +import re +import http, http.client, http.server, urllib +import subprocess, shutil, time, hashlib + +import netrender.model + +VERSION = b"0.5" + +# Jobs status +JOB_WAITING = 0 # before all data has been entered +JOB_PAUSED = 1 # paused by user +JOB_FINISHED = 2 # finished rendering +JOB_QUEUED = 3 # ready to be dispatched + +# Frames status +QUEUED = 0 +DISPATCHED = 1 +DONE = 2 +ERROR = 3 + +STATUS_TEXT = { + QUEUED: "Queued", + DISPATCHED: "Dispatched", + DONE: "Done", + ERROR: "Error" + } + +def rnaType(rna_type): + bpy.types.register(rna_type) + return rna_type + +def rnaOperator(rna_op): + bpy.ops.add(rna_op) + return rna_op + +def clientConnection(scene): + netsettings = scene.network_render + + if netsettings.server_address == "[default]": + bpy.ops.render.netclientscan() + + conn = http.client.HTTPConnection(netsettings.server_address, netsettings.server_port) + + if clientVerifyVersion(conn): + return conn + else: + conn.close() + return None + +def clientVerifyVersion(conn): + conn.request("GET", "/version") + response = conn.getresponse() + + if response.status != http.client.OK: + conn.close() + return False + + server_version = response.read() + + if server_version != VERSION: + print("Incorrect server version!") + print("expected", VERSION, "received", server_version) + return False + + return True + +def prefixPath(prefix_directory, file_path, prefix_path): + if os.path.isabs(file_path): + # if an absolute path, make sure path exists, if it doesn't, use relative local path + full_path = file_path + if not os.path.exists(full_path): + p, n = os.path.split(full_path) + + if prefix_path and p.startswith(prefix_path): + directory = prefix_directory + p[len(prefix_path):] + full_path = directory + n + if not os.path.exists(directory): + os.mkdir(directory) + else: + full_path = prefix_directory + n + else: + full_path = prefix_directory + file_path + + return full_path |