Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/ianj-als/pypeline.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Johnson <ian.johnson@appliedlanguage.com>2012-10-15 18:40:41 +0400
committerIan Johnson <ian.johnson@appliedlanguage.com>2012-10-15 18:40:41 +0400
commit2eadc2754c7c6eb2d4ba2b2abc0aa2b0a2516933 (patch)
tree7c631bdbfceb162e74eb71c5e6a78f30fc1cacbf
parentc2916085c5ab3aa7f010d5ba966354bbaff05ce5 (diff)
Added helpers to create pipeline components that use futures rather than values. These pipeline components run in an executor.
-rw-r--r--src/pypeline/helpers/helpers.py9
-rw-r--r--src/pypeline/helpers/parallel_helpers.py120
2 files changed, 127 insertions, 2 deletions
diff --git a/src/pypeline/helpers/helpers.py b/src/pypeline/helpers/helpers.py
index 5a6b7cd..d00f575 100644
--- a/src/pypeline/helpers/helpers.py
+++ b/src/pypeline/helpers/helpers.py
@@ -111,7 +111,7 @@ def cons_function_component(function,
input_forming_function = None,
output_forming_function = None,
state_mutator = None):
- """Construct a component based on a function. Any input or output forming functions shall be called if provided. In this mode only the Kleisli arrow is returned."""
+ """Construct a component based on a function. Any input or output forming functions shall be called if provided. A Kleisli arrow is returned."""
if type(function) is not types.FunctionType and \
type(function) is not types.MethodType:
raise ValueError("Must be a function or method")
@@ -148,7 +148,7 @@ def cons_wire(schema_conv_function):
def cons_dictionary_wire(conversions):
"""Construct a wire that converts between two dictionaries. The keys of the conversions dictionary are keys in the output dictionary, of the preceeding component, whose values will be used to populate a dictionary whose keys are the value of the conversions dictionary.\n\nE.g., output = {'int': 9, 'string': 'hello'}, and conversions = {'int': 'int_two', 'string': 'string_two'}, yields an input dictionary, to the next component, input = {'int_two': 9, 'string_two': 'hello'}."""
- return cons_wire(lambda a, _: {conversions[key]: a[key] for key in conversions})
+ return cons_wire(get_dictionary_conversion_function(conversions))
def cons_split_wire():
@@ -202,3 +202,8 @@ def eval_pipeline(state_monad, state):
@__kleisli_wrapper
def exec_pipeline(state_monad, state):
return State.execState(state_monad, state)
+
+
+def get_dictionary_conversion_function(conversion):
+ """Returns a function that completes the dictionary conversions as part of a wire."""
+ return lambda a, _: {conversions[key]: a[key] for key in conversions}
diff --git a/src/pypeline/helpers/parallel_helpers.py b/src/pypeline/helpers/parallel_helpers.py
new file mode 100644
index 0000000..0c84bb4
--- /dev/null
+++ b/src/pypeline/helpers/parallel_helpers.py
@@ -0,0 +1,120 @@
+#
+# Copyright Applied Language Solutions 2012
+#
+# This file is part of Pypeline.
+#
+# Pypeline is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Pypeline is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Pypeline. If not, see <http://www.gnu.org/licenses/>.
+#
+from concurrent.futures import Future
+from pypeline.core.arrows.kleisli_arrow import KleisliArrow, split, unsplit
+from pypeline.core.types.state import State, return_
+from pypeline.helpers import helpers
+
+
+class WrapperState(object):
+ def __init__(self, executor, state):
+ self.executor = executor
+ self.state = state
+
+
+def cons_function_component(function,
+ input_forming_function = None,
+ output_forming_function = None,
+ state_mutator = None):
+ """Construct a component based on a function. Any input or output forming functions shall be called if provided. A Kleisli arrow is returned."""
+ def get_component_function_wrapper(inner_function):
+ def component_function_wrapper(future, wrapper_state):
+ value = future.result()
+ this_future = wrapper_state.executor.submit(inner_function, value, wrapper_state.state)
+ return this_future
+ return component_function_wrapper
+
+ component = helpers.cons_function_component(get_component_function_wrapper(function),
+ input_forming_function,
+ output_forming_function,
+ state_mutator)
+
+ return component
+
+
+def cons_wire(schema_conv_function):
+ """Construct a wire. A wire is a Kleisli arrow that converts data from from one pipeline component's output schema to another pipeline component's input schema."""
+ def get_wire_wrapper(inner_function):
+ def wire_wrapper(future, wrapper_state):
+ new_value = inner_function(future.result(), wrapper_state.state)
+ nf = Future()
+ nf.set_result(new_value)
+ return nf
+ return wire_wrapper
+
+ return helpers.cons_wire(get_wire_wrapper(schema_conv_function))
+
+
+def cons_dictionary_wire(conversions):
+ """Construct a wire that converts between two dictionaries. The keys of the conversions dictionary are keys in the output dictionary, of the preceeding component, whose values will be used to populate a dictionary whose keys are the value of the conversions dictionary.\n\nE.g., output = {'int': 9, 'string': 'hello'}, and conversions = {'int': 'int_two', 'string': 'string_two'}, yields an input dictionary, to the next component, input = {'int_two': 9, 'string_two': 'hello'}."""
+ def get_dictionary_wire_wrapper(conversion_function):
+ def dictionary_wire_wrapper(f, s):
+ new_value = conversion_function(f.result(), None)
+ nf = Future()
+ nf.set_result(new_value)
+ return nf
+
+ return dictionary_wire_wrapper
+
+ function = helpers.get_dictionary_conversion_function(conversions)
+ return helpers.cons_dictionary_wire(get_dictionary_conversion_function(function))
+
+
+def cons_split_wire():
+ """Construct a wire that duplicates its input and produces a pair from this value. See: ***, first, second, and unsplit arrow operators."""
+ return split(return_)
+
+
+def cons_unsplit_wire(unsplit_function):
+ """Construct a wire that takes a pair and applies a function to this pair to combine them into one value."""
+ def get_unsplit_wrapper(inner_function):
+ def unsplit_wrapper(top_future, bottom_future):
+ t_val = top_future.result()
+ b_val = bottom_future.result()
+ nf = Future()
+ nf.set_result(inner_function(t_val, b_val))
+ return nf
+ return unsplit_wrapper
+
+ return unsplit(return_, get_unsplit_wrapper(unsplit_function))
+
+
+def __kleisli_wrapper(f):
+ def wrapper(executor, pipeline, input, state):
+ """Run, evaluate, or execute a pipeline."""
+ future = Future()
+ future.set_result(input)
+ state_monad = KleisliArrow.runKleisli(pipeline, future)
+ return f(state_monad, WrapperState(executor, state))
+ return wrapper
+
+
+@__kleisli_wrapper
+def run_pipeline(state_monad, state):
+ return State.runState(state_monad, state)
+
+
+@__kleisli_wrapper
+def eval_pipeline(state_monad, state):
+ return State.evalState(state_monad, state)
+
+
+@__kleisli_wrapper
+def exec_pipeline(state_monad, state):
+ return State.execState(state_monad, state)