Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Johnson <ianj@wgrids.com>2012-09-05 17:22:12 +0400
committerIan Johnson <ianj@wgrids.com>2012-09-05 17:22:12 +0400
commita86a28831b85bdb0d25c06c48d543316cf7c20e9 (patch)
tree53e1ee6791bf7412b0d0d7e83ea6a059c64e4bcb
parent337269e48e2c7b3fedadc24f09b0a19a5c0c4335 (diff)
Added batch subprocess pypeline component, and split and unsplit wires.
-rw-r--r--src/pypeline/helpers/helpers.py50
-rw-r--r--src/pypeline/helpers/tests/helper_pipeline_tests.py4
2 files changed, 52 insertions, 2 deletions
diff --git a/src/pypeline/helpers/helpers.py b/src/pypeline/helpers/helpers.py
index dc5ae1c..f910fbf 100644
--- a/src/pypeline/helpers/helpers.py
+++ b/src/pypeline/helpers/helpers.py
@@ -19,7 +19,7 @@
import subprocess
import types
-from pypeline.core.arrows.kleisli_arrow import KleisliArrow
+from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit
from pypeline.core.types.state import State, return_
@@ -65,6 +65,44 @@ def cons_subprocess_component(process_pipe,
return KleisliArrow(return_, bind_function)
+def cons_batch_subprocess_component(process_pipe,
+ input_feed_function,
+ state_mutator = None):
+ """Construct a pipeline component using a Popen object. Batch subprocesses shall accept a single line on stdin. An input feed function shall be provided that yields objects, that once "stringyfied", are presented to the subprocess' stdin. This function takes tow arguments: the value and the state objects. It is the responsibility of the feed function implementer to yield an EOF if necessary. The returned object shall be a Kleisli arrow representing this pipeline component."""
+ if not isinstance(process_pipe, subprocess.Popen):
+ raise ValueError("Must be a Popen process")
+
+ if input_feed_function is None:
+ raise ValueError("Subprocess components must specify both " +
+ "input and output forming functions")
+
+ #
+ # This bind function handles the 'process'
+ # being a subprocess.
+ #
+ def bind_function(a):
+ def state_function(s):
+ # The input forming function is an iterable, so
+ # request every value this function will return
+ # and feed it to the underlying subprocess.
+ # This function shall return a value, that when stringyfied and
+ # injected into stdin, the subprocess will understand
+ for transformed_a in input_feed_function(a, s):
+ # Communicate with the subprocess
+ if transformed_a is not None:
+ print >> process_pipe.stdin, str(transformed_a).strip()
+ process_pipe.stdin.flush()
+
+ # Mutate the state
+ next_s = state_mutator(s) if state_mutator else s
+
+ # New value/state pair
+ return (transformed_new_a, next_s)
+ return State(state_function)
+
+ return KleisliArrow(return_, bind_function)
+
+
def cons_function_component(function,
input_forming_function = None,
output_forming_function = None,
@@ -118,6 +156,16 @@ def cons_dictionary_wire(conversions):
# return cons_wire(lambda a, _: {conversions[key]: a[key] for key in conversions})
+def cons_split_wire():
+ """Construct a wire that duplicates its input and produces a pair from this value. See: ***, first, second, and unsplit arrow operators."""
+ return split(return_)
+
+
+def cons_unsplit_wire(unsplit_func):
+ """Construct a wire that takes a pair and applies a function to this pair to combine them into one value."""
+ return unsplit(return_, unsplit_func)
+
+
def wire_components(component_one, component_two, wire):
"""Wire two components together."""
return component_one >> wire >> component_two
diff --git a/src/pypeline/helpers/tests/helper_pipeline_tests.py b/src/pypeline/helpers/tests/helper_pipeline_tests.py
index 55e3d8c..e586fe3 100644
--- a/src/pypeline/helpers/tests/helper_pipeline_tests.py
+++ b/src/pypeline/helpers/tests/helper_pipeline_tests.py
@@ -31,6 +31,8 @@ from pypeline.helpers.helpers import cons_subprocess_component, \
cons_function_component, \
cons_wire, \
cons_dictionary_wire, \
+ cons_split_wire, \
+ cons_unsplit_wire, \
cons_pipeline, \
wire_components, \
run_pipeline
@@ -61,7 +63,7 @@ class PypelineHelperFunctionUnitTest(unittest.TestCase):
return (arrow, pipe)
- def test_pypeline_wth_subprocess_and_function_components(self):
+ def test_pypeline_with_subprocess_and_function_components(self):
if sys.platform.startswith('win'):
self.fail("Currently only this unit test is only supported on non-Windows platforms")