diff options
author | Ian Johnson <ian.johnson@appliedlanguage.com> | 2013-05-22 18:34:30 +0400 |
---|---|---|
committer | Ian Johnson <ian.johnson@appliedlanguage.com> | 2013-05-22 18:34:30 +0400 |
commit | 6546ef1cb4e88fda1ac5479a5692fad7bd393eed (patch) | |
tree | 72288278a466ffc7dd0649c30a3332b556ab1b7b | |
parent | c60c777b8c3702eea4b755ce040da6019a5df09d (diff) |
Bug fixed in unsplit parallel helper function.
-rw-r--r-- | src/pypeline/helpers/parallel_helpers.py | 50 |
1 files changed, 25 insertions, 25 deletions
diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py index 2346d0d..10af618 100644 --- a/src/pypeline/helpers/parallel_helpers.py +++ b/src/pypeline/helpers/parallel_helpers.py @@ -82,26 +82,28 @@ def cons_function_component(function, def cons_wire(schema_conv_function): """Construct a wire. A wire is a Kleisli arrow that converts data from from one pipeline component's output schema to another pipeline component's input schema.""" - def wire_bind_function(a): - def wire_state_function(s): - new_a = schema_conv_function(a, s.state) - if isinstance(a, tuple): - futured_new_a = [None, None] - for a_new_a, futured_new_a_idx in zip(new_a, range(len(futured_new_a))): - if isinstance(a_new_a, Future): - futured_new_a[futured_new_a_idx] = a_new_a - else: - futured_new_a[futured_new_a_idx] = Future() - futured_new_a[futured_new_a_idx].set_result(a_new_a) - futured_new_a = tuple(futured_new_a) - else: - futured_new_a = Future() - futured_new_a.set_result(new_a) + def get_wire_bind_function(): + def wire_bind_function(a): + def wire_state_function(s): + new_a = schema_conv_function(a, s.state) + if isinstance(a, tuple): + futured_new_a = [None, None] + for a_new_a, futured_new_a_idx in zip(new_a, range(len(futured_new_a))): + if isinstance(a_new_a, Future): + futured_new_a[futured_new_a_idx] = a_new_a + else: + futured_new_a[futured_new_a_idx] = Future() + futured_new_a[futured_new_a_idx].set_result(a_new_a) + futured_new_a = tuple(futured_new_a) + else: + futured_new_a = Future() + futured_new_a.set_result(new_a) - return (futured_new_a, s) - return State(wire_state_function) + return (futured_new_a, s) + return State(wire_state_function) + return wire_bind_function - return KleisliArrow(return_, wire_bind_function) + return KleisliArrow(return_, get_wire_bind_function()) def cons_dictionary_wire(conversions): @@ -132,15 +134,13 @@ def cons_split_wire(): def cons_unsplit_wire(unsplit_function): """Construct a wire that takes a pair and applies a function to this pair to combine them into one value.""" def get_unsplit_wrapper(inner_function): - def unsplit_wrapper(top_future, bottom_future): - t_val = top_future.result() - b_val = bottom_future.result() - nf = Future() - nf.set_result(inner_function(t_val, b_val)) - return nf + def unsplit_wrapper(a, s): + top = a[0].result() if isinstance(a[0], Future) else a[0] + bottom = a[1].result() if isinstance(a[1], Future) else a[1] + return inner_function(top, bottom) return unsplit_wrapper - return unsplit(return_, get_unsplit_wrapper(unsplit_function)) + return cons_function_component(get_unsplit_wrapper(unsplit_function)) def cons_if_component(condition_function, then_component, else_component): |