diff options
author | Ian Johnson <ian.johnson@appliedlanguage.com> | 2013-06-10 19:44:04 +0400 |
---|---|---|
committer | Ian Johnson <ian.johnson@appliedlanguage.com> | 2013-06-10 19:44:04 +0400 |
commit | c4fe73dd6b3c50f18ff285fe584ca19740fe3257 (patch) | |
tree | 718dd35243677ccdfd2109139c793275fc399fdd | |
parent | 6036e42acb37f75f08f3df997e8b4034752d8a48 (diff) |
Bug fixed the concurrent helper module. Simpler, and now hopefully working.
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | src/pypeline/helpers/parallel_helpers.py | 52 | ||||
-rw-r--r-- | src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py | 9 |
3 files changed, 16 insertions, 47 deletions
@@ -21,7 +21,7 @@ from setuptools import setup, find_packages setup( name = "pypeline", - version = "0.2.6", + version = "0.2.7", packages = find_packages("src", exclude = ["*tests"]), package_dir = {'': 'src'}, diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py index 315a565..8d79b4a 100644 --- a/src/pypeline/helpers/parallel_helpers.py +++ b/src/pypeline/helpers/parallel_helpers.py @@ -49,10 +49,10 @@ def cons_function_component(function, def do_transformation(a, s): # Handle input - if isinstance(a, tuple): - the_a = (a[0].result(), a[1].result()) - elif isinstance(a, Future): + if isinstance(a, Future): the_a = a.result() + elif isinstance(a, tuple): + the_a = a else: raise ValueError("Component state function has value that is not of type tuple or Future") @@ -82,30 +82,18 @@ def cons_function_component(function, return KleisliArrow(return_, get_bind_function()) -def cons_wire(schema_conv_function): +def cons_wire(wire_function): """Construct a wire. A wire is a Kleisli arrow that converts data from from one pipeline component's output schema to another pipeline component's input schema.""" - def get_wire_bind_function(): - def wire_bind_function(a): - def wire_state_function(s): - new_a = schema_conv_function(a, s.state) - if isinstance(a, tuple): - futured_new_a = [None, None] - for a_new_a, futured_new_a_idx in zip(new_a, range(len(futured_new_a))): - if isinstance(a_new_a, Future): - futured_new_a[futured_new_a_idx] = a_new_a - else: - futured_new_a[futured_new_a_idx] = Future() - futured_new_a[futured_new_a_idx].set_result(a_new_a) - futured_new_a = tuple(futured_new_a) - else: - futured_new_a = Future() - futured_new_a.set_result(new_a) + def get_wire_function(conv_function): + def wire_function(a, s): + if isinstance(a, tuple): + raise ValueError("Wire has value that is a tuple") + new_a = conv_function(a, s) + return new_a - return (futured_new_a, s) - return State(wire_state_function) - return wire_bind_function + return wire_function - return KleisliArrow(return_, get_wire_bind_function()) + return cons_function_component(get_wire_function(wire_function)) def cons_dictionary_wire(conversions): @@ -118,20 +106,9 @@ def cons_split_wire(): def get_split_wire_bind_function(): def split_wire_bind_function(a): def split_wire_state_function(s): - new_a = [None, None] if isinstance(a, tuple): - for an_a, new_a_idx in zip(a, range(len(new_a))): - if isinstance(an_a, Future): - new_a[new_a_idx] = an_a - else: - assert False, "Tuple does not contain futures: %s" % str(a) - else: - new_a[0] = Future() - new_a[1] = Future() - new_a[0].set_result(a) - new_a[1].set_result(a) - - return (tuple(new_a), s) + raise ValueError("Split wire has a value that is a tuple") + return ((a, a), s) return State(split_wire_state_function) return split_wire_bind_function @@ -223,6 +200,7 @@ def __kleisli_wrapper(f): else: future = Future() future.set_result(input) + state_monad = KleisliArrow.runKleisli(pipeline, future) return f(state_monad, WrappedState(executor, state)) return wrapper diff --git a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py index f5f67c7..afd8cf4 100644 --- a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py +++ b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py @@ -178,15 +178,6 @@ class ParallelPypelineHelperUnitTest(unittest.TestCase): self.assertEquals(({'PI' : pi}, {'E' : e}), result) - def test_parallel_tuple_input_wire(self): - pi = 3.141 - e = 2.718 - value = ({'pi' : pi}, {'e' : e}) - pipeline = cons_wire(lambda t, s: ({'pi' : t[0]['pi']}, {'e' : t[1]['e']})) - result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline) - self.assertEquals(({'pi' : pi}, {'e' : e}), result) - - def test_parallel_if(self): then_comp = cons_function_component(lambda a, s: {'z' : 'THEN'}) else_comp = cons_dictionary_wire({'c' : 'z'}) |