diff options
author | Ian Johnson <ianj@wgrids.com> | 2012-09-05 17:22:12 +0400 |
---|---|---|
committer | Ian Johnson <ianj@wgrids.com> | 2012-09-05 17:22:12 +0400 |
commit | a86a28831b85bdb0d25c06c48d543316cf7c20e9 (patch) | |
tree | 53e1ee6791bf7412b0d0d7e83ea6a059c64e4bcb | |
parent | 337269e48e2c7b3fedadc24f09b0a19a5c0c4335 (diff) |
Added batch subprocess pypeline component, and split and unsplit wires.
-rw-r--r-- | src/pypeline/helpers/helpers.py | 50 | ||||
-rw-r--r-- | src/pypeline/helpers/tests/helper_pipeline_tests.py | 4 |
2 files changed, 52 insertions, 2 deletions
diff --git a/src/pypeline/helpers/helpers.py b/src/pypeline/helpers/helpers.py index dc5ae1c..f910fbf 100644 --- a/src/pypeline/helpers/helpers.py +++ b/src/pypeline/helpers/helpers.py @@ -19,7 +19,7 @@ import subprocess import types -from pypeline.core.arrows.kleisli_arrow import KleisliArrow +from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit from pypeline.core.types.state import State, return_ @@ -65,6 +65,44 @@ def cons_subprocess_component(process_pipe, return KleisliArrow(return_, bind_function) +def cons_batch_subprocess_component(process_pipe, + input_feed_function, + state_mutator = None): + """Construct a pipeline component using a Popen object. Batch subprocesses shall accept a single line on stdin. An input feed function shall be provided that yields objects, that once "stringyfied", are presented to the subprocess' stdin. This function takes tow arguments: the value and the state objects. It is the responsibility of the feed function implementer to yield an EOF if necessary. The returned object shall be a Kleisli arrow representing this pipeline component.""" + if not isinstance(process_pipe, subprocess.Popen): + raise ValueError("Must be a Popen process") + + if input_feed_function is None: + raise ValueError("Subprocess components must specify both " + + "input and output forming functions") + + # + # This bind function handles the 'process' + # being a subprocess. + # + def bind_function(a): + def state_function(s): + # The input forming function is an iterable, so + # request every value this function will return + # and feed it to the underlying subprocess. + # This function shall return a value, that when stringyfied and + # injected into stdin, the subprocess will understand + for transformed_a in input_feed_function(a, s): + # Communicate with the subprocess + if transformed_a is not None: + print >> process_pipe.stdin, str(transformed_a).strip() + process_pipe.stdin.flush() + + # Mutate the state + next_s = state_mutator(s) if state_mutator else s + + # New value/state pair + return (transformed_new_a, next_s) + return State(state_function) + + return KleisliArrow(return_, bind_function) + + def cons_function_component(function, input_forming_function = None, output_forming_function = None, @@ -118,6 +156,16 @@ def cons_dictionary_wire(conversions): # return cons_wire(lambda a, _: {conversions[key]: a[key] for key in conversions}) +def cons_split_wire(): + """Construct a wire that duplicates its input and produces a pair from this value. See: ***, first, second, and unsplit arrow operators.""" + return split(return_) + + +def cons_unsplit_wire(unsplit_func): + """Construct a wire that takes a pair and applies a function to this pair to combine them into one value.""" + return unsplit(return_, unsplit_func) + + def wire_components(component_one, component_two, wire): """Wire two components together.""" return component_one >> wire >> component_two diff --git a/src/pypeline/helpers/tests/helper_pipeline_tests.py b/src/pypeline/helpers/tests/helper_pipeline_tests.py index 55e3d8c..e586fe3 100644 --- a/src/pypeline/helpers/tests/helper_pipeline_tests.py +++ b/src/pypeline/helpers/tests/helper_pipeline_tests.py @@ -31,6 +31,8 @@ from pypeline.helpers.helpers import cons_subprocess_component, \ cons_function_component, \ cons_wire, \ cons_dictionary_wire, \ + cons_split_wire, \ + cons_unsplit_wire, \ cons_pipeline, \ wire_components, \ run_pipeline @@ -61,7 +63,7 @@ class PypelineHelperFunctionUnitTest(unittest.TestCase): return (arrow, pipe) - def test_pypeline_wth_subprocess_and_function_components(self): + def test_pypeline_with_subprocess_and_function_components(self): if sys.platform.startswith('win'): self.fail("Currently only this unit test is only supported on non-Windows platforms") |