diff options
author | Ian Johnson <ian.johnson@appliedlanguage.com> | 2013-05-21 15:48:39 +0400 |
---|---|---|
committer | Ian Johnson <ian.johnson@appliedlanguage.com> | 2013-05-21 15:48:39 +0400 |
commit | fb1f58f45810b4a158d92120a56e0d56694d5193 (patch) | |
tree | 62ab6d5e5d02b7f73e12cbfaa39475565224c644 | |
parent | 98f78b5f29ba7e28f8a503828a74ff770c3844dc (diff) |
Added a constructor for an If component for concurrent pipelines.
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | src/pypeline/helpers/parallel_helpers.py | 34 | ||||
-rw-r--r-- | src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py | 21 |
3 files changed, 51 insertions, 6 deletions
@@ -21,7 +21,7 @@ from setuptools import setup, find_packages setup( name = "pypeline", - version = "0.2.4", + version = "0.2.5", packages = find_packages("src", exclude = ["*tests"]), package_dir = {'': 'src'}, diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py index 65d2d26..2346d0d 100644 --- a/src/pypeline/helpers/parallel_helpers.py +++ b/src/pypeline/helpers/parallel_helpers.py @@ -18,6 +18,8 @@ # from concurrent.futures import Future from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit +from pypeline.core.arrows.kleisli_arrow_choice import KleisliArrowChoice +from pypeline.core.types.either import Left, Right from pypeline.core.types.state import State, return_ from pypeline.helpers.helpers import get_dictionary_conversion_function @@ -39,12 +41,18 @@ def cons_function_component(function, output_forming_function = None, state_mutator = None): """Construct a component based on a function. Any input or output forming functions shall be called if provided. A Kleisli arrow is returned.""" - def bind_function(future): + def bind_function(bind_a): def state_function(wrapped_state): # Unpack state state = wrapped_state.state - assert isinstance(future, Future), "Future is not a future! Strange." + # Handle input + if isinstance(bind_a, tuple): + the_a = (bind_a[0].result(), bind_a[1].result()) + elif isinstance(bind_a, Future): + the_a = bind_a.result() + else: + the_a = bind_a def do_transformation(a, s): # Transform the input @@ -60,7 +68,7 @@ def cons_function_component(function, # Execute new_future = wrapped_state.executor.submit(do_transformation, - future.result(), + the_a, state) # Mutate the state @@ -135,6 +143,26 @@ def cons_unsplit_wire(unsplit_function): return unsplit(return_, get_unsplit_wrapper(unsplit_function)) +def cons_if_component(condition_function, then_component, else_component): + """Construct a conditional execution component. If the conditional function evaluates to true the 'then' component is executed. Otherwise, the 'else' component is executed. Returns a Kleisli arrow.""" + def get_left_right_wrapper(): + def left_right_bind_function(futured_tuple): + def left_right_state_function(s): + condition_result = futured_tuple[0].result() + a = futured_tuple[1].result() + new_a = Left(a) if condition_result else Right(a) + return (new_a, s) + return State(left_right_state_function) + return left_right_bind_function + + test = (cons_function_component(condition_function) & cons_function_component(lambda a, s: a)) >> \ + KleisliArrow(return_, get_left_right_wrapper()) + if_comp = test >> (KleisliArrowChoice(return_, then_component._func) | \ + KleisliArrowChoice(return_, else_component._func)) + + return if_comp + + def __handle_output(o): po = (o[0].result() if isinstance(o[0], Future) else o[0], o[1].result() if isinstance(o[1], Future) else o[1]) if isinstance(o, tuple) \ diff --git a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py index a5354e3..f5f67c7 100644 --- a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py +++ b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py @@ -33,6 +33,7 @@ from pypeline.helpers.parallel_helpers import cons_function_component, \ cons_dictionary_wire, \ cons_split_wire, \ cons_unsplit_wire, \ + cons_if_component, \ run_pipeline, \ eval_pipeline, \ exec_pipeline @@ -42,8 +43,10 @@ class ParallelPypelineHelperUnitTest(unittest.TestCase): @staticmethod def test(no_workers, pipeline, input, state, run_function = run_pipeline): executor = ThreadPoolExecutor(max_workers = no_workers) - result = run_function(executor, pipeline, input, state) - executor.shutdown(True) + try: + result = run_function(executor, pipeline, input, state) + finally: + executor.shutdown(True) return result @@ -182,3 +185,17 @@ class ParallelPypelineHelperUnitTest(unittest.TestCase): pipeline = cons_wire(lambda t, s: ({'pi' : t[0]['pi']}, {'e' : t[1]['e']})) result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline) self.assertEquals(({'pi' : pi}, {'e' : e}), result) + + + def test_parallel_if(self): + then_comp = cons_function_component(lambda a, s: {'z' : 'THEN'}) + else_comp = cons_dictionary_wire({'c' : 'z'}) + pipeline = cons_if_component(lambda a, s: a['a'] == True, then_comp, else_comp) + + value = {'a' : True, 'b' : 'then', 'c' : 'else'} + result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline) + self.assertEquals({'z' : 'THEN'}, result) + + value = {'a' : False, 'b' : 'then', 'c' : 'else'} + result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline) + self.assertEquals({'z' : 'else'}, result) |