Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Johnson <ian.johnson@appliedlanguage.com>2013-05-21 15:48:39 +0400
committerIan Johnson <ian.johnson@appliedlanguage.com>2013-05-21 15:48:39 +0400
commitfb1f58f45810b4a158d92120a56e0d56694d5193 (patch)
tree62ab6d5e5d02b7f73e12cbfaa39475565224c644
parent98f78b5f29ba7e28f8a503828a74ff770c3844dc (diff)
Added a constructor for an If component for concurrent pipelines.
-rw-r--r--setup.py2
-rw-r--r--src/pypeline/helpers/parallel_helpers.py34
-rw-r--r--src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py21
3 files changed, 51 insertions, 6 deletions
diff --git a/setup.py b/setup.py
index 9a7b65b..0a92fd3 100644
--- a/setup.py
+++ b/setup.py
@@ -21,7 +21,7 @@ from setuptools import setup, find_packages
setup(
name = "pypeline",
- version = "0.2.4",
+ version = "0.2.5",
packages = find_packages("src", exclude = ["*tests"]),
package_dir = {'': 'src'},
diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py
index 65d2d26..2346d0d 100644
--- a/src/pypeline/helpers/parallel_helpers.py
+++ b/src/pypeline/helpers/parallel_helpers.py
@@ -18,6 +18,8 @@
#
from concurrent.futures import Future
from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit
+from pypeline.core.arrows.kleisli_arrow_choice import KleisliArrowChoice
+from pypeline.core.types.either import Left, Right
from pypeline.core.types.state import State, return_
from pypeline.helpers.helpers import get_dictionary_conversion_function
@@ -39,12 +41,18 @@ def cons_function_component(function,
output_forming_function = None,
state_mutator = None):
"""Construct a component based on a function. Any input or output forming functions shall be called if provided. A Kleisli arrow is returned."""
- def bind_function(future):
+ def bind_function(bind_a):
def state_function(wrapped_state):
# Unpack state
state = wrapped_state.state
- assert isinstance(future, Future), "Future is not a future! Strange."
+ # Handle input
+ if isinstance(bind_a, tuple):
+ the_a = (bind_a[0].result(), bind_a[1].result())
+ elif isinstance(bind_a, Future):
+ the_a = bind_a.result()
+ else:
+ the_a = bind_a
def do_transformation(a, s):
# Transform the input
@@ -60,7 +68,7 @@ def cons_function_component(function,
# Execute
new_future = wrapped_state.executor.submit(do_transformation,
- future.result(),
+ the_a,
state)
# Mutate the state
@@ -135,6 +143,26 @@ def cons_unsplit_wire(unsplit_function):
return unsplit(return_, get_unsplit_wrapper(unsplit_function))
+def cons_if_component(condition_function, then_component, else_component):
+ """Construct a conditional execution component. If the conditional function evaluates to true the 'then' component is executed. Otherwise, the 'else' component is executed. Returns a Kleisli arrow."""
+ def get_left_right_wrapper():
+ def left_right_bind_function(futured_tuple):
+ def left_right_state_function(s):
+ condition_result = futured_tuple[0].result()
+ a = futured_tuple[1].result()
+ new_a = Left(a) if condition_result else Right(a)
+ return (new_a, s)
+ return State(left_right_state_function)
+ return left_right_bind_function
+
+ test = (cons_function_component(condition_function) & cons_function_component(lambda a, s: a)) >> \
+ KleisliArrow(return_, get_left_right_wrapper())
+ if_comp = test >> (KleisliArrowChoice(return_, then_component._func) | \
+ KleisliArrowChoice(return_, else_component._func))
+
+ return if_comp
+
+
def __handle_output(o):
po = (o[0].result() if isinstance(o[0], Future) else o[0],
o[1].result() if isinstance(o[1], Future) else o[1]) if isinstance(o, tuple) \
diff --git a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
index a5354e3..f5f67c7 100644
--- a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
+++ b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
@@ -33,6 +33,7 @@ from pypeline.helpers.parallel_helpers import cons_function_component, \
cons_dictionary_wire, \
cons_split_wire, \
cons_unsplit_wire, \
+ cons_if_component, \
run_pipeline, \
eval_pipeline, \
exec_pipeline
@@ -42,8 +43,10 @@ class ParallelPypelineHelperUnitTest(unittest.TestCase):
@staticmethod
def test(no_workers, pipeline, input, state, run_function = run_pipeline):
executor = ThreadPoolExecutor(max_workers = no_workers)
- result = run_function(executor, pipeline, input, state)
- executor.shutdown(True)
+ try:
+ result = run_function(executor, pipeline, input, state)
+ finally:
+ executor.shutdown(True)
return result
@@ -182,3 +185,17 @@ class ParallelPypelineHelperUnitTest(unittest.TestCase):
pipeline = cons_wire(lambda t, s: ({'pi' : t[0]['pi']}, {'e' : t[1]['e']}))
result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline)
self.assertEquals(({'pi' : pi}, {'e' : e}), result)
+
+
+ def test_parallel_if(self):
+ then_comp = cons_function_component(lambda a, s: {'z' : 'THEN'})
+ else_comp = cons_dictionary_wire({'c' : 'z'})
+ pipeline = cons_if_component(lambda a, s: a['a'] == True, then_comp, else_comp)
+
+ value = {'a' : True, 'b' : 'then', 'c' : 'else'}
+ result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline)
+ self.assertEquals({'z' : 'THEN'}, result)
+
+ value = {'a' : False, 'b' : 'then', 'c' : 'else'}
+ result = ParallelPypelineHelperUnitTest.test(1, pipeline, value, None, eval_pipeline)
+ self.assertEquals({'z' : 'else'}, result)