From 2eadc2754c7c6eb2d4ba2b2abc0aa2b0a2516933 Mon Sep 17 00:00:00 2001 From: Ian Johnson Date: Mon, 15 Oct 2012 15:40:41 +0100 Subject: Added helpers to create pipeline components that use futures rather than values. These pipeline components run in an executor. --- src/pypeline/helpers/helpers.py | 9 ++- src/pypeline/helpers/parallel_helpers.py | 120 +++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 src/pypeline/helpers/parallel_helpers.py diff --git a/src/pypeline/helpers/helpers.py b/src/pypeline/helpers/helpers.py index 5a6b7cd..d00f575 100644 --- a/src/pypeline/helpers/helpers.py +++ b/src/pypeline/helpers/helpers.py @@ -111,7 +111,7 @@ def cons_function_component(function, input_forming_function = None, output_forming_function = None, state_mutator = None): - """Construct a component based on a function. Any input or output forming functions shall be called if provided. In this mode only the Kleisli arrow is returned.""" + """Construct a component based on a function. Any input or output forming functions shall be called if provided. A Kleisli arrow is returned.""" if type(function) is not types.FunctionType and \ type(function) is not types.MethodType: raise ValueError("Must be a function or method") @@ -148,7 +148,7 @@ def cons_wire(schema_conv_function): def cons_dictionary_wire(conversions): """Construct a wire that converts between two dictionaries. The keys of the conversions dictionary are keys in the output dictionary, of the preceeding component, whose values will be used to populate a dictionary whose keys are the value of the conversions dictionary.\n\nE.g., output = {'int': 9, 'string': 'hello'}, and conversions = {'int': 'int_two', 'string': 'string_two'}, yields an input dictionary, to the next component, input = {'int_two': 9, 'string_two': 'hello'}.""" - return cons_wire(lambda a, _: {conversions[key]: a[key] for key in conversions}) + return cons_wire(get_dictionary_conversion_function(conversions)) def cons_split_wire(): @@ -202,3 +202,8 @@ def eval_pipeline(state_monad, state): @__kleisli_wrapper def exec_pipeline(state_monad, state): return State.execState(state_monad, state) + + +def get_dictionary_conversion_function(conversion): + """Returns a function that completes the dictionary conversions as part of a wire.""" + return lambda a, _: {conversions[key]: a[key] for key in conversions} diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py new file mode 100644 index 0000000..0c84bb4 --- /dev/null +++ b/src/pypeline/helpers/parallel_helpers.py @@ -0,0 +1,120 @@ +# +# Copyright Applied Language Solutions 2012 +# +# This file is part of Pypeline. +# +# Pypeline is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Pypeline is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Pypeline. If not, see . +# +from concurrent.futures import Future +from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit +from pypeline.core.types.state import State, return_ +from pypeline.helpers import helpers + + +class WrapperState(object): + def __init__(self, executor, state): + self.executor = executor + self.state = state + + +def cons_function_component(function, + input_forming_function = None, + output_forming_function = None, + state_mutator = None): + """Construct a component based on a function. Any input or output forming functions shall be called if provided. A Kleisli arrow is returned.""" + def get_component_function_wrapper(inner_function): + def component_function_wrapper(future, wrapper_state): + value = future.result() + this_future = wrapper_state.executor.submit(inner_function, value, wrapper_state.state) + return this_future + return component_function_wrapper + + component = helpers.cons_function_component(get_component_function_wrapper(function), + input_forming_function, + output_forming_function, + state_mutator) + + return component + + +def cons_wire(schema_conv_function): + """Construct a wire. A wire is a Kleisli arrow that converts data from from one pipeline component's output schema to another pipeline component's input schema.""" + def get_wire_wrapper(inner_function): + def wire_wrapper(future, wrapper_state): + new_value = inner_function(future.result(), wrapper_state.state) + nf = Future() + nf.set_result(new_value) + return nf + return wire_wrapper + + return helpers.cons_wire(get_wire_wrapper(schema_conv_function)) + + +def cons_dictionary_wire(conversions): + """Construct a wire that converts between two dictionaries. The keys of the conversions dictionary are keys in the output dictionary, of the preceeding component, whose values will be used to populate a dictionary whose keys are the value of the conversions dictionary.\n\nE.g., output = {'int': 9, 'string': 'hello'}, and conversions = {'int': 'int_two', 'string': 'string_two'}, yields an input dictionary, to the next component, input = {'int_two': 9, 'string_two': 'hello'}.""" + def get_dictionary_wire_wrapper(conversion_function): + def dictionary_wire_wrapper(f, s): + new_value = conversion_function(f.result(), None) + nf = Future() + nf.set_result(new_value) + return nf + + return dictionary_wire_wrapper + + function = helpers.get_dictionary_conversion_function(conversions) + return helpers.cons_dictionary_wire(get_dictionary_conversion_function(function)) + + +def cons_split_wire(): + """Construct a wire that duplicates its input and produces a pair from this value. See: ***, first, second, and unsplit arrow operators.""" + return split(return_) + + +def cons_unsplit_wire(unsplit_function): + """Construct a wire that takes a pair and applies a function to this pair to combine them into one value.""" + def get_unsplit_wrapper(inner_function): + def unsplit_wrapper(top_future, bottom_future): + t_val = top_future.result() + b_val = bottom_future.result() + nf = Future() + nf.set_result(inner_function(t_val, b_val)) + return nf + return unsplit_wrapper + + return unsplit(return_, get_unsplit_wrapper(unsplit_function)) + + +def __kleisli_wrapper(f): + def wrapper(executor, pipeline, input, state): + """Run, evaluate, or execute a pipeline.""" + future = Future() + future.set_result(input) + state_monad = KleisliArrow.runKleisli(pipeline, future) + return f(state_monad, WrapperState(executor, state)) + return wrapper + + +@__kleisli_wrapper +def run_pipeline(state_monad, state): + return State.runState(state_monad, state) + + +@__kleisli_wrapper +def eval_pipeline(state_monad, state): + return State.evalState(state_monad, state) + + +@__kleisli_wrapper +def exec_pipeline(state_monad, state): + return State.execState(state_monad, state) -- cgit v1.2.3