Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Johnson <ian.johnson@appliedlanguage.com>2013-06-04 20:29:01 +0400
committerIan Johnson <ian.johnson@appliedlanguage.com>2013-06-04 20:29:01 +0400
commitda862ae23d4dac69833250ceabed029cf1917418 (patch)
treeae6f305e7320b4142a6a7b76a8d4cb3273c2faaa
parent7b23ee97aef45635ff275167ac3b690b82dc510b (diff)
Concurrent function components wait for futures in the executor thread pool.
-rw-r--r--src/pypeline/helpers/parallel_helpers.py109
1 files changed, 58 insertions, 51 deletions
diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py
index 10af618..9084b1b 100644
--- a/src/pypeline/helpers/parallel_helpers.py
+++ b/src/pypeline/helpers/parallel_helpers.py
@@ -41,43 +41,45 @@ def cons_function_component(function,
output_forming_function = None,
state_mutator = None):
"""Construct a component based on a function. Any input or output forming functions shall be called if provided. A Kleisli arrow is returned."""
- def bind_function(bind_a):
- def state_function(wrapped_state):
- # Unpack state
- state = wrapped_state.state
-
- # Handle input
- if isinstance(bind_a, tuple):
- the_a = (bind_a[0].result(), bind_a[1].result())
- elif isinstance(bind_a, Future):
- the_a = bind_a.result()
- else:
- the_a = bind_a
-
- def do_transformation(a, s):
- # Transform the input
- transformed_a = input_forming_function(a, state) if input_forming_function else a
-
- # Apply
- new_a = function(transformed_a, state)
-
- # Transform the output of the function
- transformed_new_a = output_forming_function(new_a, state) if output_forming_function else new_a
-
- return transformed_new_a
-
- # Execute
- new_future = wrapped_state.executor.submit(do_transformation,
- the_a,
- state)
-
- # Mutate the state
- next_state = state_mutator(state) if state_mutator else state
- # New value/state pair
- return (new_future, WrappedState(wrapped_state.executor, next_state))
- return State(state_function)
-
- return KleisliArrow(return_, bind_function)
+ def get_bind_function():
+ def bind_function(bind_a):
+ def state_function(wrapped_state):
+ # Unpack state
+ state = wrapped_state.state
+
+ def do_transformation(a, s):
+ # Handle input
+ if isinstance(a, tuple):
+ the_a = (a[0].result(), a[1].result())
+ elif isinstance(a, Future):
+ the_a = a.result()
+ else:
+ the_a = a
+
+ # Transform the input
+ transformed_a = input_forming_function(the_a, state) if input_forming_function else the_a
+
+ # Apply
+ new_a = function(transformed_a, state)
+
+ # Transform the output of the function
+ transformed_new_a = output_forming_function(new_a, state) if output_forming_function else new_a
+
+ return transformed_new_a
+
+ # Execute
+ new_future = wrapped_state.executor.submit(do_transformation,
+ bind_a,
+ state)
+
+ # Mutate the state
+ next_state = state_mutator(state) if state_mutator else state
+ # New value/state pair
+ return (new_future, WrappedState(wrapped_state.executor, next_state))
+ return State(state_function)
+ return bind_function
+
+ return KleisliArrow(return_, get_bind_function())
def cons_wire(schema_conv_function):
@@ -113,22 +115,27 @@ def cons_dictionary_wire(conversions):
def cons_split_wire():
"""Construct a wire that duplicates its input and produces a pair from this value. See: ***, first, second, and unsplit arrow operators."""
- def split_func(a, s):
- new_a = [None, None]
- if isinstance(a, tuple):
- for an_a, new_a_idx in zip(a, range(len(new_a))):
- if isinstance(an_a, Future):
- new_a[new_a_idx] = an_a
+ def get_split_wire_bind_function():
+ def split_wire_bind_function(a):
+ def split_wire_state_function(s):
+ new_a = [None, None]
+ if isinstance(a, tuple):
+ for an_a, new_a_idx in zip(a, range(len(new_a))):
+ if isinstance(an_a, Future):
+ new_a[new_a_idx] = an_a
+ else:
+ assert False, "Tuple does not contain futures: %s" % str(a)
else:
- assert False, "Tuple does not contain futures: %s" % str(a)
- else:
- new_a[0] = Future()
- new_a[1] = Future()
- new_a[0].set_result(a)
- new_a[1].set_result(a)
+ new_a[0] = Future()
+ new_a[1] = Future()
+ new_a[0].set_result(a)
+ new_a[1].set_result(a)
+
+ return (tuple(new_a), s)
+ return State(split_wire_state_function)
+ return split_wire_bind_function
- return tuple(new_a)
- return cons_function_component(split_func)
+ return KleisliArrow(return_, get_split_wire_bind_function())
def cons_unsplit_wire(unsplit_function):