Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Johnson <ian.johnson@appliedlanguage.com>2013-05-13 19:19:14 +0400
committerIan Johnson <ian.johnson@appliedlanguage.com>2013-05-13 19:19:14 +0400
commit78754db34cfb91249b59bd9f64dbc63ce549c430 (patch)
tree835f4d85cb723b9e4c854e758388c53dd2e9b063
parent87e36daa5b1f12f7f0d440e74160b096ff39f493 (diff)
Fixed bugs in the parallel helpers.
-rw-r--r--setup.py2
-rw-r--r--src/pypeline/helpers/parallel_helpers.py86
-rw-r--r--src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py33
3 files changed, 102 insertions, 19 deletions
diff --git a/setup.py b/setup.py
index 52902ed..13c92c0 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,7 @@ from setuptools import setup, find_packages
setup(
name = "pypeline",
- version = "0.2.2",
+ version = "0.2.3",
packages = find_packages("src", exclude = ["*tests"]),
package_dir = {'': 'src'},
diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py
index a47e85a..1cba442 100644
--- a/src/pypeline/helpers/parallel_helpers.py
+++ b/src/pypeline/helpers/parallel_helpers.py
@@ -19,7 +19,13 @@
from concurrent.futures import Future
from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit
from pypeline.core.types.state import State, return_
-from pypeline.helpers import helpers
+from pypeline.helpers.helpers import get_dictionary_conversion_function
+
+
+#
+# Monkey patch the Future class to support indexing
+#
+Future.__getitem__ = lambda f, key: (f.result())[key]
class WrappedState(object):
@@ -38,6 +44,11 @@ def cons_function_component(function,
# Unpack state
state = wrapped_state.state
+ print "Comp input: %s" % str(future) if isinstance(future, Future) \
+ else "Comp input Not Future: %s" % str(future)
+
+ assert isinstance(future, Future)
+
def do_transformation(a, s):
# Transform the input
transformed_a = input_forming_function(a, state) if input_forming_function else a
@@ -51,11 +62,12 @@ def cons_function_component(function,
return transformed_new_a
# Execute
- new_future = wrapped_state.executor.submit(do_transformation, future.result(), state)
+ new_future = wrapped_state.executor.submit(do_transformation,
+ future.result(),
+ state)
# Mutate the state
next_state = state_mutator(state) if state_mutator else state
-
# New value/state pair
return (new_future, WrappedState(wrapped_state.executor, next_state))
return State(state_function)
@@ -65,25 +77,51 @@ def cons_function_component(function,
def cons_wire(schema_conv_function):
"""Construct a wire. A wire is a Kleisli arrow that converts data from from one pipeline component's output schema to another pipeline component's input schema."""
- def get_wire_wrapper(inner_function):
- def wire_wrapper(future, wrapper_state):
- new_value = inner_function(future.result(), wrapper_state.state)
- nf = Future()
- nf.set_result(new_value)
- return nf
- return wire_wrapper
-
- return helpers.cons_wire(get_wire_wrapper(schema_conv_function))
+ def wire_bind_function(a):
+ def wire_state_function(s):
+ new_a = schema_conv_function(a, s.state)
+ if isinstance(a, tuple):
+ futured_new_a = [None, None]
+ for a_new_a, futured_new_a_idx in zip(new_a, range(len(futured_new_a))):
+ if isinstance(a_new_a, Future):
+ futured_new_a[futured_new_a_idx] = a_new_a
+ else:
+ futured_new_a[futured_new_a_idx] = Future()
+ futured_new_a[futured_new_a_idx].set_result(a_new_a)
+ futured_new_a = tuple(futured_new_a)
+ else:
+ futured_new_a = Future()
+ futured_new_a.set_result(new_a)
+
+ return (futured_new_a, s)
+ return State(wire_state_function)
+
+ return KleisliArrow(return_, wire_bind_function)
def cons_dictionary_wire(conversions):
"""Construct a wire that converts between two dictionaries. The keys of the conversions dictionary are keys in the output dictionary, of the preceeding component, whose values will be used to populate a dictionary whose keys are the value of the conversions dictionary.\n\nE.g., output = {'int': 9, 'string': 'hello'}, and conversions = {'int': 'int_two', 'string': 'string_two'}, yields an input dictionary, to the next component, input = {'int_two': 9, 'string_two': 'hello'}."""
- return cons_wire(helpers.get_dictionary_conversion_function(conversions))
+ return cons_wire(get_dictionary_conversion_function(conversions))
def cons_split_wire():
"""Construct a wire that duplicates its input and produces a pair from this value. See: ***, first, second, and unsplit arrow operators."""
- return split(return_)
+ def split_func(a, s):
+ new_a = [None, None]
+ if isinstance(a, tuple):
+ for an_a, new_a_idx in zip(a, range(len(new_a))):
+ if isinstance(an_a, Future):
+ new_a[new_a_idx] = an_a
+ else:
+ assert False, "Tuple does not contain futures: %s" % str(a)
+ else:
+ new_a[0] = Future()
+ new_a[1] = Future()
+ new_a[0].set_result(a)
+ new_a[1].set_result(a)
+
+ return tuple(new_a)
+ return cons_function_component(split_func)
def cons_unsplit_wire(unsplit_function):
@@ -100,11 +138,23 @@ def cons_unsplit_wire(unsplit_function):
return unsplit(return_, get_unsplit_wrapper(unsplit_function))
+def __handle_output(o):
+ po = (o[0].result() if isinstance(o[0], Future) else o[0],
+ o[1].result() if isinstance(o[1], Future) else o[1]) if isinstance(o, tuple) \
+ else o.result()
+ return po
+
+
def __kleisli_wrapper(f):
def wrapper(executor, pipeline, input, state):
"""Run, evaluate, or execute a pipeline."""
- future = Future()
- future.set_result(input)
+ if isinstance(input, tuple):
+ future = (Future(), Future())
+ for fu, v in zip(future, input):
+ fu.set_result(v)
+ else:
+ future = Future()
+ future.set_result(input)
state_monad = KleisliArrow.runKleisli(pipeline, future)
return f(state_monad, WrappedState(executor, state))
return wrapper
@@ -113,13 +163,13 @@ def __kleisli_wrapper(f):
@__kleisli_wrapper
def run_pipeline(state_monad, state):
output = State.runState(state_monad, state)
- return (output[0].result(), output[1].state)
+ return (__handle_output(output[0]), output[1].state)
@__kleisli_wrapper
def eval_pipeline(state_monad, state):
future = State.evalState(state_monad, state)
- return future.result()
+ return __handle_output(future)
@__kleisli_wrapper
diff --git a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
index c16c892..a5354e3 100644
--- a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
+++ b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
@@ -149,3 +149,36 @@ class ParallelPypelineHelperUnitTest(unittest.TestCase):
pipeline = cons_dictionary_wire({'pi' : 'PI', 'e' : 'E'})
result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline)
self.assertEquals({'PI' : 3.141, 'E' : 2.718}, result)
+
+
+ def test_parallel_split(self):
+ pi = 3.141
+ value = {'pi' : pi}
+ pipeline = cons_function_component(lambda a, s: a) >> \
+ cons_split_wire() >> \
+ cons_function_component(lambda a, s: {'PI' : a['pi']}).first() >> \
+ (cons_function_component(lambda a, s: {'PI' : a['PI']}) ** \
+ cons_function_component(lambda a, s: a)) >> \
+ cons_unsplit_wire(lambda t, b: {'PI' : t['PI'], 'pi' : b['pi']})
+ result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline)
+ self.assertEquals({'PI' : pi, 'pi' : pi}, result)
+
+
+ def test_parallel_first_and_second(self):
+ pi = 3.141
+ e = 2.718
+ value = {'pi' : pi, 'e' : e}
+ pipeline = cons_split_wire() >> \
+ (cons_dictionary_wire({'pi' : 'PI'}) >> cons_function_component(lambda a, s: {'PI' : a['PI']})).first() >> \
+ (cons_dictionary_wire({'e' : 'E'}) >> cons_function_component(lambda a, s: {'E' : a['E']})).second()
+ result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline)
+ self.assertEquals(({'PI' : pi}, {'E' : e}), result)
+
+
+ def test_parallel_tuple_input_wire(self):
+ pi = 3.141
+ e = 2.718
+ value = ({'pi' : pi}, {'e' : e})
+ pipeline = cons_wire(lambda t, s: ({'pi' : t[0]['pi']}, {'e' : t[1]['e']}))
+ result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline)
+ self.assertEquals(({'pi' : pi}, {'e' : e}), result)