Welcome to mirror list, hosted at ThFree Co, Russian Federation.

parallel_helpers.py « helpers « pypeline « src - github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 315a565ca6d1608cc97d98b89fbff3ce306bc46c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
#
# Copyright Applied Language Solutions 2012
#
# This file is part of Pypeline.
#
# Pypeline is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pypeline is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pypeline.  If not, see <http://www.gnu.org/licenses/>.
#
from concurrent.futures import Future
from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit
from pypeline.core.arrows.kleisli_arrow_choice import KleisliArrowChoice
from pypeline.core.types.either import Either, Left, Right
from pypeline.core.types.state import State, return_
from pypeline.helpers.helpers import get_dictionary_conversion_function


#
# Monkey patch the Future class to support indexing
#
Future.__getitem__ = lambda f, key: (f.result())[key]


class WrappedState(object):
    def __init__(self, executor, state):
        self.executor = executor
        self.state = state


def cons_function_component(function,
                            input_forming_function = None,
                            output_forming_function = None,
                            state_mutator = None):
    """Construct a component based on a function. Any input or output forming functions shall be called if provided. A Kleisli arrow is returned."""
    def get_bind_function():
        def bind_function(bind_a):
            def state_function(wrapped_state):
                # Unpack state
                state = wrapped_state.state

                def do_transformation(a, s):
                    # Handle input
                    if isinstance(a, tuple):
                        the_a = (a[0].result(), a[1].result())
                    elif isinstance(a, Future):
                        the_a = a.result()
                    else:
                        raise ValueError("Component state function has value that is not of type tuple or Future")
                    
                    # Transform the input
                    transformed_a = input_forming_function(the_a, state) if input_forming_function else the_a

                    # Apply
                    new_a = function(transformed_a, state)

                    # Transform the output of the function
                    transformed_new_a = output_forming_function(new_a, state) if output_forming_function else new_a

                    return transformed_new_a

                # Execute
                new_future = wrapped_state.executor.submit(do_transformation,
                                                           bind_a,
                                                           state)

                # Mutate the state
                next_state = state_mutator(state) if state_mutator else state
                # New value/state pair
                return (new_future, WrappedState(wrapped_state.executor, next_state))
            return State(state_function)
        return bind_function

    return KleisliArrow(return_, get_bind_function())


def cons_wire(schema_conv_function):
    """Construct a wire. A wire is a Kleisli arrow that converts data from from one pipeline component's output schema to another pipeline component's input schema."""
    def get_wire_bind_function():
        def wire_bind_function(a):
            def wire_state_function(s):
                new_a = schema_conv_function(a, s.state)
                if isinstance(a, tuple):
                    futured_new_a = [None, None]
                    for a_new_a, futured_new_a_idx in zip(new_a, range(len(futured_new_a))):
                        if isinstance(a_new_a, Future):
                            futured_new_a[futured_new_a_idx] = a_new_a
                        else:
                            futured_new_a[futured_new_a_idx] = Future()
                            futured_new_a[futured_new_a_idx].set_result(a_new_a)
                    futured_new_a = tuple(futured_new_a)
                else:
                    futured_new_a = Future()
                    futured_new_a.set_result(new_a)

                return (futured_new_a, s)
            return State(wire_state_function)
        return wire_bind_function

    return KleisliArrow(return_, get_wire_bind_function())


def cons_dictionary_wire(conversions):
    """Construct a wire that converts between two dictionaries. The keys of the conversions dictionary are keys in the output dictionary, of the preceeding component, whose values will be used to populate a dictionary whose keys are the value of the conversions dictionary.\n\nE.g., output = {'int': 9, 'string': 'hello'}, and conversions = {'int': 'int_two', 'string': 'string_two'}, yields an input dictionary, to the next component, input = {'int_two': 9, 'string_two': 'hello'}."""
    return cons_wire(get_dictionary_conversion_function(conversions))


def cons_split_wire():
    """Construct a wire that duplicates its input and produces a pair from this value. See: ***, first, second, and unsplit arrow operators."""
    def get_split_wire_bind_function():
        def split_wire_bind_function(a):
            def split_wire_state_function(s):
                new_a = [None, None]
                if isinstance(a, tuple):
                    for an_a, new_a_idx in zip(a, range(len(new_a))):
                        if isinstance(an_a, Future):
                            new_a[new_a_idx] = an_a
                        else:
                            assert False, "Tuple does not contain futures: %s" % str(a)
                else:
                    new_a[0] = Future()
                    new_a[1] = Future()
                    new_a[0].set_result(a)
                    new_a[1].set_result(a)

                return (tuple(new_a), s)
            return State(split_wire_state_function)
        return split_wire_bind_function

    return KleisliArrow(return_, get_split_wire_bind_function())


def cons_unsplit_wire(unsplit_function):
    """Construct a wire that takes a pair and applies a function to this pair to combine them into one value."""
    def get_unsplit_wrapper(inner_function):
        def unsplit_wrapper(a, s):
            top = a[0].result() if isinstance(a[0], Future) else a[0]
            bottom = a[1].result() if isinstance(a[1], Future) else a[1]
            return inner_function(top, bottom)
        return unsplit_wrapper
    
    return cons_function_component(get_unsplit_wrapper(unsplit_function))


def cons_if_component(condition_function, then_component, else_component):
    """Construct a conditional execution component. If the conditional function evaluates to true the 'then' component is executed. Otherwise, the 'else' component is executed. Returns a Kleisli arrow."""
    if not isinstance(then_component, KleisliArrow):
        raise ValueError("Then component must be a KleisliArrow")
    if not isinstance(else_component, KleisliArrow):
        raise ValueError("Else component must be a KleisliArrow")

    def get_test_bind_function():
        def test_bind_function(bind_a):
            def test_state_function(wrapped_state):
                # Unpack state
                state = wrapped_state.state

                def do_transformation(a, s):
                    # Handle input
                    if isinstance(a, tuple):
                        the_a = (a[0].result(), a[1].result())
                    elif isinstance(a, Future):
                        the_a = a.result()
                    else:
                        raise ValueError("Test state function has value that is not of type tuple or Future")

                    condition_result = condition_function(a, s)
                    new_a = Left(a) if condition_result else Right(a)

                    return new_a

                # Execute
                new_future = wrapped_state.executor.submit(do_transformation,
                                                           bind_a,
                                                           state)

                # New value/state pair
                return (new_future, wrapped_state)
            return State(test_state_function)
        return test_bind_function

    def get_if_left_function(left_function):
        def if_left_function(futured_either):
            either = futured_either.result()
            if isinstance(either, Left):
                return left_function(either.val) >= (lambda a: return_(Left(a)))
            elif isinstance(either, Right):
                return return_(either)
            else:
                raise ValueError("Result of future must be of type Either")
        return if_left_function

    if_comp = KleisliArrow(return_, get_test_bind_function()) >> \
              ((KleisliArrowChoice(return_, get_if_left_function(then_component._func)) >> \
                KleisliArrowChoice(return_, else_component._func).right()) >> \
               KleisliArrow(return_, lambda either: return_(either.val)))

    return if_comp


def __handle_output(o):
    po = (o[0].result() if isinstance(o[0], Future) else o[0],
          o[1].result() if isinstance(o[1], Future) else o[1]) if isinstance(o, tuple) \
         else o.result()
    return po


def __kleisli_wrapper(f):
    def wrapper(executor, pipeline, input, state):
        """Run, evaluate, or execute a pipeline."""
        if isinstance(input, tuple):
            future = (Future(), Future())
            for fu, v in zip(future, input):
                fu.set_result(v)
        else:
            future = Future()
            future.set_result(input)
        state_monad = KleisliArrow.runKleisli(pipeline, future)
        return f(state_monad, WrappedState(executor, state))
    return wrapper


@__kleisli_wrapper
def run_pipeline(state_monad, state):
     output = State.runState(state_monad, state)
     return (__handle_output(output[0]), output[1].state)


@__kleisli_wrapper
def eval_pipeline(state_monad, state):
    future = State.evalState(state_monad, state)
    return __handle_output(future)


@__kleisli_wrapper
def exec_pipeline(state_monad, state):
    wrapped_state = State.execState(state_monad, state)
    return wrapped_state.state