diff options
author | lovetox <philipp@hoerist.com> | 2020-08-04 13:13:53 +0300 |
---|---|---|
committer | lovetox <philipp@hoerist.com> | 2020-09-23 00:21:15 +0300 |
commit | 8bd67f6f8c3c7327eaa06172469901ef5a42e990 (patch) | |
tree | 708331048850013348b8580733144b8d96d07674 /nbxmpp | |
parent | 190ce1ec3dcdf371055c9740b80652a65ede7191 (diff) |
Add task module
Module provides a decorator which wraps a generator in a Task
The Task iterates the generator until it ends and processes the yielded values.
Example:
```
@iq_request_task
request_something(self, jid):
task = yield
response = yield get_request_iq()
result = process(response)
if is_error_result(result):
raise StanzaError(result)
yield result
```
Diffstat (limited to 'nbxmpp')
-rw-r--r-- | nbxmpp/client.py | 14 | ||||
-rw-r--r-- | nbxmpp/errors.py | 9 | ||||
-rw-r--r-- | nbxmpp/modules/base.py | 10 | ||||
-rw-r--r-- | nbxmpp/task.py | 283 |
4 files changed, 316 insertions, 0 deletions
diff --git a/nbxmpp/client.py b/nbxmpp/client.py index fb2497c..23585fe 100644 --- a/nbxmpp/client.py +++ b/nbxmpp/client.py @@ -119,6 +119,7 @@ class Client(Observable): self._mode = Mode.CLIENT self._ping_source_id = None + self._tasks = [] self._dispatcher = StanzaDispatcher(self) self._dispatcher.subscribe('before-dispatch', self._on_before_dispatch) @@ -130,6 +131,15 @@ class Client(Observable): self._state = StreamState.DISCONNECTED + def add_task(self, task): + self._tasks.append(task) + + def remove_task(self, task, _context): + try: + self._tasks.remove(task) + except Exception: + pass + @property def log_context(self): return self._log_context @@ -447,6 +457,8 @@ class Client(Observable): def _on_disconnected(self, _connection, _signal_name): self.state = StreamState.DISCONNECTED + for task in self._tasks: + task.cancel() self._remove_ping_timer() self._dispatcher.remove_ping_callback(self._ping_id) self._reset_stream() @@ -833,6 +845,8 @@ class Client(Observable): self._dispatcher.unregister_handler(*args, **kwargs) def destroy(self): + for task in self._tasks: + task.cancel() self._remove_ping_timer() self._smacks = None self._sasl = None diff --git a/nbxmpp/errors.py b/nbxmpp/errors.py index a8c22b4..c4e76c7 100644 --- a/nbxmpp/errors.py +++ b/nbxmpp/errors.py @@ -104,3 +104,12 @@ class MalformedStanzaError(BaseError): BaseError.__init__(self, is_fatal=is_fatal) self.stanza = stanza self.text = str(text) + + +class CancelledError(BaseError): + + log_level = 'info' + + def __init__(self): + BaseError.__init__(self, is_fatal=True) + self.text = 'Task has been cancelled' diff --git a/nbxmpp/modules/base.py b/nbxmpp/modules/base.py index b245b6a..952cf1b 100644 --- a/nbxmpp/modules/base.py +++ b/nbxmpp/modules/base.py @@ -21,7 +21,17 @@ from nbxmpp.util import LogAdapter class BaseModule: + + _depends = {} + def __init__(self, client): logger_name = 'nbxmpp.m.%s' % self.__class__.__name__.lower() self._log = LogAdapter(logging.getLogger(logger_name), {'context': client.log_context}) + + def __getattr__(self, name): + if name not in self._depends: + raise AttributeError + + module = self._client.get_module(self._depends[name]) + return getattr(module, name) diff --git a/nbxmpp/task.py b/nbxmpp/task.py new file mode 100644 index 0000000..1d8a732 --- /dev/null +++ b/nbxmpp/task.py @@ -0,0 +1,283 @@ +# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com> +# +# This file is part of nbxmpp. +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 3 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; If not, see <http://www.gnu.org/licenses/>. + +import weakref +import inspect +import logging +from enum import IntEnum +from functools import wraps + +from nbxmpp.errors import is_error +from nbxmpp.errors import CancelledError +from nbxmpp.simplexml import Node + + +log = logging.getLogger('nbxmpp.task') + + +class _ResultSet: + pass + +ResultSet = _ResultSet() + + +class NoType: + pass + + +class TaskState(IntEnum): + INIT = 0 + RUNNING = 1 + FINISHED = 2 + CANCELLED = 3 + + @property + def is_init(self): + return self == TaskState.INIT + + @property + def is_running(self): + return self == TaskState.RUNNING + + @property + def is_finished(self): + return self == TaskState.FINISHED + + @property + def is_cancelled(self): + return self == TaskState.CANCELLED + + +def iq_request_task(func): + @wraps(func) + def func_wrapper(self, *args, + callback=None, user_data=None, **kwargs): + task = IqRequestTask(func(self, *args, **kwargs), + self._log, + self._client) + self._client.add_task(task) + task.set_finalize_func(self._client.remove_task) + task.set_user_data(user_data) + if callback is not None: + task.add_done_callback(callback) + task.start() + return task + return func_wrapper + + +def is_fatal_error(error): + if is_error(error): + return error.is_fatal + return isinstance(error, Exception) + + +class Task: + + ''' + Base class for wrapping a generator method. + + It runs the generator depending on what the generator yields. If the + generator yields another generator a sub task is created. If it yields + a type defined in _process_types, _run_async() is called which needs to + be implemented by classes. + + the implementation of _run_async() must be really async, means it should + not call _async_finished() in the same mainloop cycle. Otherwise sub tasks + may break. _async_finished() needs to call _next_step(result). + ''' + + _process_types = (NoType,) + + def __init__(self, gen, logger=log): + self._logger = logger + self._gen = gen + self._done_callbacks = [] + self._sub_task = None + self._result = None + self._error = None + self._user_data = None + self._finalize_func = None + self._finalize_context = None + self._state = TaskState.INIT + + @property + def state(self): + return self._state + + def add_done_callback(self, callback, weak=True): + if self._state in (TaskState.FINISHED, TaskState.CANCELLED): + raise RuntimeError('Task is finished') + + if weak: + if inspect.ismethod(callback): + callback = weakref.WeakMethod(callback) + elif inspect.isfunction(callback): + callback = weakref.ref(callback) + else: + ValueError('Unknown callback object') + + self._done_callbacks.append(callback) + + def start(self): + if not self._state.is_init: + raise RuntimeError('Task already started') + + self._state = TaskState.RUNNING + next(self._gen) + self._next_step(self) + + def _run_async(self, data): + raise NotImplementedError + + def _async_finished(self, *args, **kwargs): + raise NotImplementedError + + def _sub_task_completed(self, task): + self._sub_task = None + if self._state.is_cancelled: + return + + result = task.get_result() + if is_fatal_error(result): + self._error = result + self._set_finished() + else: + self._next_step(result) + + def _next_step(self, result): + try: + res = self._gen.send(result) + except StopIteration: + self._set_finished() + return + + except Exception as error: + self._log_if_fatal(error) + self._error = error + self._set_finished() + return + + if isinstance(res, self._process_types): + self._run_async(res) + + elif isinstance(res, Task): + if self._sub_task is not None: + RuntimeError('Only one sub task can be active') + + self._sub_task = res + self._sub_task.add_done_callback(self._sub_task_completed, + weak=False) + + else: + if res is not ResultSet: + self._result = res + self._set_finished() + + def _set_finished(self): + self._state = TaskState.FINISHED + self._invoke_callbacks() + self._finalize() + + def _log_if_fatal(self, error): + if is_error(error): + if error.is_fatal: + self._logger.log(error.log_level, error) + + elif isinstance(error, Exception): + self._logger.exception('Fatal Exception') + + def _invoke_callbacks(self): + for callback in self._done_callbacks: + if isinstance(callback, weakref.WeakMethod): + callback = callback() + if callback is None: + return + + callback(self) + + def set_result(self, result): + self._result = result + return ResultSet + + def get_result(self): + # if self._error is None, there was no error + # but None is a valid value for self._result + if self._error is not None: + return self._error + return self._result + + def finish(self): + if self._error is not None: + raise self._error # pylint: disable=raising-bad-type + return self._result + + def set_user_data(self, user_data): + self._user_data = user_data + + def get_user_data(self): + return self._user_data + + def set_finalize_func(self, func, context=None): + self._finalize_func = func + self._finalize_context = context + + def cancel(self): + if self._state.is_cancelled: + return + + self._state = TaskState.CANCELLED + if self._sub_task is not None: + self._sub_task.cancel() + + self._error = CancelledError() + self._invoke_callbacks() + self._finalize() + + def _finalize(self): + self._done_callbacks.clear() + self._sub_task = None + self._error = None + self._result = None + self._user_data = None + self._gen.close() + if self._finalize_func is not None: + self._finalize_func(self, self._finalize_context) + + +class IqRequestTask(Task): + + ''' + A Task for running IQ requests + + ''' + + _process_types = (Node,) + + def __init__(self, gen, logger, client): + super().__init__(gen, logger) + self._client = client + + def _run_async(self, stanza): + self._client.send_stanza(stanza, callback=self._async_finished) + + def _async_finished(self, _client, result, *args, **kwargs): + if self._state != TaskState.CANCELLED: + self._next_step(result) + + def _finalize(self): + self._client = None + super()._finalize() |