github.com/ianj-als/pypeline.git
author Ian Johnson <ian.johnson@appliedlanguage.com> 2012-10-15 20:26:14 +0400
committer Ian Johnson <ian.johnson@appliedlanguage.com> 2012-10-15 20:26:14 +0400
commit 23fa8a6583677c4d48ed44832426b2d264321018
tree 53d9929607ae2e176021784e42ba22e5cbb5557f
parent 8ef9aeecabcfe944eb492623c6cbf8b93751bcad
Added parallel helper unit tests. Bug fixing.
-rw-r--r-- README.md 34
-rw-r--r-- setup.py 1
-rw-r--r-- src/pypeline/helpers/helpers.py 91
-rw-r--r-- src/pypeline/helpers/tests/../parallel_helpers.py 42
-rw-r--r-- src/pypeline/helpers/tests/helper_pipeline_tests.py 30
-rw-r--r-- src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py 93
6 files changed, 146 insertions, 145 deletions
diff --git a/README.md b/README.md
index beb71f3..0c2cccd 100644
--- a/README.md
+++ b/README.md
@@ -44,7 +44,7 @@ Pipelines can be constructed using the helpers function in the [pypeline.helpers
2. Pipeline components, and
3. Wires.
-A pipeline is a series of pipeline components that are, optionally, connected with wires. Pipeline components can be constructed with functions, or with a `subprocess.Popen` object; this enables pipelines to be built that call externally running programs. Currently, the protocol for communicating with subprocesses is via stdin and stdout. A single line is fed into the subprocess on stdin and it shall respond with a single line on it's stdout.
+A pipeline is a series of pipeline components that are, optionally, connected with wires. Pipeline components are built from functions.
Wires can be used to convert the output of one pipeline component into the input of the succeeding pipeline component. Wires can be constructed using a function or a dictionary. Assuming a pipeline component's output is a dictionary and the next component accepts, as input, a dictionary, a wire, constructed from a dictionary, maps values in the output dictionary into a dictionary which is to be used as an input. However, a wire constructed from a function can create arbitrary output to input mappings.
@@ -96,38 +96,6 @@ The input and output forming functions shall take two arguments: a value and the
The state mutator function shall take one argument, the state object, and return a mutated state object if desired. The state mutator function is applied after all the other functions have been applied. If no state mutator function is specified the state flows through the component unchanged.
-#### Constructing a Subprocess Based Pipeline Component
-
- helpers.cons_subprocess_component(process_pipe,
- input_forming_function,
- output_forming_function,
- state_mutator_function = None)
-
-Construct a pipeline component whose computation will be achieved using a sub-process. Input and output forming functions should generate the single line given to the `stdin` of the sub-process, and parse out the single line written to the sub-process' `stdout` respectively. An optional state mutator function can be provided to alter the state object passed into one of the pipeline run/evaluating/executing functions.
-
-The output from the previous component is applied to the input forming function and the "stringyfied" resultant object is written to the sub-process' `stdin`. Once the sub-process has responded a single line, from `stdout`, is applied to the output forming function. This function is to parse the response and the resultant object is passed to the subsequent pipeline component, or wire. The input and output forming functions shall take two arguments: a value and the state object. Or,
-
- input_forming_function :: a -> s -> b
- output_forming_function :: a -> s -> b
-
-The state mutator function shall take one argument, the state object, and return a mutated state object if desired. If no state mutator function is specified the state flows through the component unchanged.
-
-#### Constructing a Batch Subprocess Pipeline Component
-
- helpers.cons_batch_subprocess_component(process_pipe,
- input_generator_function,
- output_function,
- state_mutator = None)
-
-Construct a pipeline component whose computation requires many values to be sent to the sub-process. An input generator function is required that shall provide the values for the computation. This function shall be a generator that takes two arguments: the value, and the state. This function shall yield objects, that once "stringyfied", shall be sent, as one line, to the `stdin` of the sub-process. The `stdout` of the sub-process is ignored.
-
-The output function generates the value that shall be passed to the subsequent pipeline component. This function shall take two arguments: the input value to the components, and the state object.
-
- input_feed_function :: a -> s -> b
- output_function :: a -> s -> c
-
-The state mutator function shall take one argument, the state object, and return a mutated state of object if desired. If no state mutator function is specified the state flows through the component unchanged.
-
### Wire Functions
#### Constructing a Function Based Wire
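Note on the README change above: with the subprocess constructors removed, a line-oriented subprocess can still take part in a pipeline if it is wrapped in a plain `(value, state)` function, which is what the updated test helper in this commit does. The sketch below restates that idea in Python 3 (the commit itself targets Python 2). The child program `CHILD` and the names `make_process_function` and `reverse_via_subprocess` are hypothetical and exist only for illustration; they are not part of pypeline.

```python
import subprocess
import sys

# Hypothetical child program: answers every stdin line with that line reversed.
CHILD = (
    "import sys\n"
    "for line in iter(sys.stdin.readline, ''):\n"
    "    sys.stdout.write(line.rstrip('\\n')[::-1] + '\\n')\n"
    "    sys.stdout.flush()\n"
)


def make_process_function(pipe):
    """Wrap a Popen object in a (value, state) function (illustrative name)."""
    def process_function(a, s):
        if not a:
            return None
        # One line in on stdin, one line back on stdout.
        pipe.stdin.write(str(a).strip() + "\n")
        pipe.stdin.flush()
        return pipe.stdout.readline().strip()
    return process_function


def reverse_via_subprocess(values):
    """Start the child, push each value through it, then shut it down."""
    pipe = subprocess.Popen([sys.executable, "-c", CHILD],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            universal_newlines=True)
    try:
        process_function = make_process_function(pipe)
        return [process_function(value, None) for value in values]
    finally:
        pipe.stdin.close()   # EOF lets the child's read loop finish
        pipe.wait()
        pipe.stdout.close()
```

A function built this way could then be handed to `cons_function_component` like any other function; the caller stays responsible for starting and terminating the process.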
diff --git a/setup.py b/setup.py
index c06cd39..416041b 100644
--- a/setup.py
+++ b/setup.py
@@ -34,6 +34,7 @@ setup(
url = "http://www.appliedlanguage.com",
setup_requires = ['nose>=1.0'],
+ install_requires = ['futures>=2.1.3'],
test_suite = 'nose.collector',
)
diff --git a/src/pypeline/helpers/helpers.py b/src/pypeline/helpers/helpers.py
index a8c1359..61085d8 100644
--- a/src/pypeline/helpers/helpers.py
+++ b/src/pypeline/helpers/helpers.py
@@ -23,99 +23,11 @@ from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit
from pypeline.core.types.state import State, return_
-def cons_subprocess_component(process_pipe,
- input_forming_function,
- output_forming_function,
- state_mutator = None):
- """Construct a pipeline component using a Popen object. Subprocesses shall accept a single line on stdin and generate a single line on stdout. Input and output forming functions shall be provided to generate and parse single lines of text that will be used to communicate with the subprocess. The returned object shall be a Kleisli arrow representing this pipeline component."""
- if not isinstance(process_pipe, subprocess.Popen):
- raise ValueError("Must be a Popen process")
-
- if input_forming_function is None or \
- output_forming_function is None:
- raise ValueError("Subprocess components must specify both " +
- "input and output forming functions")
-
- #
- # This bind function handles the 'process'
- # being a subprocess.
- #
- def bind_function(a):
- def state_function(s):
- # Transform the value into a line, that when
- # injected into stdin, the subprocess will understand
- transformed_a = input_forming_function(a, s)
-
- # Communicate with the subprocess
- if transformed_a is not None:
- print >> process_pipe.stdin, str(transformed_a).strip()
- process_pipe.stdin.flush()
- new_a = process_pipe.stdout.readline().strip()
-
- # Parse the output from the subprocess
- transformed_new_a = output_forming_function(new_a, s)
-
- # Mutate the state
- next_s = state_mutator(s) if state_mutator else s
-
- # New value/state pair
- return (transformed_new_a, next_s)
- return State(state_function)
-
- return KleisliArrow(return_, bind_function)
-
-
-def cons_batch_subprocess_component(process_pipe,
- input_generator_function,
- output_function,
- state_mutator = None):
- """Construct a pipeline component using a Popen object. Batch subprocesses shall accept a single line on stdin. An input generator function shall be provided that yields objects, that once "stringyfied", are presented to the subprocess' stdin. This function takes tow arguments: the value and the state objects. It is the responsibility of the feed function implementer to yield an EOF if necessary. The returned object shall be a Kleisli arrow representing this pipeline component."""
- if not isinstance(process_pipe, subprocess.Popen):
- raise ValueError("Must be a Popen process")
-
- if input_generator_function is None or output_function is None:
- raise ValueError("Subprocess components must specify both " +
- "input generator and output functions")
-
- #
- # This bind function handles the 'process'
- # being a subprocess.
- #
- def bind_function(a):
- def state_function(s):
- # The input forming function is an iterable, so
- # request every value this function will return
- # and feed it to the underlying subprocess.
- # This function shall return a value, that when stringyfied and
- # injected into stdin, the subprocess will understand
- for transformed_a in input_generator_function(a, s):
- # Communicate with the subprocess
- if transformed_a is not None:
- print >> process_pipe.stdin, str(transformed_a).strip()
- process_pipe.stdin.flush()
-
- # Get the new a
- new_a = output_function(a, s)
-
- # Mutate the state
- next_s = state_mutator(s) if state_mutator else s
-
- # New value/state pair
- return (new_a, next_s)
- return State(state_function)
-
- return KleisliArrow(return_, bind_function)
-
-
def cons_function_component(function,
input_forming_function = None,
output_forming_function = None,
state_mutator = None):
"""Construct a pipeline component whose computation will be achieved using a function. Optional input and output forming functions pre- and post-process the input and output values to and from the function. An optional state mutator function can be provided to alter the state object passed into one of the pipeline run/evaluating/executing functions. A Kleisli arrow is returned."""
- if type(function) is not types.FunctionType and \
- type(function) is not types.MethodType:
- raise ValueError("Must be a function or method")
-
def bind_function(a):
def state_function(s):
# Transform the input
@@ -204,6 +116,7 @@ def exec_pipeline(state_monad, state):
return State.execState(state_monad, state)
-def get_dictionary_conversion_function(conversion):
+def get_dictionary_conversion_function(conversions):
"""Returns a function that completes the dictionary conversions as part of a wire."""
return lambda a, _: {conversions[key]: a[key] for key in conversions}
+
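Note on the `conversions` fix above: before this change the parameter was named `conversion` while the lambda body read `conversions`, so calling the returned function would raise a `NameError`. The standalone sketch below copies the corrected function and applies it to a sample dictionary; the sample keys and values are made up for illustration.

```python
def get_dictionary_conversion_function(conversions):
    """Return a function that renames keys according to `conversions`."""
    return lambda a, _: {conversions[key]: a[key] for key in conversions}


# Map the upstream component's 'output' key onto the downstream 'input' key.
convert = get_dictionary_conversion_function({'output': 'input'})
converted = convert({'output': 'dlrow olleh', 'unused': 1}, None)
```

Keys that do not appear in `conversions` (here `'unused'`) are dropped from the result, and the second argument (the state) is ignored.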
diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py
index 0c84bb4..b4a402a 100644
--- a/src/pypeline/helpers/parallel_helpers.py
+++ b/src/pypeline/helpers/parallel_helpers.py
@@ -22,7 +22,7 @@ from pypeline.core.types.state import State, return_
from pypeline.helpers import helpers
-class WrapperState(object):
+class WrappedState(object):
def __init__(self, executor, state):
self.executor = executor
self.state = state
@@ -33,19 +33,34 @@ def cons_function_component(function,
output_forming_function = None,
state_mutator = None):
"""Construct a component based on a function. Any input or output forming functions shall be called if provided. A Kleisli arrow is returned."""
- def get_component_function_wrapper(inner_function):
- def component_function_wrapper(future, wrapper_state):
- value = future.result()
- this_future = wrapper_state.executor.submit(inner_function, value, wrapper_state.state)
- return this_future
- return component_function_wrapper
+ def bind_function(future):
+ def state_function(wrapped_state):
+ # Unpack state
+ state = wrapped_state.state
- component = helpers.cons_function_component(get_component_function_wrapper(function),
- input_forming_function,
- output_forming_function,
- state_mutator)
+ def do_transformation(a, s):
+ # Transform the input
+ transformed_a = input_forming_function(a, state) if input_forming_function else a
- return component
+ # Apply
+ new_a = function(transformed_a, state)
+
+ # Transform the output of the function
+ transformed_new_a = output_forming_function(new_a, state) if output_forming_function else new_a
+
+ return transformed_new_a
+
+ # Execute
+ new_future = wrapped_state.executor.submit(do_transformation, future.result(), state)
+
+ # Mutate the state
+ next_state = state_mutator(state) if state_mutator else state
+
+ # New value/state pair
+ return (new_future, WrappedState(wrapped_state.executor, next_state))
+ return State(state_function)
+
+ return KleisliArrow(return_, bind_function)
def cons_wire(schema_conv_function):
@@ -101,7 +116,8 @@ def __kleisli_wrapper(f):
future = Future()
future.set_result(input)
state_monad = KleisliArrow.runKleisli(pipeline, future)
- return f(state_monad, WrapperState(executor, state))
+ output = f(state_monad, WrappedState(executor, state))
+ return (output[0].result(), output[1].state)
return wrapper
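Note on the parallel helper change above: each component now receives a future, submits its work to the executor, and passes a new future plus the mutated state onwards; the wrapper resolves the final future before returning. The sketch below shows that chaining with `concurrent.futures` alone, in Python 3. It is a simplification under stated assumptions: the Kleisli arrow and State monad are replaced by a plain loop, and `make_stage`, `run_stages`, and `run_example` are hypothetical names, not pypeline API.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def make_stage(function, state_mutator=None):
    """Build a stage that consumes a future and produces a new one."""
    def stage(future, executor, state):
        # future.result() blocks until the previous stage's value is ready.
        new_future = executor.submit(function, future.result(), state)
        next_state = state_mutator(state) if state_mutator else state
        return new_future, next_state
    return stage


def run_stages(executor, stages, value, state):
    """Seed the chain with an already resolved future, then run each stage."""
    future = Future()
    future.set_result(value)
    for stage in stages:
        future, state = stage(future, executor, state)
    # Unwrap the final future, as the updated wrapper in the diff does.
    return future.result(), state


def run_example():
    stages = [
        make_stage(lambda a, s: a[::-1], lambda s: s + ["reverse"]),
        make_stage(lambda a, s: a.upper(), lambda s: s + ["upper"]),
    ]
    with ThreadPoolExecutor(max_workers=2) as executor:
        return run_stages(executor, stages, "hello world", [])
```

Because every stage calls `future.result()` before submitting, a purely serial chain like this one gains no concurrency; the benefit would only appear where independent branches are submitted side by side, as in the split and unsplit test added by this commit.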
diff --git a/src/pypeline/helpers/tests/helper_pipeline_tests.py b/src/pypeline/helpers/tests/helper_pipeline_tests.py
index e660108..7cf4364 100644
--- a/src/pypeline/helpers/tests/helper_pipeline_tests.py
+++ b/src/pypeline/helpers/tests/helper_pipeline_tests.py
@@ -27,8 +27,7 @@ import subprocess
import sys
import unittest
-from pypeline.helpers.helpers import cons_subprocess_component, \
- cons_function_component, \
+from pypeline.helpers.helpers import cons_function_component, \
cons_wire, \
cons_dictionary_wire, \
cons_split_wire, \
@@ -40,7 +39,7 @@ from pypeline.helpers.helpers import cons_subprocess_component, \
run_pipeline
-class PypelineHelperFunctionUnitTest(unittest.TestCase):
+class PypelineHelperUnitTest(unittest.TestCase):
@staticmethod
def __cons_and_start_subprocess_component(command,
arguments,
@@ -52,11 +51,22 @@ class PypelineHelperFunctionUnitTest(unittest.TestCase):
pipe = subprocess.Popen(args,
stdin = subprocess.PIPE,
stdout = subprocess.PIPE)
+
+ def get_process_function(process_pipe):
+ def process_function(a, s):
+ new_a = None
+ if a:
+ print >> process_pipe.stdin, str(a).strip()
+ process_pipe.stdin.flush()
+ new_a = process_pipe.stdout.readline().strip()
+ return new_a
+ return process_function
+
try:
- arrow = cons_subprocess_component(pipe,
- input_forming_func,
- output_forming_func,
- state_mutator)
+ arrow = cons_function_component(get_process_function(pipe),
+ input_forming_func,
+ output_forming_func,
+ state_mutator)
except Exception, ex:
pipe.terminate()
pipe.wait()
@@ -75,13 +85,13 @@ class PypelineHelperFunctionUnitTest(unittest.TestCase):
reverse_command = os.path.join("src", "pypeline", "helpers", "tests", "reverse.sh")
- comp_proc_one = PypelineHelperFunctionUnitTest.__cons_and_start_subprocess_component(
+ comp_proc_one = PypelineHelperUnitTest.__cons_and_start_subprocess_component(
reverse_command, tuple(),
lambda a, s: str(a['input']),
lambda a, s: {'output': str(a)},
state_mutator = lambda s: s.append(rev_msg_one) or s)
try:
- comp_proc_two = PypelineHelperFunctionUnitTest.__cons_and_start_subprocess_component(
+ comp_proc_two = PypelineHelperUnitTest.__cons_and_start_subprocess_component(
reverse_command, tuple(),
lambda a, s: str(a['input']),
lambda a, s: {'output': str(a)},
@@ -136,7 +146,7 @@ class PypelineHelperFunctionUnitTest(unittest.TestCase):
input_func = lambda a, s: str(a['input'])
output_func = lambda a, s: {'output': str(a)}
- comp_proc_one = PypelineHelperFunctionUnitTest.__cons_and_start_subprocess_component(
+ comp_proc_one = PypelineHelperUnitTest.__cons_and_start_subprocess_component(
reverse_command, tuple(),
input_func,
output_func,
diff --git a/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
new file mode 100644
index 0000000..ed4f743
--- /dev/null
+++ b/src/pypeline/helpers/tests/parallel_helper_pipeline_tests.py
@@ -0,0 +1,93 @@
+#
+# Copyright Applied Language Solutions 2012
+#
+# This file is part of Pypeline.
+#
+# Pypeline is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pypeline is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Pypeline. If not, see <http://www.gnu.org/licenses/>.
+#
+
+#
+# Unit tests for building a pipeline using the helper functions.
+# The pipeline is a mixture of components that are sub-processes and plain old
+# Python functions.
+#
+import os
+import subprocess
+import sys
+import unittest
+
+from concurrent.futures import ThreadPoolExecutor
+from pypeline.helpers.parallel_helpers import cons_function_component, \
+ cons_wire, \
+ cons_dictionary_wire, \
+ cons_split_wire, \
+ cons_unsplit_wire, \
+ run_pipeline
+
+
+class ParallelPypelineHelperUnitTest(unittest.TestCase):
+ @staticmethod
+ def test(no_workers, pipeline, input, state):
+ executor = ThreadPoolExecutor(max_workers = no_workers)
+ result = run_pipeline(executor, pipeline, input, state)
+ executor.shutdown(True)
+ return result
+
+
+ def test_serial_pypeline_with_function_components(self):
+ rev_msg_one = "reverse(1)"
+ rev_msg_two = "reverse(2)"
+ upper_msg = "upper"
+
+ reverse_function = lambda a, s: a[::-1]
+ upper_function = lambda a, s: a.upper()
+
+ comp_rev_one = cons_function_component(reverse_function,
+ state_mutator = lambda s: s.append(rev_msg_one) or s)
+ comp_rev_two = cons_function_component(reverse_function,
+ state_mutator = lambda s: s.append(rev_msg_two) or s)
+ comp_upper = cons_function_component(upper_function,
+ state_mutator = lambda s: s.append(upper_msg) or s)
+
+ pipeline = comp_rev_one >> comp_rev_two >> comp_upper
+
+ value = "hello world"
+ target = (upper_function(value, None), [rev_msg_one, rev_msg_two, upper_msg])
+ result = ParallelPypelineHelperUnitTest.test(2, pipeline, "hello world", list())
+
+ self.assertEquals(target, result)
+
+
+ def test_parallel_pypeline_with_split_and_unsplit_wires(self):
+ rev_msg_one = "reverse(top)"
+ rev_msg_two = "reverse(bottom)"
+
+ reverse_func = lambda a, s: a[::-1]
+
+ comp_rev_top = cons_function_component(reverse_func,
+ state_mutator = lambda s: s.append(rev_msg_one) or s)
+ comp_rev_bottom = cons_function_component(reverse_func,
+ state_mutator = lambda s: s.append(rev_msg_two) or s)
+
+ unsplit_func = lambda t, b: {'top': t, 'bottom': b}
+
+ pipeline = (comp_rev_top & comp_rev_bottom) >> cons_unsplit_wire(unsplit_func)
+
+ value = "hello world"
+ target = (unsplit_func(reverse_func(value, None), reverse_func(value, None)),
+ [rev_msg_one, rev_msg_two])
+
+ result = ParallelPypelineHelperUnitTest.test(2, pipeline, "hello world", list())
+
+ self.assertEquals(target, result)