Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Johnson <ian.johnson@appliedlanguage.com>2012-09-11 13:01:57 +0400
committerIan Johnson <ian.johnson@appliedlanguage.com>2012-09-11 13:01:57 +0400
commit918d347b6fed69176322eccc15b3bb560c2b7e3b (patch)
treebafc2ede13de326d951dc01d0379c0790ff8383b
parent9e62254aa52fe8fe6b7e59d8d186f8a08ebb8281 (diff)
parenta648c962cbd20c6bff948b6ae716aba03e8a252d (diff)
Merge branch 'master' of https://github.com/ianj-als/pypeline
-rw-r--r--README.md44
-rw-r--r--src/pypeline/core/arrows/kleisli_arrow.py47
-rw-r--r--src/pypeline/core/arrows/tests/kleisli_arrow_tests.py154
-rw-r--r--src/pypeline/core/types/just.py5
-rw-r--r--src/pypeline/core/types/monad.py3
-rw-r--r--src/pypeline/core/types/nothing.py5
-rw-r--r--src/pypeline/core/types/state.py5
-rw-r--r--src/pypeline/helpers/helpers.py68
-rw-r--r--src/pypeline/helpers/tests/helper_pipeline_tests.py60
9 files changed, 372 insertions, 19 deletions
diff --git a/README.md b/README.md
index 30fa8da..c418a6e 100644
--- a/README.md
+++ b/README.md
@@ -111,6 +111,22 @@ The output from the previous component is applied to the input forming function
The state mutator function shall take one argument, the state object, and return a mutated state object if desired. If no state mutator function is specified the state flows through the component unchanged.
+#### Constructing a Batch Subprocess Pipeline Component
+
+ helpers.cons_batch_subprocess_component(process_pipe,
+ input_generator_function,
+ output_function,
+ state_mutator = None)
+
+Construct a pipeline component whose computation requires many values to be sent to the sub-process. An input generator function is required that shall provide the values for the computation. This function shall be a generator that takes two arguments: the value, and the state. This function shall yield objects, that once "stringyfied", shall be sent, as one line, to the `stdin` of the sub-process. The `stdout` of the sub-process is ignored.
+
+The output function generates the value that shall be passed to the subsequent pipeline component. This function shall take two arguments: the input value to the components, and the state object.
+
+ input_feed_function :: a -> s -> b
+ output_function :: a -> s -> c
+
+The state mutator function shall take one argument, the state object, and return a mutated state of object if desired. If no state mutator function is specified the state flows through the component unchanged.
+
### Wire Functions
#### Constructing a Function Based Wire
@@ -125,8 +141,36 @@ Construct a wire based on a function. The function should take two arguments: th
Construct a wire based on a *conversion* dictionary. Assuming that dictionaries are used as values passed through a pipeline, or pipeline component, a dictionary based wire can be used. The dictionary, whose keys are the keys in the previous component's output are mapped to the conversion dictionary's values that are the keys of the next stage input dictionary.
+#### Constructing a Split Wire
+
+ helpers.cons_split_wire()
+
+Constructs a wire that splits a single input into a pair.
+
+#### Constructing an Unsplit Wire
+
+ helpers.cons_unsplit_wire(unsplit_function)
+
+Constructs a wire that takes a pair and combines them into a single value specified by the `unsplit_function`. The unsplit function takes two arguments: the top and bottom values.
+
+ unsplit_function :: b -> c -> d
+
#### Wire Up Two Pipeline Components
helpers.wire_components(component_one, component_two, wire)
Returns a pipeline component that is the composition of two components with a wire between them.
+
+### Component Composition Functions
+
+#### Constructing a Composed Component
+
+ helpers.cons_composed_component(first_component, second_component)
+
+Returns a components that is the composition of the `first_component` and the `second_component`.
+
+#### Constructing a Parallel Component
+
+ helpers.cons_parallel_component(top_component, bottom_component)
+
+Returns a component that will execute the two provided components in parallel. The input to the constructed component is a pair, whose first value is applied to the `top_component` and the second value is applied to the `bottom_component`. The constructed component`s output shall be a pair, whose first value is the output of the top component, and the second value is the output of the bottom component.
diff --git a/src/pypeline/core/arrows/kleisli_arrow.py b/src/pypeline/core/arrows/kleisli_arrow.py
index c08b953..aed24eb 100644
--- a/src/pypeline/core/arrows/kleisli_arrow.py
+++ b/src/pypeline/core/arrows/kleisli_arrow.py
@@ -45,10 +45,7 @@ class KleisliArrow(Arrow):
# arr f = K(\b ->return(f b))
def arr(self, f):
- ka = KleisliArrow(self._patcher, None)
- ka._func = lambda b: ka._patcher(f(b))
-
- return ka
+ return KleisliArrow(self._patcher, lambda b: self._patcher(f(b)))
# K f >>> K g = K(\b -> f b >>= g)
def __rshift__(self, other):
@@ -57,20 +54,50 @@ class KleisliArrow(Arrow):
return KleisliArrow(other._patcher, lambda b: self._func(b) >= other._func)
+ # K f (***) K g = first K f >>> second K g
+ def __pow__(self, other):
+ if not isinstance(other, KleisliArrow):
+ raise ValueError("Must be an KleisliArrow")
+
+ return self.first() >> other.second()
+
+ # K f &&& K g = K $ \a -> do
+ # b <- f a
+ # c <- g a
+ # return (b,c)
+ def __and__(self, other):
+ if not isinstance(other, KleisliArrow):
+ raise ValueError("Must be an KleisliArrow")
+
+ func = lambda a: self._func(a) >= (lambda b: other._func(a) >= (lambda c: other._patcher((b, c))))
+ return KleisliArrow(other._patcher, func)
+
# first (K f) = K(\(b, d) -> f b >>= \c -> return (c, d))
def first(self):
- ka = KleisliArrow(self._patcher, None)
- ka._func = lambda t: self._func(t[0]) >= (lambda c: self._patcher((c, t[1])))
- return ka
+ func = lambda t: self._func(t[0]) >= (lambda c: self._patcher((c, t[1])))
+ return KleisliArrow(self._patcher, func)
# second (K f) = K(\(d, b) -> f b >>= \c -> return (d, c))
def second(self):
- ka = KleisliArrow(self._patcher, None)
- ka._func = lambda t: self._func(t[1]) >= (lambda c: self._patcher((t[0], c)))
- return ka
+ func = lambda t: self._func(t[1]) >= (lambda c: self._patcher((t[0], c)))
+ return KleisliArrow(self._patcher, func)
@staticmethod
def runKleisli(k, a):
if not isinstance(k, KleisliArrow):
raise ValueError("Arrow must be a Kleisli arrow")
return k._func(a)
+
+
+#
+# Split
+#
+def split(patcher):
+ return KleisliArrow(patcher, lambda b: patcher((b, b)))
+
+
+#
+# Unsplit
+#
+def unsplit(patcher, op_func):
+ return KleisliArrow(patcher, lambda t: patcher(op_func(t[0], t[1])))
diff --git a/src/pypeline/core/arrows/tests/kleisli_arrow_tests.py b/src/pypeline/core/arrows/tests/kleisli_arrow_tests.py
index 80208e6..f436e6a 100644
--- a/src/pypeline/core/arrows/tests/kleisli_arrow_tests.py
+++ b/src/pypeline/core/arrows/tests/kleisli_arrow_tests.py
@@ -19,7 +19,7 @@
import unittest
from pypeline.core.arrows.function_arrow import FunctionArrow
-from pypeline.core.arrows.kleisli_arrow import KleisliArrow
+from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split as kleisli_split, unsplit as kleisli_unsplit
from pypeline.core.types.just import Just, return_ as just_return
from pypeline.core.types.state import State, return_ as state_return
@@ -73,3 +73,155 @@ class KleisliArrowUnitTest(unittest.TestCase):
state = list()
state_monad = KleisliArrow.runKleisli(arrow, value) # This is the value
self.assertEquals((x(w(value)), [s1, s2]), State.runState(state_monad, state)) # This is the state
+
+
+ def test_first_with_maybe_monad(self):
+ w = lambda a: a * 2
+ wk = lambda a: Just(w(a))
+ arrow = KleisliArrow(just_return, wk).first()
+
+ value = 9
+ result = KleisliArrow.runKleisli(arrow, (value, value))
+ target = Just((w(value), value))
+ self.assertEquals(target, result)
+
+
+ def test_second_with_maybe_monad(self):
+ w = lambda a: a * 2
+ wk = lambda a: Just(w(a))
+ arrow = KleisliArrow(just_return, wk).second()
+
+ value = 9
+ result = KleisliArrow.runKleisli(arrow, (value, value))
+ target = Just((value, w(value)))
+ self.assertEquals(target, result)
+
+
+ def test_first_with_state_monad(self):
+ w = lambda a: a * 2
+ s1 = "*2"
+ wk = lambda a: State(lambda s: (w(a), s.append(s1) or s))
+ arrow = KleisliArrow(state_return, wk).first()
+
+ value = 9
+ state = KleisliArrow.runKleisli(arrow, (value, value))
+ result = State.runState(state, list())
+ target = ((w(value), value), [s1])
+ self.assertEquals(target, result)
+
+
+ def test_second_with_state_monad(self):
+ w = lambda a: a * 2
+ s1 = "*2"
+ wk = lambda a: State(lambda s: (w(a), s.append(s1) or s))
+ arrow = KleisliArrow(state_return, wk).second()
+
+ value = 9
+ state = KleisliArrow.runKleisli(arrow, (value, value))
+ result = State.runState(state, list())
+ target = ((value, w(value)), [s1])
+ self.assertEquals(target, result)
+
+
+ def test_triple_asterisk_with_maybe_monad(self):
+ w = lambda a: a * 2
+ wk = lambda a: Just(w(a))
+ k1 = KleisliArrow(just_return, wk)
+
+ x = lambda a: a - 9
+ xk = lambda a: Just(x(a))
+ k2 = KleisliArrow(just_return, xk)
+
+ arrow = k1 ** k2
+
+ value = 7
+ target = Just((w(value), x(value)))
+ result = KleisliArrow.runKleisli(arrow, (value, value))
+ self.assertEquals(target, result)
+
+
+ def test_triple_ampersand_with_maybe_monad(self):
+ w = lambda a: a * 2
+ wk = lambda a: Just(w(a))
+ k1 = KleisliArrow(just_return, wk)
+
+ x = lambda a: a - 9
+ xk = lambda a: Just(x(a))
+ k2 = KleisliArrow(just_return, xk)
+
+ arrow = k1 & k2
+
+ value = 7
+ target = Just((w(value), x(value)))
+ result = KleisliArrow.runKleisli(arrow, value)
+ self.assertEquals(target, result)
+
+
+ def test_triple_ampersand_with_state_monad(self):
+ s1 = "*2"
+ w = lambda a: a * 2
+ f = lambda a: State(lambda s: (w(a), s.append(s1) or s))
+ k1 = KleisliArrow(state_return, f)
+
+ s2 = "-9"
+ x = lambda a: a - 9
+ h = lambda a: State(lambda s: (x(a), s.append(s2) or s))
+ k2 = KleisliArrow(state_return, h)
+
+ arrow = k1 & k2
+
+ value = 5
+ state = list()
+ state_monad = KleisliArrow.runKleisli(arrow, value)
+
+ target = ((w(value), x(value)), [s1, s2])
+ result = State.runState(state_monad, state)
+ self.assertEquals(target, result)
+
+
+ def test_split_with_maybe_monad(self):
+ value = 7
+ arrow = kleisli_split(just_return)
+ result = KleisliArrow.runKleisli(arrow, value)
+ target = Just((value, value))
+ self.assertEquals(target, result)
+
+
+ def test_split_with_state_monad(self):
+ value = 7
+ arrow = kleisli_split(state_return)
+ state = KleisliArrow.runKleisli(arrow, value)
+ result = State.runState(state, list())
+ target = ((value, value), list())
+ self.assertEquals(target, result)
+
+
+ def test_unsplit_with_maybe_monad(self):
+ value = 8
+
+ k1 = kleisli_split(just_return)
+
+ f = lambda x, y: x * y
+ k2 = kleisli_unsplit(just_return, f)
+
+ arrow = k1 >> k2
+
+ result = KleisliArrow.runKleisli(arrow, value)
+ target = Just(f(value, value))
+ self.assertEquals(target, result)
+
+
+ def test_unsplit_with_state_monad(self):
+ value = 7
+
+ k1 = kleisli_split(state_return)
+
+ f = lambda x, y: x * y
+ k2 = kleisli_unsplit(state_return, f)
+
+ arrow = k1 >> k2
+
+ state = KleisliArrow.runKleisli(arrow, value)
+ result = State.runState(state, list())
+ target = (f(value, value), list())
+ self.assertEquals(target, result)
diff --git a/src/pypeline/core/types/just.py b/src/pypeline/core/types/just.py
index 34a7fa0..6328c5b 100644
--- a/src/pypeline/core/types/just.py
+++ b/src/pypeline/core/types/just.py
@@ -30,6 +30,11 @@ class Just(Maybe):
raise ValueError("Value cannot be None")
self._a = a
+ # return
+ # return :: a -> m a
+ def return_(self, a):
+ return return_(a)
+
def __ge__(self, function):
if type(function) is not types.FunctionType and \
type(function) is not types.MethodType:
diff --git a/src/pypeline/core/types/monad.py b/src/pypeline/core/types/monad.py
index 34c7a35..a6ff269 100644
--- a/src/pypeline/core/types/monad.py
+++ b/src/pypeline/core/types/monad.py
@@ -17,6 +17,7 @@
# along with Pypeline. If not, see <http://www.gnu.org/licenses/>.
#
+
#
# Base monad class
#
@@ -34,7 +35,7 @@ class Monad(object):
# Bind "Shove" operator
# (>>) :: m a -> m b -> m b
def __rshift__(self, other):
- if not isinstance(other):
+ if not isinstance(other, Monad):
raise ValueError("Must be a monadic type")
return self >= (lambda _: other)
diff --git a/src/pypeline/core/types/nothing.py b/src/pypeline/core/types/nothing.py
index f60237d..aab5df2 100644
--- a/src/pypeline/core/types/nothing.py
+++ b/src/pypeline/core/types/nothing.py
@@ -29,7 +29,10 @@ class Nothing(Maybe):
if cls._instance is None:
cls._instance = super(Nothing, cls).__new__(cls, *args, **kwargs)
return cls._instance
-
+
+ def return_(self, a):
+ return return_(a)
+
def __ge__(self, function):
return Nothing()
diff --git a/src/pypeline/core/types/state.py b/src/pypeline/core/types/state.py
index 6df48e4..2c47e15 100644
--- a/src/pypeline/core/types/state.py
+++ b/src/pypeline/core/types/state.py
@@ -41,6 +41,11 @@ class State(Monad):
else:
self._func = lambda s: (a, s)
+ # return
+ # return :: a -> m a
+ def return_(self, a):
+ return return_(a)
+
# (>>=) :: State s a -> (a -> State s b) -> State s b
# (State h) >>= f = State $ \s -> let (a, newState) = h s
# (State g) = f a
diff --git a/src/pypeline/helpers/helpers.py b/src/pypeline/helpers/helpers.py
index 01c246e..190d8fb 100644
--- a/src/pypeline/helpers/helpers.py
+++ b/src/pypeline/helpers/helpers.py
@@ -19,7 +19,7 @@
import subprocess
import types
-from pypeline.core.arrows.kleisli_arrow import KleisliArrow
+from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit
from pypeline.core.types.state import State, return_
@@ -65,6 +65,48 @@ def cons_subprocess_component(process_pipe,
return KleisliArrow(return_, bind_function)
+def cons_batch_subprocess_component(process_pipe,
+ input_generator_function,
+ output_function,
+ state_mutator = None):
+ """Construct a pipeline component using a Popen object. Batch subprocesses shall accept a single line on stdin. An input generator function shall be provided that yields objects, that once "stringyfied", are presented to the subprocess' stdin. This function takes tow arguments: the value and the state objects. It is the responsibility of the feed function implementer to yield an EOF if necessary. The returned object shall be a Kleisli arrow representing this pipeline component."""
+ if not isinstance(process_pipe, subprocess.Popen):
+ raise ValueError("Must be a Popen process")
+
+ if input_generator_function is None or output_function is None:
+ raise ValueError("Subprocess components must specify both " +
+ "input generator and output functions")
+
+ #
+ # This bind function handles the 'process'
+ # being a subprocess.
+ #
+ def bind_function(a):
+ def state_function(s):
+ # The input forming function is an iterable, so
+ # request every value this function will return
+ # and feed it to the underlying subprocess.
+ # This function shall return a value, that when stringyfied and
+ # injected into stdin, the subprocess will understand
+ for transformed_a in input_generator_function(a, s):
+ # Communicate with the subprocess
+ if transformed_a is not None:
+ print >> process_pipe.stdin, str(transformed_a).strip()
+ process_pipe.stdin.flush()
+
+ # Get the new a
+ new_a = output_function(a, s)
+
+ # Mutate the state
+ next_s = state_mutator(s) if state_mutator else s
+
+ # New value/state pair
+ return (new_a, next_s)
+ return State(state_function)
+
+ return KleisliArrow(return_, bind_function)
+
+
def cons_function_component(function,
input_forming_function = None,
output_forming_function = None,
@@ -110,8 +152,18 @@ def cons_dictionary_wire(conversions):
return cons_wire(lambda a, _: {conversions[key]: a[key] for key in conversions})
-def wire_components(component_one, component_two, wire):
- """Wire two components together."""
+def cons_split_wire():
+ """Construct a wire that duplicates its input and produces a pair from this value. See: ***, first, second, and unsplit arrow operators."""
+ return split(return_)
+
+
+def cons_unsplit_wire(unsplit_func):
+ """Construct a wire that takes a pair and applies a function to this pair to combine them into one value."""
+ return unsplit(return_, unsplit_func)
+
+
+def cons_wired_components(component_one, component_two, wire):
+ """Wire two components together and return a component that is the composition of these components."""
return component_one >> wire >> component_two
@@ -120,6 +172,16 @@ def cons_pipeline(input_wire, component, output_wire):
return input_wire >> component >> output_wire
+def cons_composed_component(first_component, second_component):
+ """Compose two components and return a component that represents the composed computation."""
+ return first_component >> second_component
+
+
+def cons_parallel_component(top_component, bottom_component):
+ """Construct a component that will compute the provided components in parallel. The returned component takes a pair as input, see cons_split_wire(), and the component shall return a pair."""
+ return top_component ** bottom_component
+
+
def __kleisli_wrapper(f):
def wrapper(pipeline, input, state):
"""Run, evaluate, or execute a pipeline."""
diff --git a/src/pypeline/helpers/tests/helper_pipeline_tests.py b/src/pypeline/helpers/tests/helper_pipeline_tests.py
index 55e3d8c..e660108 100644
--- a/src/pypeline/helpers/tests/helper_pipeline_tests.py
+++ b/src/pypeline/helpers/tests/helper_pipeline_tests.py
@@ -31,8 +31,12 @@ from pypeline.helpers.helpers import cons_subprocess_component, \
cons_function_component, \
cons_wire, \
cons_dictionary_wire, \
+ cons_split_wire, \
+ cons_unsplit_wire, \
cons_pipeline, \
- wire_components, \
+ cons_wired_components, \
+ cons_composed_component, \
+ cons_parallel_component, \
run_pipeline
@@ -61,7 +65,7 @@ class PypelineHelperFunctionUnitTest(unittest.TestCase):
return (arrow, pipe)
- def test_pypeline_wth_subprocess_and_function_components(self):
+ def test_pypeline_with_subprocess_and_function_components(self):
if sys.platform.startswith('win'):
self.fail("Currently only this unit test is only supported on non-Windows platforms")
@@ -101,7 +105,10 @@ class PypelineHelperFunctionUnitTest(unittest.TestCase):
output_wire_func = lambda a, s: str(a['output'])
output_wire = cons_wire(output_wire_func)
- pipeline = input_wire >> wire_components(comp_one, comp_two, wire) >> to_upper_wire >> comp_three
+ pipeline = cons_pipeline(input_wire,
+ cons_wired_components(comp_one, comp_two, wire),
+ to_upper_wire)
+ pipeline = cons_composed_component(pipeline, comp_three)
value = "hello world"
target = (upper_func(value, None), [rev_msg_one, rev_msg_two, upper_msg])
@@ -114,3 +121,50 @@ class PypelineHelperFunctionUnitTest(unittest.TestCase):
finally:
comp_proc_one[1].terminate()
comp_proc_one[1].wait()
+
+
+ def test_pypeline_with_split_and_unsplit_wires(self):
+ if sys.platform.startswith('win'):
+ self.fail("Currently only this unit test is only supported on non-Windows platforms")
+
+ rev_msg_one = "reverse(subprocess)"
+ rev_msg_two = "reverse(function)"
+
+ reverse_command = os.path.join("src", "pypeline", "helpers", "tests", "reverse.sh")
+
+ reverse_func = lambda a, s: a[::-1]
+ input_func = lambda a, s: str(a['input'])
+ output_func = lambda a, s: {'output': str(a)}
+
+ comp_proc_one = PypelineHelperFunctionUnitTest.__cons_and_start_subprocess_component(
+ reverse_command, tuple(),
+ input_func,
+ output_func,
+ state_mutator = lambda s: s.append(rev_msg_one) or s)
+ try:
+ comp_one = comp_proc_one[0]
+ comp_two = cons_function_component(
+ reverse_func,
+ input_func,
+ output_func,
+ state_mutator = lambda s: s.append(rev_msg_two) or s)
+
+ parallel_reverse_comp = cons_parallel_component(comp_one, comp_two)
+ split_wire = cons_split_wire()
+ unsplit_func = lambda a, b: {'subprocess_output' : a['output'],
+ 'function_output': b['output']}
+ unsplit_wire = cons_unsplit_wire(unsplit_func)
+ input_wire = cons_wire(lambda a, s: {'input': a})
+ pipeline = cons_pipeline(input_wire,
+ cons_composed_component(split_wire, parallel_reverse_comp),
+ unsplit_wire)
+
+ value = "hello world"
+ result = run_pipeline(pipeline, "hello world", list())
+ target_dict = {'output': reverse_func(value, None)}
+ target_value = unsplit_func(target_dict, target_dict)
+ target = (target_value, [rev_msg_one, rev_msg_two])
+ self.assertEquals(target, result)
+ finally:
+ comp_proc_one[1].terminate()
+ comp_proc_one[1].wait()