Welcome to mirror list, hosted at ThFree Co, Russian Federation.

helpers.py « helpers « pypeline « src - github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 01c246e9c9d230eab471f542c48be19356ab55e5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#
# Copyright Applied Language Solutions 2012
#
# This file is part of Pypeline.
#
# Pypeline is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pypeline is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pypeline.  If not, see <http://www.gnu.org/licenses/>.
#
import subprocess
import types

from pypeline.core.arrows.kleisli_arrow import KleisliArrow
from pypeline.core.types.state import State, return_


def cons_subprocess_component(process_pipe,
                              input_forming_function,
                              output_forming_function,
                              state_mutator = None):
    """Construct a pipeline component using a Popen object. Subprocesses shall accept a single line on stdin and generate a single line on stdout. Input and output forming functions shall be provided to generate and parse single lines of text that will be used to communicate with the subprocess. The returned object shall be a Kleisli arrow representing this pipeline component."""
    if not isinstance(process_pipe, subprocess.Popen):
        raise ValueError("Must be a Popen process")

    if input_forming_function is None or \
       output_forming_function is None:
        raise ValueError("Subprocess components must specify both " +
                         "input and output forming functions")

    #
    # This bind function handles the 'process'
    # being a subprocess.
    #
    def bind_function(a):
        def state_function(s):
            # Transform the value into a line, that when
            # injected into stdin, the subprocess will understand
            transformed_a = input_forming_function(a, s)

            # Communicate with the subprocess
            if transformed_a is not None:
                print >> process_pipe.stdin, str(transformed_a).strip()
                process_pipe.stdin.flush()
                new_a = process_pipe.stdout.readline().strip()

            # Parse the output from the subprocess
            transformed_new_a = output_forming_function(new_a, s)

            # Mutate the state
            next_s = state_mutator(s) if state_mutator else s

            # New value/state pair
            return (transformed_new_a, next_s)
        return State(state_function)

    return KleisliArrow(return_, bind_function)


def cons_function_component(function,
                            input_forming_function = None,
                            output_forming_function = None,
                            state_mutator = None):
    """Construct a component based on a function. Any input or output forming functions shall be called if provided. In this mode only the Kleisli arrow is returned."""
    if type(function) is not types.FunctionType and \
       type(function) is not types.MethodType:
        raise ValueError("Must be a function or method")

    def bind_function(a):
        def state_function(s):
            # Transform the value into a line, that when
            # injected into stdin, the subprocess will understand
            transformed_a = input_forming_function(a, s) if input_forming_function else a

            # Apply
            new_a = function(transformed_a, s)

            # Parse the output from the subprocess
            transformed_new_a = output_forming_function(new_a, s) if output_forming_function else new_a

            # Mutate the state
            next_s = state_mutator(s) if state_mutator else s

            # New value/state pair
            return (transformed_new_a, next_s)
        return State(state_function)

    return KleisliArrow(return_, bind_function)


def cons_wire(schema_conv_function):
    """Construct a wire. A wire is a Kleisli arrow that converts data from from one pipeline component's output schema to another pipeline component's input schema."""
    def bind_function(a):
        def state_function(s):
            return (schema_conv_function(a, s), s)
        return State(state_function)
    return KleisliArrow(return_, bind_function)


def cons_dictionary_wire(conversions):
    """Construct a wire that converts between two dictionaries. The keys of the conversions dictionary are keys in the output dictionary, of the preceeding component, whose values will be used to populate a dictionary whose keys are the value of the conversions dictionary.\n\nE.g., output = {'int': 9, 'string': 'hello'}, and conversions = {'int': 'int_two', 'string': 'string_two'}, yields an input dictionary, to the next component, input = {'int_two': 9, 'string_two': 'hello'}."""
    return cons_wire(lambda a, _: {conversions[key]: a[key] for key in conversions})


def wire_components(component_one, component_two, wire):
    """Wire two components together."""
    return component_one >> wire >> component_two


def cons_pipeline(input_wire, component, output_wire):
    """Prepend an input wire and append an output wire to a component to build a pipeline."""
    return input_wire >> component >> output_wire


def __kleisli_wrapper(f):
    def wrapper(pipeline, input, state):
        """Run, evaluate, or execute a pipeline."""
        state_monad = KleisliArrow.runKleisli(pipeline, input)
        return f(state_monad, state)
    return wrapper


@__kleisli_wrapper
def run_pipeline(state_monad, state):
     return State.runState(state_monad, state)


@__kleisli_wrapper
def eval_pipeline(state_monad, state):
    return State.evalState(state_monad, state)


@__kleisli_wrapper
def exec_pipeline(state_monad, state):
    return State.execState(state_monad, state)