Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/python-nbxmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/nbxmpp
diff options
context:
space:
mode:
authorlovetox <philipp@hoerist.com>2020-08-04 13:13:53 +0300
committerlovetox <philipp@hoerist.com>2020-09-23 00:21:15 +0300
commit8bd67f6f8c3c7327eaa06172469901ef5a42e990 (patch)
tree708331048850013348b8580733144b8d96d07674 /nbxmpp
parent190ce1ec3dcdf371055c9740b80652a65ede7191 (diff)
Add task module
Module provides a decorator which wraps a generator in a Task The Task iterates the generator until it ends and processes the yielded values. Example: ``` @iq_request_task request_something(self, jid): task = yield response = yield get_request_iq() result = process(response) if is_error_result(result): raise StanzaError(result) yield result ```
Diffstat (limited to 'nbxmpp')
-rw-r--r--nbxmpp/client.py14
-rw-r--r--nbxmpp/errors.py9
-rw-r--r--nbxmpp/modules/base.py10
-rw-r--r--nbxmpp/task.py283
4 files changed, 316 insertions, 0 deletions
diff --git a/nbxmpp/client.py b/nbxmpp/client.py
index fb2497c..23585fe 100644
--- a/nbxmpp/client.py
+++ b/nbxmpp/client.py
@@ -119,6 +119,7 @@ class Client(Observable):
self._mode = Mode.CLIENT
self._ping_source_id = None
+ self._tasks = []
self._dispatcher = StanzaDispatcher(self)
self._dispatcher.subscribe('before-dispatch', self._on_before_dispatch)
@@ -130,6 +131,15 @@ class Client(Observable):
self._state = StreamState.DISCONNECTED
+ def add_task(self, task):
+ self._tasks.append(task)
+
+ def remove_task(self, task, _context):
+ try:
+ self._tasks.remove(task)
+ except Exception:
+ pass
+
@property
def log_context(self):
return self._log_context
@@ -447,6 +457,8 @@ class Client(Observable):
def _on_disconnected(self, _connection, _signal_name):
self.state = StreamState.DISCONNECTED
+ for task in self._tasks:
+ task.cancel()
self._remove_ping_timer()
self._dispatcher.remove_ping_callback(self._ping_id)
self._reset_stream()
@@ -833,6 +845,8 @@ class Client(Observable):
self._dispatcher.unregister_handler(*args, **kwargs)
def destroy(self):
+ for task in self._tasks:
+ task.cancel()
self._remove_ping_timer()
self._smacks = None
self._sasl = None
diff --git a/nbxmpp/errors.py b/nbxmpp/errors.py
index a8c22b4..c4e76c7 100644
--- a/nbxmpp/errors.py
+++ b/nbxmpp/errors.py
@@ -104,3 +104,12 @@ class MalformedStanzaError(BaseError):
BaseError.__init__(self, is_fatal=is_fatal)
self.stanza = stanza
self.text = str(text)
+
+
+class CancelledError(BaseError):
+
+ log_level = 'info'
+
+ def __init__(self):
+ BaseError.__init__(self, is_fatal=True)
+ self.text = 'Task has been cancelled'
diff --git a/nbxmpp/modules/base.py b/nbxmpp/modules/base.py
index b245b6a..952cf1b 100644
--- a/nbxmpp/modules/base.py
+++ b/nbxmpp/modules/base.py
@@ -21,7 +21,17 @@ from nbxmpp.util import LogAdapter
class BaseModule:
+
+ _depends = {}
+
def __init__(self, client):
logger_name = 'nbxmpp.m.%s' % self.__class__.__name__.lower()
self._log = LogAdapter(logging.getLogger(logger_name),
{'context': client.log_context})
+
+ def __getattr__(self, name):
+ if name not in self._depends:
+ raise AttributeError
+
+ module = self._client.get_module(self._depends[name])
+ return getattr(module, name)
diff --git a/nbxmpp/task.py b/nbxmpp/task.py
new file mode 100644
index 0000000..1d8a732
--- /dev/null
+++ b/nbxmpp/task.py
@@ -0,0 +1,283 @@
+# Copyright (C) 2020 Philipp Hörist <philipp AT hoerist.com>
+#
+# This file is part of nbxmpp.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 3
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; If not, see <http://www.gnu.org/licenses/>.
+
+import weakref
+import inspect
+import logging
+from enum import IntEnum
+from functools import wraps
+
+from nbxmpp.errors import is_error
+from nbxmpp.errors import CancelledError
+from nbxmpp.simplexml import Node
+
+
+log = logging.getLogger('nbxmpp.task')
+
+
+class _ResultSet:
+ pass
+
+ResultSet = _ResultSet()
+
+
+class NoType:
+ pass
+
+
+class TaskState(IntEnum):
+ INIT = 0
+ RUNNING = 1
+ FINISHED = 2
+ CANCELLED = 3
+
+ @property
+ def is_init(self):
+ return self == TaskState.INIT
+
+ @property
+ def is_running(self):
+ return self == TaskState.RUNNING
+
+ @property
+ def is_finished(self):
+ return self == TaskState.FINISHED
+
+ @property
+ def is_cancelled(self):
+ return self == TaskState.CANCELLED
+
+
+def iq_request_task(func):
+ @wraps(func)
+ def func_wrapper(self, *args,
+ callback=None, user_data=None, **kwargs):
+ task = IqRequestTask(func(self, *args, **kwargs),
+ self._log,
+ self._client)
+ self._client.add_task(task)
+ task.set_finalize_func(self._client.remove_task)
+ task.set_user_data(user_data)
+ if callback is not None:
+ task.add_done_callback(callback)
+ task.start()
+ return task
+ return func_wrapper
+
+
+def is_fatal_error(error):
+ if is_error(error):
+ return error.is_fatal
+ return isinstance(error, Exception)
+
+
+class Task:
+
+ '''
+ Base class for wrapping a generator method.
+
+ It runs the generator depending on what the generator yields. If the
+ generator yields another generator a sub task is created. If it yields
+ a type defined in _process_types, _run_async() is called which needs to
+ be implemented by classes.
+
+ the implementation of _run_async() must be really async, means it should
+ not call _async_finished() in the same mainloop cycle. Otherwise sub tasks
+ may break. _async_finished() needs to call _next_step(result).
+ '''
+
+ _process_types = (NoType,)
+
+ def __init__(self, gen, logger=log):
+ self._logger = logger
+ self._gen = gen
+ self._done_callbacks = []
+ self._sub_task = None
+ self._result = None
+ self._error = None
+ self._user_data = None
+ self._finalize_func = None
+ self._finalize_context = None
+ self._state = TaskState.INIT
+
+ @property
+ def state(self):
+ return self._state
+
+ def add_done_callback(self, callback, weak=True):
+ if self._state in (TaskState.FINISHED, TaskState.CANCELLED):
+ raise RuntimeError('Task is finished')
+
+ if weak:
+ if inspect.ismethod(callback):
+ callback = weakref.WeakMethod(callback)
+ elif inspect.isfunction(callback):
+ callback = weakref.ref(callback)
+ else:
+ ValueError('Unknown callback object')
+
+ self._done_callbacks.append(callback)
+
+ def start(self):
+ if not self._state.is_init:
+ raise RuntimeError('Task already started')
+
+ self._state = TaskState.RUNNING
+ next(self._gen)
+ self._next_step(self)
+
+ def _run_async(self, data):
+ raise NotImplementedError
+
+ def _async_finished(self, *args, **kwargs):
+ raise NotImplementedError
+
+ def _sub_task_completed(self, task):
+ self._sub_task = None
+ if self._state.is_cancelled:
+ return
+
+ result = task.get_result()
+ if is_fatal_error(result):
+ self._error = result
+ self._set_finished()
+ else:
+ self._next_step(result)
+
+ def _next_step(self, result):
+ try:
+ res = self._gen.send(result)
+ except StopIteration:
+ self._set_finished()
+ return
+
+ except Exception as error:
+ self._log_if_fatal(error)
+ self._error = error
+ self._set_finished()
+ return
+
+ if isinstance(res, self._process_types):
+ self._run_async(res)
+
+ elif isinstance(res, Task):
+ if self._sub_task is not None:
+ RuntimeError('Only one sub task can be active')
+
+ self._sub_task = res
+ self._sub_task.add_done_callback(self._sub_task_completed,
+ weak=False)
+
+ else:
+ if res is not ResultSet:
+ self._result = res
+ self._set_finished()
+
+ def _set_finished(self):
+ self._state = TaskState.FINISHED
+ self._invoke_callbacks()
+ self._finalize()
+
+ def _log_if_fatal(self, error):
+ if is_error(error):
+ if error.is_fatal:
+ self._logger.log(error.log_level, error)
+
+ elif isinstance(error, Exception):
+ self._logger.exception('Fatal Exception')
+
+ def _invoke_callbacks(self):
+ for callback in self._done_callbacks:
+ if isinstance(callback, weakref.WeakMethod):
+ callback = callback()
+ if callback is None:
+ return
+
+ callback(self)
+
+ def set_result(self, result):
+ self._result = result
+ return ResultSet
+
+ def get_result(self):
+ # if self._error is None, there was no error
+ # but None is a valid value for self._result
+ if self._error is not None:
+ return self._error
+ return self._result
+
+ def finish(self):
+ if self._error is not None:
+ raise self._error # pylint: disable=raising-bad-type
+ return self._result
+
+ def set_user_data(self, user_data):
+ self._user_data = user_data
+
+ def get_user_data(self):
+ return self._user_data
+
+ def set_finalize_func(self, func, context=None):
+ self._finalize_func = func
+ self._finalize_context = context
+
+ def cancel(self):
+ if self._state.is_cancelled:
+ return
+
+ self._state = TaskState.CANCELLED
+ if self._sub_task is not None:
+ self._sub_task.cancel()
+
+ self._error = CancelledError()
+ self._invoke_callbacks()
+ self._finalize()
+
+ def _finalize(self):
+ self._done_callbacks.clear()
+ self._sub_task = None
+ self._error = None
+ self._result = None
+ self._user_data = None
+ self._gen.close()
+ if self._finalize_func is not None:
+ self._finalize_func(self, self._finalize_context)
+
+
+class IqRequestTask(Task):
+
+ '''
+ A Task for running IQ requests
+
+ '''
+
+ _process_types = (Node,)
+
+ def __init__(self, gen, logger, client):
+ super().__init__(gen, logger)
+ self._client = client
+
+ def _run_async(self, stanza):
+ self._client.send_stanza(stanza, callback=self._async_finished)
+
+ def _async_finished(self, _client, result, *args, **kwargs):
+ if self._state != TaskState.CANCELLED:
+ self._next_step(result)
+
+ def _finalize(self):
+ self._client = None
+ super()._finalize()