Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Johnson <ian.johnson@appliedlanguage.com>2013-06-10 19:44:04 +0400
committerIan Johnson <ian.johnson@appliedlanguage.com>2013-06-10 19:44:04 +0400
commitc4fe73dd6b3c50f18ff285fe584ca19740fe3257 (patch)
tree718dd35243677ccdfd2109139c793275fc399fdd
parent6036e42acb37f75f08f3df997e8b4034752d8a48 (diff)
Bug fixed the concurrent helper module. Simpler, and now hopefully working.
-rw-r--r--setup.py2
-rw-r--r--src/pypeline/helpers/parallel_helpers.py52
-rw-r--r--src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py9
3 files changed, 16 insertions, 47 deletions
diff --git a/setup.py b/setup.py
index be0f887..641ce72 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,7 @@ from setuptools import setup, find_packages
setup(
name = "pypeline",
- version = "0.2.6",
+ version = "0.2.7",
packages = find_packages("src", exclude = ["*tests"]),
package_dir = {'': 'src'},
diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py
index 315a565..8d79b4a 100644
--- a/src/pypeline/helpers/parallel_helpers.py
+++ b/src/pypeline/helpers/parallel_helpers.py
@@ -49,10 +49,10 @@ def cons_function_component(function,
def do_transformation(a, s):
# Handle input
- if isinstance(a, tuple):
- the_a = (a[0].result(), a[1].result())
- elif isinstance(a, Future):
+ if isinstance(a, Future):
the_a = a.result()
+ elif isinstance(a, tuple):
+ the_a = a
else:
raise ValueError("Component state function has value that is not of type tuple or Future")
@@ -82,30 +82,18 @@ def cons_function_component(function,
return KleisliArrow(return_, get_bind_function())
-def cons_wire(schema_conv_function):
+def cons_wire(wire_function):
"""Construct a wire. A wire is a Kleisli arrow that converts data from from one pipeline component's output schema to another pipeline component's input schema."""
- def get_wire_bind_function():
- def wire_bind_function(a):
- def wire_state_function(s):
- new_a = schema_conv_function(a, s.state)
- if isinstance(a, tuple):
- futured_new_a = [None, None]
- for a_new_a, futured_new_a_idx in zip(new_a, range(len(futured_new_a))):
- if isinstance(a_new_a, Future):
- futured_new_a[futured_new_a_idx] = a_new_a
- else:
- futured_new_a[futured_new_a_idx] = Future()
- futured_new_a[futured_new_a_idx].set_result(a_new_a)
- futured_new_a = tuple(futured_new_a)
- else:
- futured_new_a = Future()
- futured_new_a.set_result(new_a)
+ def get_wire_function(conv_function):
+ def wire_function(a, s):
+ if isinstance(a, tuple):
+ raise ValueError("Wire has value that is a tuple")
+ new_a = conv_function(a, s)
+ return new_a
- return (futured_new_a, s)
- return State(wire_state_function)
- return wire_bind_function
+ return wire_function
- return KleisliArrow(return_, get_wire_bind_function())
+ return cons_function_component(get_wire_function(wire_function))
def cons_dictionary_wire(conversions):
@@ -118,20 +106,9 @@ def cons_split_wire():
def get_split_wire_bind_function():
def split_wire_bind_function(a):
def split_wire_state_function(s):
- new_a = [None, None]
if isinstance(a, tuple):
- for an_a, new_a_idx in zip(a, range(len(new_a))):
- if isinstance(an_a, Future):
- new_a[new_a_idx] = an_a
- else:
- assert False, "Tuple does not contain futures: %s" % str(a)
- else:
- new_a[0] = Future()
- new_a[1] = Future()
- new_a[0].set_result(a)
- new_a[1].set_result(a)
-
- return (tuple(new_a), s)
+ raise ValueError("Split wire has a value that is a tuple")
+ return ((a, a), s)
return State(split_wire_state_function)
return split_wire_bind_function
@@ -223,6 +200,7 @@ def __kleisli_wrapper(f):
else:
future = Future()
future.set_result(input)
+
state_monad = KleisliArrow.runKleisli(pipeline, future)
return f(state_monad, WrappedState(executor, state))
return wrapper
diff --git a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
index f5f67c7..afd8cf4 100644
--- a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
+++ b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
@@ -178,15 +178,6 @@ class ParallelPypelineHelperUnitTest(unittest.TestCase):
self.assertEquals(({'PI' : pi}, {'E' : e}), result)
- def test_parallel_tuple_input_wire(self):
- pi = 3.141
- e = 2.718
- value = ({'pi' : pi}, {'e' : e})
- pipeline = cons_wire(lambda t, s: ({'pi' : t[0]['pi']}, {'e' : t[1]['e']}))
- result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline)
- self.assertEquals(({'pi' : pi}, {'e' : e}), result)
-
-
def test_parallel_if(self):
then_comp = cons_function_component(lambda a, s: {'z' : 'THEN'})
else_comp = cons_dictionary_wire({'c' : 'z'})