From a82f4c03e749ada328b6031cfd085cab8f38f51c Mon Sep 17 00:00:00 2001 From: Martin Poirier Date: Wed, 29 Jun 2011 19:46:48 +0000 Subject: netrender better stream handling when uploading files Might solve the problem with OS X masters --- netrender/master.py | 64 +++++++++++++++++++++-------------------------------- netrender/slave.py | 4 ++-- netrender/utils.py | 8 ++++--- 3 files changed, 32 insertions(+), 44 deletions(-) (limited to 'netrender') diff --git a/netrender/master.py b/netrender/master.py index b34509e3..280a4019 100644 --- a/netrender/master.py +++ b/netrender/master.py @@ -213,7 +213,9 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): def send_head(self, code = http.client.OK, headers = {}, content = "application/octet-stream"): self.send_response(code) - self.send_header("Content-type", content) + + if code != http.client.OK and content: + self.send_header("Content-type", content) for key, value in headers.items(): self.send_header(key, value) @@ -512,7 +514,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): if job.testStart(): self.server.stats("", "New job, started") - self.send_head(headers=headers) + self.send_head(headers=headers, content = None) else: self.server.stats("", "New job, missing files (%i total)" % len(job.files)) self.send_head(http.client.ACCEPTED, headers=headers) @@ -529,7 +531,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): info_map = self.getInfoMap() job.edit(info_map) - self.send_head() + self.send_head(content = None) else: # no such job id self.send_head(http.client.NO_CONTENT) @@ -547,7 +549,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): except: pass # invalid type - self.send_head() + self.send_head(content = None) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- elif self.path == "/balance_enable": info_map = self.getInfoMap() @@ -556,7 +558,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): if rule: rule.enabled = enabled - self.send_head() + self.send_head(content = None) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- elif self.path.startswith("/cancel"): match = cancel_pattern.match(self.path) @@ -572,7 +574,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): if job: self.server.stats("", "Cancelling job") self.server.removeJob(job, clear) - self.send_head() + self.send_head(content = None) else: # no such job id self.send_head(http.client.NO_CONTENT) @@ -594,7 +596,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): if job: self.server.stats("", "Pausing job") job.pause(status) - self.send_head() + self.send_head(content = None) else: # no such job id self.send_head(http.client.NO_CONTENT) @@ -610,7 +612,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): self.server.stats("", "Clearing jobs") self.server.clear(clear) - self.send_head() + self.send_head(content = None) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- elif self.path.startswith("/reset"): match = reset_pattern.match(self.path) @@ -629,7 +631,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): if frame: self.server.stats("", "Reset job frame") frame.reset(all) - self.send_head() + self.send_head(content = None) else: # no such frame self.send_head(http.client.NO_CONTENT) @@ -637,7 +639,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): else: self.server.stats("", "Reset job") job.reset(all) - self.send_head() + self.send_head(content = None) else: # job not found self.send_head(http.client.NO_CONTENT) @@ -654,7 +656,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): slave_id = self.server.addSlave(slave_info.name, self.client_address, slave_info.stats) - self.send_head(headers = {"slave-id": slave_id}) + self.send_head(headers = {"slave-id": slave_id}, content = None) # =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- elif self.path == "/log": length = int(self.headers['content-length']) @@ -671,7 +673,7 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): if job: self.server.stats("", "Log announcement") job.addLog(log_info.frames) - self.send_head(http.client.OK) + self.send_head(content = None) else: # no such job id self.send_head(http.client.NO_CONTENT) @@ -691,7 +693,6 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): if match: self.server.stats("", "Receiving job") - length = int(self.headers['content-length']) job_id = match.groups()[0] file_index = int(match.groups()[1]) @@ -711,20 +712,17 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): else: file_path = os.path.join(job.save_path, main_name) - buf = self.rfile.read(length) - # add same temp file + renames as slave f = open(file_path, "wb") - f.write(buf) + shutil.copyfileobj(self.rfile, f) f.close() - del buf render_file.filepath = file_path # set the new path if job.testStart(): self.server.stats("", "File upload, starting job") - self.send_head(http.client.OK) + self.send_head(content = None) else: self.server.stats("", "File upload, file missings") self.send_head(http.client.ACCEPTED) @@ -758,15 +756,14 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): frame = job[job_frame] if frame: + self.send_head(content = None) + if job.hasRenderResult(): if job_result == DONE: - length = int(self.headers['content-length']) - buf = self.rfile.read(length) f = open(os.path.join(job.save_path, "%06d.exr" % job_frame), 'wb') - f.write(buf) + shutil.copyfileobj(self.rfile, f) f.close() - del buf elif job_result == ERROR: # blacklist slave on this job on error # slaves might already be in blacklist if errors on the whole chunk @@ -780,9 +777,6 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): job.testFinished() - self.send_head() - # need some message content here or the slave doesn't like it - self.wfile.write(bytes("foo", encoding='utf8')) else: # frame not found self.send_head(http.client.NO_CONTENT) else: # job not found @@ -808,18 +802,13 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): frame = job[job_frame] if frame: + self.send_head(content = None) + if job.hasRenderResult(): - length = int(self.headers['content-length']) - buf = self.rfile.read(length) f = open(os.path.join(job.save_path, "%06d.jpg" % job_frame), 'wb') - f.write(buf) + shutil.copyfileobj(self.rfile, f) f.close() - del buf - - self.send_head() - # need some message content here or the slave doesn't like it - self.wfile.write(bytes("foo", encoding='utf8')) else: # frame not found self.send_head(http.client.NO_CONTENT) else: # job not found @@ -843,17 +832,14 @@ class RenderHandler(http.server.BaseHTTPRequestHandler): frame = job[job_frame] if frame and frame.log_path: - length = int(self.headers['content-length']) - buf = self.rfile.read(length) + self.send_head(content = None) + f = open(frame.log_path, 'ab') - f.write(buf) + shutil.copyfileobj(self.rfile, f) f.close() - del buf - self.server.getSeenSlave(self.headers['slave-id']) - self.send_head() else: # frame not found self.send_head(http.client.NO_CONTENT) else: # job not found diff --git a/netrender/slave.py b/netrender/slave.py index 3976695d..7f72530e 100644 --- a/netrender/slave.py +++ b/netrender/slave.py @@ -86,6 +86,7 @@ def testFile(conn, job_id, slave_id, rfile, JOB_PREFIX, main_path = None): if not found: print("Found file %s at %s but signature mismatch!" % (rfile.filepath, job_full_path)) + os.remove(job_full_path) job_full_path = prefixPath(JOB_PREFIX, rfile.filepath, main_path, force = True) if not found: @@ -258,8 +259,7 @@ def render_slave(engine, netsettings, threads): if stdout: # (only need to update on one frame, they are linked conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers) - response = conn.getresponse() - response.read() + responseStatus(conn) # Also output on console if netsettings.use_slave_output_log: diff --git a/netrender/utils.py b/netrender/utils.py index 602f6cf3..b0c8eb0b 100644 --- a/netrender/utils.py +++ b/netrender/utils.py @@ -92,9 +92,11 @@ class BreakableIncrementedSleep: self.increase() def responseStatus(conn): - response = conn.getresponse() - response.read() - return response.status + with conn.getresponse() as response: + length = int(response.getheader("content-length", "0")) + if length > 0: + response.read() + return response.status def reporting(report, message, errorType = None): if errorType: -- cgit v1.2.3