Welcome to mirror list, hosted at ThFree Co, Russian Federation.

parallel_helpers.py « helpers « pypeline « src - github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 0c84bb4a9bd846ed153a849ed77143c9966b655d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#
# Copyright Applied Language Solutions 2012
#
# This file is part of Pypeline.
#
# Pypeline is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pypeline is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pypeline.  If not, see <http://www.gnu.org/licenses/>.
#
from concurrent.futures import Future
from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit
from pypeline.core.types.state import State, return_
from pypeline.helpers import helpers


class WrapperState(object):
    """Bundle an executor together with the pipeline state.

    The components and wires in this module receive one of these objects
    in place of the bare state: they read ``executor`` to submit work and
    ``state`` to hand to the wrapped user function.
    """
    def __init__(self, executor, state):
        # Both values are stored as-is; nothing is copied or validated.
        self.executor, self.state = executor, state


def cons_function_component(function,
                            input_forming_function=None,
                            output_forming_function=None,
                            state_mutator=None):
    """Construct a component whose function runs on the pipeline's executor.

    ``function`` is called as ``function(value, state)`` through
    ``executor.submit``, where the executor and state are read from the
    wrapper state object handed to the component.

    The input forming function, output forming function and state mutator
    are passed straight through to ``helpers.cons_function_component``,
    which builds the component. A Kleisli arrow is returned.
    """
    def submit_to_executor(pending, wrapper_state):
        # Wait for the upstream future first: the submitted call needs the
        # concrete value, not the future itself.
        upstream_value = pending.result()
        return wrapper_state.executor.submit(function,
                                             upstream_value,
                                             wrapper_state.state)

    return helpers.cons_function_component(submit_to_executor,
                                           input_forming_function,
                                           output_forming_function,
                                           state_mutator)


def cons_wire(schema_conv_function):
    """Construct a wire.

    A wire is a Kleisli arrow that converts data from one pipeline
    component's output schema to another pipeline component's input schema.

    ``schema_conv_function`` is called as
    ``schema_conv_function(value, state)`` with the resolved value of the
    incoming future and the ``state`` attribute of the wrapper state. The
    conversion is performed directly in the calling thread (it is not
    submitted to the executor) and its result is wrapped in an already
    completed ``Future``.
    """
    def resolved_wire(pending, wrapper_state):
        converted = schema_conv_function(pending.result(), wrapper_state.state)
        # Downstream stages expect a future, so hand back one that is
        # already resolved with the converted value.
        ready = Future()
        ready.set_result(converted)
        return ready

    return helpers.cons_wire(resolved_wire)


def cons_dictionary_wire(conversions):
    """Construct a wire that converts between two dictionaries.

    The keys of the conversions dictionary are keys in the output
    dictionary, of the preceding component, whose values will be used to
    populate a dictionary whose keys are the value of the conversions
    dictionary.

    E.g., output = {'int': 9, 'string': 'hello'}, and
    conversions = {'int': 'int_two', 'string': 'string_two'}, yields an
    input dictionary, to the next component,
    input = {'int_two': 9, 'string_two': 'hello'}.

    The conversion is applied to the resolved value of the incoming future
    and the result is returned inside an already completed ``Future``.
    """
    def get_dictionary_wire_wrapper(conversion_function):
        def dictionary_wire_wrapper(f, s):
            # The conversion function is called with None in the state
            # position; the wrapper state ``s`` is intentionally unused.
            new_value = conversion_function(f.result(), None)
            nf = Future()
            nf.set_result(new_value)
            return nf

        return dictionary_wire_wrapper

    function = helpers.get_dictionary_conversion_function(conversions)
    # Fix: the original called the bare name
    # ``get_dictionary_conversion_function``, which does not exist in this
    # module (NameError on every call), and never used the wrapper factory
    # defined above. The wrapped function has the same (future, state)
    # shape as the one cons_wire passes to helpers.cons_wire, so it is
    # routed the same way.
    # NOTE(review): this assumes helpers.cons_dictionary_wire expects the
    # raw conversions dictionary rather than a callable -- confirm against
    # helpers.
    return helpers.cons_wire(get_dictionary_wire_wrapper(function))


def cons_split_wire():
    """Construct a wire that duplicates its input into a pair.

    The wire is built by applying the ``split`` arrow operator to
    ``return_``. See: ***, first, second, and unsplit arrow operators.
    """
    split_arrow = split(return_)
    return split_arrow


def cons_unsplit_wire(unsplit_function):
    """Construct a wire that combines a pair into one value.

    The pair's members arrive as futures. Both are resolved (top first,
    then bottom), ``unsplit_function(top, bottom)`` is applied to the
    resolved values, and the combined value is returned inside an already
    completed ``Future``.
    """
    def combine_futures(top_future, bottom_future):
        top_value = top_future.result()
        bottom_value = bottom_future.result()
        # Downstream stages expect a future, so wrap the combined value in
        # one that is already resolved.
        merged = Future()
        merged.set_result(unsplit_function(top_value, bottom_value))
        return merged

    return unsplit(return_, combine_futures)


def __kleisli_wrapper(f):
    # Decorator shared by run_pipeline, eval_pipeline and exec_pipeline.
    # It turns a two-argument ``f(state_monad, state)`` into the public
    # four-argument entry point ``(executor, pipeline, input, state)``.
    def wrapper(executor, pipeline, input, state):
        """Run, evaluate, or execute a pipeline."""
        # Components in this module consume futures, so the raw input is
        # seeded into an already completed one.
        seed = Future()
        seed.set_result(input)
        monad = KleisliArrow.runKleisli(pipeline, seed)
        # The executor travels alongside the user state so components can
        # submit their work to it.
        wrapped_state = WrapperState(executor, state)
        return f(monad, wrapped_state)
    return wrapper


@__kleisli_wrapper
def run_pipeline(state_monad, state):
    # After decoration, callers invoke this as
    # run_pipeline(executor, pipeline, input, state); the decorator supplies
    # the state monad and the wrapper state received here.
    # Returns whatever State.runState returns for them.
    outcome = State.runState(state_monad, state)
    return outcome


@__kleisli_wrapper
def eval_pipeline(state_monad, state):
    # After decoration, callers invoke this as
    # eval_pipeline(executor, pipeline, input, state); the decorator supplies
    # the state monad and the wrapper state received here.
    # Returns whatever State.evalState returns for them.
    outcome = State.evalState(state_monad, state)
    return outcome


@__kleisli_wrapper
def exec_pipeline(state_monad, state):
    # After decoration, callers invoke this as
    # exec_pipeline(executor, pipeline, input, state); the decorator supplies
    # the state monad and the wrapper state received here.
    # Returns whatever State.execState returns for them.
    outcome = State.execState(state_monad, state)
    return outcome