From 1d31ef75966273045338ebc72ebcce5ad63a76b4 Mon Sep 17 00:00:00 2001 From: Martin Poirier Date: Sun, 15 Jan 2012 19:51:01 +0000 Subject: netrender use threading for interprocess communication. Don't stall slave communication when rendering/baking process output blocks. This enables running slow baking and rendering jobs correctly without the slave disconnecting from the master. It also makes slaves much more responsive to cancelling jobs on the master. add save on job option (default false) to save the current file before sending a rendering blender job. --- netrender/slave.py | 69 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 21 deletions(-) (limited to 'netrender/slave.py') diff --git a/netrender/slave.py b/netrender/slave.py index 361b78fc..1ed21d7d 100644 --- a/netrender/slave.py +++ b/netrender/slave.py @@ -18,7 +18,7 @@ import sys, os, platform, shutil import http, http.client, http.server -import subprocess, time +import subprocess, time, threading import json import bpy @@ -239,24 +239,46 @@ def render_slave(engine, netsettings, threads): results = [] - cancelled = False - stdout = bytes() - run_t = time.time() line = "" - while not cancelled and process.poll() is None: - stdout += process.stdout.read(1024) - current_t = time.time() - cancelled = engine.test_break() - if current_t - run_t > CANCEL_POLL_SPEED: + + class ProcessData: + def __init__(self): + self.lock = threading.Lock() + self.stdout = bytes() + self.cancelled = False + self.start_time = time.time() + self.last_time = time.time() + + data = ProcessData() + + def run_process(process, data): + while not data.cancelled and process.poll() is None: + buf = process.stdout.read(1024) + + data.lock.acquire() + data.stdout += buf + data.lock.release() + + process_thread = threading.Thread(target=run_process, args=(process, data)) + + process_thread.start() + + while not data.cancelled and process_thread.is_alive(): + time.sleep(CANCEL_POLL_SPEED / 2) + current_time = time.time() + data.cancelled = engine.test_break() + if current_time - data.last_time > CANCEL_POLL_SPEED: + + data.lock.acquire() # update logs if needed - if stdout: + if data.stdout: # (only need to update on one frame, they are linked with ConnectionContext(): - conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers) + conn.request("PUT", logURL(job.id, first_frame), data.stdout, headers=headers) responseStatus(conn) - stdout_text = str(stdout, encoding='utf8') + stdout_text = str(data.stdout, encoding='utf8') # Also output on console if netsettings.use_slave_output_log: @@ -268,20 +290,25 @@ def render_slave(engine, netsettings, threads): if job.subtype == netrender.model.JOB_SUB_BAKING: results.extend(netrender.baking.resultsFromOuput(lines)) - stdout = bytes() + data.stdout = bytes() + + data.lock.release() - run_t = current_t + data.last_time = current_time if testCancel(conn, job.id, first_frame): engine.update_stats("", "Job canceled by Master") - cancelled = True + data.cancelled = True + + process_thread.join() + del process_thread if job.type == netrender.model.JOB_BLENDER: netrender.repath.reset(job) # read leftovers if needed - stdout += process.stdout.read() + data.stdout += process.stdout.read() - if cancelled: + if data.cancelled: # kill process if needed if process.poll() is None: try: @@ -291,8 +318,8 @@ def render_slave(engine, netsettings, threads): continue # to next frame # flush the rest of the logs - if stdout: - stdout_text = str(stdout, encoding='utf8') + if data.stdout: + stdout_text = str(data.stdout, encoding='utf8') # Also output on console if netsettings.use_slave_output_log: @@ -305,12 +332,12 @@ def render_slave(engine, netsettings, threads): # (only need to update on one frame, they are linked with ConnectionContext(): - conn.request("PUT", logURL(job.id, first_frame), stdout, headers=headers) + conn.request("PUT", logURL(job.id, first_frame), data.stdout, headers=headers) if responseStatus(conn) == http.client.NO_CONTENT: continue - total_t = time.time() - start_t + total_t = time.time() - data.start_time avg_t = total_t / len(job.frames) -- cgit v1.2.3