diff options
author | Ian Johnson <ian.johnson@appliedlanguage.com> | 2013-05-13 19:19:14 +0400 |
---|---|---|
committer | Ian Johnson <ian.johnson@appliedlanguage.com> | 2013-05-13 19:19:14 +0400 |
commit | 78754db34cfb91249b59bd9f64dbc63ce549c430 (patch) | |
tree | 835f4d85cb723b9e4c854e758388c53dd2e9b063 | |
parent | 87e36daa5b1f12f7f0d440e74160b096ff39f493 (diff) |
Fixed bugs in the parallel helpers.
-rw-r--r-- | setup.py | 2 | ||||
-rw-r--r-- | src/pypeline/helpers/parallel_helpers.py | 86 | ||||
-rw-r--r-- | src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py | 33 |
3 files changed, 102 insertions, 19 deletions
@@ -21,7 +21,7 @@ from setuptools import setup, find_packages setup( name = "pypeline", - version = "0.2.2", + version = "0.2.3", packages = find_packages("src", exclude = ["*tests"]), package_dir = {'': 'src'}, diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py index a47e85a..1cba442 100644 --- a/src/pypeline/helpers/parallel_helpers.py +++ b/src/pypeline/helpers/parallel_helpers.py @@ -19,7 +19,13 @@ from concurrent.futures import Future from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit from pypeline.core.types.state import State, return_ -from pypeline.helpers import helpers +from pypeline.helpers.helpers import get_dictionary_conversion_function + + +# +# Monkey patch the Future class to support indexing +# +Future.__getitem__ = lambda f, key: (f.result())[key] class WrappedState(object): @@ -38,6 +44,11 @@ def cons_function_component(function, # Unpack state state = wrapped_state.state + print "Comp input: %s" % str(future) if isinstance(future, Future) \ + else "Comp input Not Future: %s" % str(future) + + assert isinstance(future, Future) + def do_transformation(a, s): # Transform the input transformed_a = input_forming_function(a, state) if input_forming_function else a @@ -51,11 +62,12 @@ def cons_function_component(function, return transformed_new_a # Execute - new_future = wrapped_state.executor.submit(do_transformation, future.result(), state) + new_future = wrapped_state.executor.submit(do_transformation, + future.result(), + state) # Mutate the state next_state = state_mutator(state) if state_mutator else state - # New value/state pair return (new_future, WrappedState(wrapped_state.executor, next_state)) return State(state_function) @@ -65,25 +77,51 @@ def cons_function_component(function, def cons_wire(schema_conv_function): """Construct a wire. A wire is a Kleisli arrow that converts data from from one pipeline component's output schema to another pipeline component's input schema.""" - def get_wire_wrapper(inner_function): - def wire_wrapper(future, wrapper_state): - new_value = inner_function(future.result(), wrapper_state.state) - nf = Future() - nf.set_result(new_value) - return nf - return wire_wrapper - - return helpers.cons_wire(get_wire_wrapper(schema_conv_function)) + def wire_bind_function(a): + def wire_state_function(s): + new_a = schema_conv_function(a, s.state) + if isinstance(a, tuple): + futured_new_a = [None, None] + for a_new_a, futured_new_a_idx in zip(new_a, range(len(futured_new_a))): + if isinstance(a_new_a, Future): + futured_new_a[futured_new_a_idx] = a_new_a + else: + futured_new_a[futured_new_a_idx] = Future() + futured_new_a[futured_new_a_idx].set_result(a_new_a) + futured_new_a = tuple(futured_new_a) + else: + futured_new_a = Future() + futured_new_a.set_result(new_a) + + return (futured_new_a, s) + return State(wire_state_function) + + return KleisliArrow(return_, wire_bind_function) def cons_dictionary_wire(conversions): """Construct a wire that converts between two dictionaries. The keys of the conversions dictionary are keys in the output dictionary, of the preceeding component, whose values will be used to populate a dictionary whose keys are the value of the conversions dictionary.\n\nE.g., output = {'int': 9, 'string': 'hello'}, and conversions = {'int': 'int_two', 'string': 'string_two'}, yields an input dictionary, to the next component, input = {'int_two': 9, 'string_two': 'hello'}.""" - return cons_wire(helpers.get_dictionary_conversion_function(conversions)) + return cons_wire(get_dictionary_conversion_function(conversions)) def cons_split_wire(): """Construct a wire that duplicates its input and produces a pair from this value. See: ***, first, second, and unsplit arrow operators.""" - return split(return_) + def split_func(a, s): + new_a = [None, None] + if isinstance(a, tuple): + for an_a, new_a_idx in zip(a, range(len(new_a))): + if isinstance(an_a, Future): + new_a[new_a_idx] = an_a + else: + assert False, "Tuple does not contain futures: %s" % str(a) + else: + new_a[0] = Future() + new_a[1] = Future() + new_a[0].set_result(a) + new_a[1].set_result(a) + + return tuple(new_a) + return cons_function_component(split_func) def cons_unsplit_wire(unsplit_function): @@ -100,11 +138,23 @@ def cons_unsplit_wire(unsplit_function): return unsplit(return_, get_unsplit_wrapper(unsplit_function)) +def __handle_output(o): + po = (o[0].result() if isinstance(o[0], Future) else o[0], + o[1].result() if isinstance(o[1], Future) else o[1]) if isinstance(o, tuple) \ + else o.result() + return po + + def __kleisli_wrapper(f): def wrapper(executor, pipeline, input, state): """Run, evaluate, or execute a pipeline.""" - future = Future() - future.set_result(input) + if isinstance(input, tuple): + future = (Future(), Future()) + for fu, v in zip(future, input): + fu.set_result(v) + else: + future = Future() + future.set_result(input) state_monad = KleisliArrow.runKleisli(pipeline, future) return f(state_monad, WrappedState(executor, state)) return wrapper @@ -113,13 +163,13 @@ def __kleisli_wrapper(f): @__kleisli_wrapper def run_pipeline(state_monad, state): output = State.runState(state_monad, state) - return (output[0].result(), output[1].state) + return (__handle_output(output[0]), output[1].state) @__kleisli_wrapper def eval_pipeline(state_monad, state): future = State.evalState(state_monad, state) - return future.result() + return __handle_output(future) @__kleisli_wrapper diff --git a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py index c16c892..a5354e3 100644 --- a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py +++ b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py @@ -149,3 +149,36 @@ class ParallelPypelineHelperUnitTest(unittest.TestCase): pipeline = cons_dictionary_wire({'pi' : 'PI', 'e' : 'E'}) result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline) self.assertEquals({'PI' : 3.141, 'E' : 2.718}, result) + + + def test_parallel_split(self): + pi = 3.141 + value = {'pi' : pi} + pipeline = cons_function_component(lambda a, s: a) >> \ + cons_split_wire() >> \ + cons_function_component(lambda a, s: {'PI' : a['pi']}).first() >> \ + (cons_function_component(lambda a, s: {'PI' : a['PI']}) ** \ + cons_function_component(lambda a, s: a)) >> \ + cons_unsplit_wire(lambda t, b: {'PI' : t['PI'], 'pi' : b['pi']}) + result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline) + self.assertEquals({'PI' : pi, 'pi' : pi}, result) + + + def test_parallel_first_and_second(self): + pi = 3.141 + e = 2.718 + value = {'pi' : pi, 'e' : e} + pipeline = cons_split_wire() >> \ + (cons_dictionary_wire({'pi' : 'PI'}) >> cons_function_component(lambda a, s: {'PI' : a['PI']})).first() >> \ + (cons_dictionary_wire({'e' : 'E'}) >> cons_function_component(lambda a, s: {'E' : a['E']})).second() + result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline) + self.assertEquals(({'PI' : pi}, {'E' : e}), result) + + + def test_parallel_tuple_input_wire(self): + pi = 3.141 + e = 2.718 + value = ({'pi' : pi}, {'e' : e}) + pipeline = cons_wire(lambda t, s: ({'pi' : t[0]['pi']}, {'e' : t[1]['e']})) + result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline) + self.assertEquals(({'pi' : pi}, {'e' : e}), result) |