Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Johnson <ian.johnson@appliedlanguage.com>2013-05-22 18:34:30 +0400
committerIan Johnson <ian.johnson@appliedlanguage.com>2013-05-22 18:34:30 +0400
commit6546ef1cb4e88fda1ac5479a5692fad7bd393eed (patch)
tree72288278a466ffc7dd0649c30a3332b556ab1b7b
parentc60c777b8c3702eea4b755ce040da6019a5df09d (diff)
Bug fixed in unsplit parallel helper function.
-rw-r--r--src/pypeline/helpers/parallel_helpers.py50
1 files changed, 25 insertions, 25 deletions
diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py
index 2346d0d..10af618 100644
--- a/src/pypeline/helpers/parallel_helpers.py
+++ b/src/pypeline/helpers/parallel_helpers.py
@@ -82,26 +82,28 @@ def cons_function_component(function,
def cons_wire(schema_conv_function):
"""Construct a wire. A wire is a Kleisli arrow that converts data from from one pipeline component's output schema to another pipeline component's input schema."""
- def wire_bind_function(a):
- def wire_state_function(s):
- new_a = schema_conv_function(a, s.state)
- if isinstance(a, tuple):
- futured_new_a = [None, None]
- for a_new_a, futured_new_a_idx in zip(new_a, range(len(futured_new_a))):
- if isinstance(a_new_a, Future):
- futured_new_a[futured_new_a_idx] = a_new_a
- else:
- futured_new_a[futured_new_a_idx] = Future()
- futured_new_a[futured_new_a_idx].set_result(a_new_a)
- futured_new_a = tuple(futured_new_a)
- else:
- futured_new_a = Future()
- futured_new_a.set_result(new_a)
+ def get_wire_bind_function():
+ def wire_bind_function(a):
+ def wire_state_function(s):
+ new_a = schema_conv_function(a, s.state)
+ if isinstance(a, tuple):
+ futured_new_a = [None, None]
+ for a_new_a, futured_new_a_idx in zip(new_a, range(len(futured_new_a))):
+ if isinstance(a_new_a, Future):
+ futured_new_a[futured_new_a_idx] = a_new_a
+ else:
+ futured_new_a[futured_new_a_idx] = Future()
+ futured_new_a[futured_new_a_idx].set_result(a_new_a)
+ futured_new_a = tuple(futured_new_a)
+ else:
+ futured_new_a = Future()
+ futured_new_a.set_result(new_a)
- return (futured_new_a, s)
- return State(wire_state_function)
+ return (futured_new_a, s)
+ return State(wire_state_function)
+ return wire_bind_function
- return KleisliArrow(return_, wire_bind_function)
+ return KleisliArrow(return_, get_wire_bind_function())
def cons_dictionary_wire(conversions):
@@ -132,15 +134,13 @@ def cons_split_wire():
def cons_unsplit_wire(unsplit_function):
"""Construct a wire that takes a pair and applies a function to this pair to combine them into one value."""
def get_unsplit_wrapper(inner_function):
- def unsplit_wrapper(top_future, bottom_future):
- t_val = top_future.result()
- b_val = bottom_future.result()
- nf = Future()
- nf.set_result(inner_function(t_val, b_val))
- return nf
+ def unsplit_wrapper(a, s):
+ top = a[0].result() if isinstance(a[0], Future) else a[0]
+ bottom = a[1].result() if isinstance(a[1], Future) else a[1]
+ return inner_function(top, bottom)
return unsplit_wrapper
- return unsplit(return_, get_unsplit_wrapper(unsplit_function))
+ return cons_function_component(get_unsplit_wrapper(unsplit_function))
def cons_if_component(condition_function, then_component, else_component):