Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Johnson <ian.johnson@appliedlanguage.com>2012-10-16 14:27:48 +0400
committerIan Johnson <ian.johnson@appliedlanguage.com>2012-10-16 14:27:48 +0400
commit8d8fa38938852d4d08fbfe0c60f1ce6ad9eaac83 (patch)
treef83f01a3c6b30e63c7379f5560c8205f72b2e499
parent6b5860129da5be85813a8a6791787039c197eb0d (diff)
Fixed bug in run, eval, and exec parallel pipeline functions.
-rw-r--r--src/pypeline/helpers/parallel_helpers.py12
-rw-r--r--src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py39
2 files changed, 43 insertions, 8 deletions
diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py
index b4a402a..9a49ed3 100644
--- a/src/pypeline/helpers/parallel_helpers.py
+++ b/src/pypeline/helpers/parallel_helpers.py
@@ -116,21 +116,23 @@ def __kleisli_wrapper(f):
future = Future()
future.set_result(input)
state_monad = KleisliArrow.runKleisli(pipeline, future)
- output = f(state_monad, WrappedState(executor, state))
- return (output[0].result(), output[1].state)
+ return f(state_monad, WrappedState(executor, state))
return wrapper
@__kleisli_wrapper
def run_pipeline(state_monad, state):
- return State.runState(state_monad, state)
+ output = State.runState(state_monad, state)
+ return (output[0].result(), output[1].state)
@__kleisli_wrapper
def eval_pipeline(state_monad, state):
- return State.evalState(state_monad, state)
+ future = State.evalState(state_monad, state)
+ return future.result()
@__kleisli_wrapper
def exec_pipeline(state_monad, state):
- return State.execState(state_monad, state)
+ wrapped_state = State.execState(state_monad, state)
+ return wrapped_state.state
diff --git a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
index b9ceaed..c148a9b 100644
--- a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
+++ b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
@@ -33,14 +33,16 @@ from pypeline.helpers.parallel_helpers import cons_function_component, \
cons_dictionary_wire, \
cons_split_wire, \
cons_unsplit_wire, \
- run_pipeline
+ run_pipeline, \
+ eval_pipeline, \
+ exec_pipeline
class ParallelPypelineHelperUnitTest(unittest.TestCase):
@staticmethod
- def test(no_workers, pipeline, input, state):
+ def test(no_workers, pipeline, input, state, run_function = run_pipeline):
executor = ThreadPoolExecutor(max_workers = no_workers)
- result = run_pipeline(executor, pipeline, input, state)
+ result = run_function(executor, pipeline, input, state)
executor.shutdown(True)
return result
@@ -102,3 +104,34 @@ class ParallelPypelineHelperUnitTest(unittest.TestCase):
result = ParallelPypelineHelperUnitTest.test(2, pipeline, "hello world", list())
self.assertEquals(target, result)
+
+
+ def test_parallel_run_eval_and_exec(self):
+ value = "hello world"
+ state = 0
+
+ input_msg = "input"
+ output_msg = "output"
+
+ input_func = lambda a, s: " ".join([input_msg, a])
+ output_func = lambda a, s: " ".join([a, output_msg])
+ function = lambda a, s: a.upper()
+ composition = lambda a, s: output_func(function(input_func(a, s), s), s)
+ state_func = lambda s: s + 1
+
+ pipeline = cons_function_component(function,
+ input_func,
+ output_func,
+ state_mutator = state_func)
+
+ result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, state, run_pipeline)
+ target = (composition(value, state), state_func(state))
+ self.assertEquals(target, result)
+
+ result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, state, eval_pipeline)
+ target = composition(value, state)
+ self.assertEquals(target, result)
+
+ result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, state, exec_pipeline)
+ target = state_func(state)
+ self.assertEquals(target, result)