From 8d8fa38938852d4d08fbfe0c60f1ce6ad9eaac83 Mon Sep 17 00:00:00 2001 From: Ian Johnson Date: Tue, 16 Oct 2012 11:27:48 +0100 Subject: Fixed bug in run, eval, and exec parallel pipeline functions. --- src/pypeline/helpers/parallel_helpers.py | 12 ++++--- .../tests/parallel_helper_pipeline_tests.py | 39 ++++++++++++++++++++-- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py index b4a402a..9a49ed3 100644 --- a/src/pypeline/helpers/parallel_helpers.py +++ b/src/pypeline/helpers/parallel_helpers.py @@ -116,21 +116,23 @@ def __kleisli_wrapper(f): future = Future() future.set_result(input) state_monad = KleisliArrow.runKleisli(pipeline, future) - output = f(state_monad, WrappedState(executor, state)) - return (output[0].result(), output[1].state) + return f(state_monad, WrappedState(executor, state)) return wrapper @__kleisli_wrapper def run_pipeline(state_monad, state): - return State.runState(state_monad, state) + output = State.runState(state_monad, state) + return (output[0].result(), output[1].state) @__kleisli_wrapper def eval_pipeline(state_monad, state): - return State.evalState(state_monad, state) + future = State.evalState(state_monad, state) + return future.result() @__kleisli_wrapper def exec_pipeline(state_monad, state): - return State.execState(state_monad, state) + wrapped_state = State.execState(state_monad, state) + return wrapped_state.state diff --git a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py index b9ceaed..c148a9b 100644 --- a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py +++ b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py @@ -33,14 +33,16 @@ from pypeline.helpers.parallel_helpers import cons_function_component, \ cons_dictionary_wire, \ cons_split_wire, \ cons_unsplit_wire, \ - run_pipeline + run_pipeline, \ + eval_pipeline, \ + exec_pipeline class ParallelPypelineHelperUnitTest(unittest.TestCase): @staticmethod - def test(no_workers, pipeline, input, state): + def test(no_workers, pipeline, input, state, run_function = run_pipeline): executor = ThreadPoolExecutor(max_workers = no_workers) - result = run_pipeline(executor, pipeline, input, state) + result = run_function(executor, pipeline, input, state) executor.shutdown(True) return result @@ -102,3 +104,34 @@ class ParallelPypelineHelperUnitTest(unittest.TestCase): result = ParallelPypelineHelperUnitTest.test(2, pipeline, "hello world", list()) self.assertEquals(target, result) + + + def test_parallel_run_eval_and_exec(self): + value = "hello world" + state = 0 + + input_msg = "input" + output_msg = "output" + + input_func = lambda a, s: " ".join([input_msg, a]) + output_func = lambda a, s: " ".join([a, output_msg]) + function = lambda a, s: a.upper() + composition = lambda a, s: output_func(function(input_func(a, s), s), s) + state_func = lambda s: s + 1 + + pipeline = cons_function_component(function, + input_func, + output_func, + state_mutator = state_func) + + result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, state, run_pipeline) + target = (composition(value, state), state_func(state)) + self.assertEquals(target, result) + + result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, state, eval_pipeline) + target = composition(value, state) + self.assertEquals(target, result) + + result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, state, exec_pipeline) + target = state_func(state) + self.assertEquals(target, result) -- cgit v1.2.3