From da862ae23d4dac69833250ceabed029cf1917418 Mon Sep 17 00:00:00 2001 From: Ian Johnson Date: Tue, 4 Jun 2013 17:29:01 +0100 Subject: Concurrent function components wait for futures in the executor thread pool. --- src/pypeline/helpers/parallel_helpers.py | 109 ++++++++++++++++--------------- 1 file changed, 58 insertions(+), 51 deletions(-) diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py index 10af618..9084b1b 100644 --- a/src/pypeline/helpers/parallel_helpers.py +++ b/src/pypeline/helpers/parallel_helpers.py @@ -41,43 +41,45 @@ def cons_function_component(function, output_forming_function = None, state_mutator = None): """Construct a component based on a function. Any input or output forming functions shall be called if provided. A Kleisli arrow is returned.""" - def bind_function(bind_a): - def state_function(wrapped_state): - # Unpack state - state = wrapped_state.state - - # Handle input - if isinstance(bind_a, tuple): - the_a = (bind_a[0].result(), bind_a[1].result()) - elif isinstance(bind_a, Future): - the_a = bind_a.result() - else: - the_a = bind_a - - def do_transformation(a, s): - # Transform the input - transformed_a = input_forming_function(a, state) if input_forming_function else a - - # Apply - new_a = function(transformed_a, state) - - # Transform the output of the function - transformed_new_a = output_forming_function(new_a, state) if output_forming_function else new_a - - return transformed_new_a - - # Execute - new_future = wrapped_state.executor.submit(do_transformation, - the_a, - state) - - # Mutate the state - next_state = state_mutator(state) if state_mutator else state - # New value/state pair - return (new_future, WrappedState(wrapped_state.executor, next_state)) - return State(state_function) - - return KleisliArrow(return_, bind_function) + def get_bind_function(): + def bind_function(bind_a): + def state_function(wrapped_state): + # Unpack state + state = wrapped_state.state + + def do_transformation(a, s): + # Handle input + if isinstance(a, tuple): + the_a = (a[0].result(), a[1].result()) + elif isinstance(a, Future): + the_a = a.result() + else: + the_a = a + + # Transform the input + transformed_a = input_forming_function(the_a, state) if input_forming_function else the_a + + # Apply + new_a = function(transformed_a, state) + + # Transform the output of the function + transformed_new_a = output_forming_function(new_a, state) if output_forming_function else new_a + + return transformed_new_a + + # Execute + new_future = wrapped_state.executor.submit(do_transformation, + bind_a, + state) + + # Mutate the state + next_state = state_mutator(state) if state_mutator else state + # New value/state pair + return (new_future, WrappedState(wrapped_state.executor, next_state)) + return State(state_function) + return bind_function + + return KleisliArrow(return_, get_bind_function()) def cons_wire(schema_conv_function): @@ -113,22 +115,27 @@ def cons_dictionary_wire(conversions): def cons_split_wire(): """Construct a wire that duplicates its input and produces a pair from this value. See: ***, first, second, and unsplit arrow operators.""" - def split_func(a, s): - new_a = [None, None] - if isinstance(a, tuple): - for an_a, new_a_idx in zip(a, range(len(new_a))): - if isinstance(an_a, Future): - new_a[new_a_idx] = an_a + def get_split_wire_bind_function(): + def split_wire_bind_function(a): + def split_wire_state_function(s): + new_a = [None, None] + if isinstance(a, tuple): + for an_a, new_a_idx in zip(a, range(len(new_a))): + if isinstance(an_a, Future): + new_a[new_a_idx] = an_a + else: + assert False, "Tuple does not contain futures: %s" % str(a) else: - assert False, "Tuple does not contain futures: %s" % str(a) - else: - new_a[0] = Future() - new_a[1] = Future() - new_a[0].set_result(a) - new_a[1].set_result(a) + new_a[0] = Future() + new_a[1] = Future() + new_a[0].set_result(a) + new_a[1].set_result(a) + + return (tuple(new_a), s) + return State(split_wire_state_function) + return split_wire_bind_function - return tuple(new_a) - return cons_function_component(split_func) + return KleisliArrow(return_, get_split_wire_bind_function()) def cons_unsplit_wire(unsplit_function): -- cgit v1.2.3