diff options
author | Philipp Hörist <philipp@hoerist.com> | 2022-12-29 19:24:53 +0300 |
---|---|---|
committer | Philipp Hörist <philipp@hoerist.com> | 2022-12-29 19:24:53 +0300 |
commit | d595d93b4ca69cfd322bdb8eae5b1722cd1c9f82 (patch) | |
tree | 4882c9895e141f36fc422866456d29eb576a2209 | |
parent | 0da8888db96f23a7329802af544b807310792166 (diff) |
imprv: HTTP: Better handle finish of messages
- Always use content sniffer
- Connect `got-body` to determain if body was fully received
- Connect `content-sniffed` and handle header checks there
- Add more logging
-rw-r--r-- | nbxmpp/const.py | 11 | ||||
-rw-r--r-- | nbxmpp/http.py | 104 | ||||
-rw-r--r-- | nbxmpp/util.py | 6 |
3 files changed, 85 insertions, 36 deletions
diff --git a/nbxmpp/const.py b/nbxmpp/const.py index fe0d9c5..932a98f 100644 --- a/nbxmpp/const.py +++ b/nbxmpp/const.py @@ -23,6 +23,7 @@ from packaging.version import Version from gi.repository import Gio from gi.repository import GLib +from gi.repository import Soup GLIB_VERSION = Version( @@ -706,3 +707,13 @@ NOT_ALLOWED_XML_CHARS = { '\x0C': '', '\x1B': '' } + + +SOUP_ENCODING: set[Soup.Encoding] = { + Soup.Encoding.UNRECOGNIZED, + Soup.Encoding.NONE, + Soup.Encoding.CONTENT_LENGTH, + Soup.Encoding.EOF, + Soup.Encoding.CHUNKED, + Soup.Encoding.BYTERANGES, +} diff --git a/nbxmpp/http.py b/nbxmpp/http.py index f0c30de..8f304ef 100644 --- a/nbxmpp/http.py +++ b/nbxmpp/http.py @@ -18,7 +18,6 @@ from __future__ import annotations from pathlib import Path from typing import Any -from typing import cast from typing import Literal from typing import Callable from typing import Optional @@ -31,6 +30,7 @@ from gi.repository import GLib from gi.repository import GObject import nbxmpp +from .util import convert_soup_encoding from .const import HTTPRequestError @@ -59,16 +59,11 @@ class HTTPLogAdapter(logging.LoggerAdapter): class HTTPSession: - def __init__(self, - user_agent: str = DEFAULT_USER_AGENT, - sniffer: bool = False - ) -> None: + def __init__(self, user_agent: str = DEFAULT_USER_AGENT) -> None: self._session = Soup.Session() self._session.set_user_agent(user_agent) - - if sniffer: - self._session.add_feature_by_type(Soup.ContentSniffer) + self._session.add_feature_by_type(Soup.ContentSniffer) def get_soup_session(self) -> Soup.Session: return self._session @@ -88,6 +83,7 @@ class HTTPRequest(GObject.GObject): __gtype_name__ = "HTTPRequest" __gsignals__ = { + 'content-sniffed': (SIGNAL_ACTIONS, None, (int, str)), 'starting-response-body': (SIGNAL_ACTIONS, None, ()), 'response-progress': (SIGNAL_ACTIONS, None, (float,)), 'request-progress': (SIGNAL_ACTIONS, None, (float,)), @@ -106,16 +102,18 @@ class HTTPRequest(GObject.GObject): self._sent_size = 0 self._cancellable = Gio.Cancellable() - self._input_stream = cast(Gio.InputStream, None) + self._input_stream: Optional[Gio.InputStream] = None self._output_stream: Optional[Gio.OutputStream] = None self._is_finished = False self._error: Optional[HTTPRequestError] = None self._is_complete = False self._timeout_reached = False self._timeout_id = None + self._no_content_length_set = False self._response_body_file: Optional[Gio.File] = None self._response_body_data = b'' + self._body_received = False self._request_body_file: Optional[Gio.File] = None self._request_body_data: Optional[bytes] = None @@ -146,7 +144,7 @@ class HTTPRequest(GObject.GObject): def get_error_string(self) -> str: if self._error == HTTPRequestError.STATUS_NOT_OK: return self._message.get_reason_phrase() - return f'{self._error}' + return repr(self._error) def get_response_headers(self) -> Soup.MessageHeaders: return self._message.get_response_headers() @@ -178,6 +176,7 @@ class HTTPRequest(GObject.GObject): if self._is_finished: raise ValueError('Session already finished') + self._log.info('Cancel requested') self._cancellable.cancel() def set_request_body_from_path(self, content_type: str, path: Path) -> None: @@ -277,6 +276,8 @@ class HTTPRequest(GObject.GObject): self._message.connect('wrote-body-data', self._on_request_body_progress) + self._message.connect('content-sniffed', self._on_content_sniffed) + self._message.connect('got-body', self._on_got_body) self._message.connect('finished', self._on_finished) soup_session = self._session.get_soup_session() @@ -312,23 +313,21 @@ class HTTPRequest(GObject.GObject): try: self._input_stream = session.send_finish(result) except GLib.Error as error: - self._log.error(error) quark = GLib.quark_try_string('g-io-error-quark') if error.matches(quark, Gio.IOErrorEnum.CANCELLED): - self._set_failed(HTTPRequestError.CANCELLED) - else: - self._set_failed(HTTPRequestError.UNKNOWN) - return + if self._no_content_length_set: + self._set_failed(HTTPRequestError.MISSING_CONTENT_LENGTH) + else: + self._set_failed(HTTPRequestError.CANCELLED) + return - if self._message.get_status() not in (Soup.Status.OK, Soup.Status.CREATED): - self._set_failed(HTTPRequestError.STATUS_NOT_OK) + self._log.error(error) + self._set_failed(HTTPRequestError.UNKNOWN) return - headers = self.get_response_headers() - self._response_content_length = headers.get_content_length() - self._response_content_type, _params = headers.get_content_type() - if self._response_content_length == 0: - self._set_failed(HTTPRequestError.MISSING_CONTENT_LENGTH) + if self._message.get_status() not in (Soup.Status.OK, + Soup.Status.CREATED): + self._set_failed(HTTPRequestError.STATUS_NOT_OK) return self._log.info('Start downloading response body') @@ -337,6 +336,7 @@ class HTTPRequest(GObject.GObject): self._read_async() def _read_async(self) -> None: + assert self._input_stream is not None self._input_stream.read_bytes_async(CHUNK_SIZE, GLib.PRIORITY_LOW, self._cancellable, @@ -399,26 +399,54 @@ class HTTPRequest(GObject.GObject): self._set_failed(error) - def _on_finished(self, _message: Soup.Message) -> None: - self._set_finished() + def _on_content_sniffed(self, + message: Soup.Message, + content_type: str, + _params: GLib.HashTable, + ) -> None: + + headers = message.get_response_headers() + encoding = headers.get_encoding() + if Soup.Encoding.CONTENT_LENGTH not in convert_soup_encoding(encoding): + self._log.warning('No content-length in response') + self._no_content_length_set = True + self.cancel() + return - def _set_finished(self) -> None: - status = self._message.get_status() - if status == Soup.Status.NONE: - # Message has not been sent, can happen when we cancel the message - # before it is sent. The finished signal triggers before the - # cancelled exception. + self._response_content_length = headers.get_content_length() + if self._response_content_length == 0: + self._log.warning('No content-length in response') + self._no_content_length_set = True + self.cancel() return - if self._is_finished: + self._response_content_type = content_type + self.emit('content-sniffed', + self._response_content_length, + self._response_content_type) + + def _on_got_body(self, _message: Soup.Message) -> None: + # This signal tells us that the full body was received. + # The `finished` signal is not a sure indicator if the message body + # was received in full, as its also triggered when a message is + # cancelled. + self._log.info('Body received') + self._body_received = True + + def _on_finished(self, _message: Soup.Message) -> None: + self._log.info('Message finished') + if not self._body_received: + # This can happen when the message is cancelled. The `finished` + # signal is raised whenever the input stream is closed. + # In the case the message was cancelled other parts of the code + # will call set_failed(). return - headers = self._message.get_response_headers() - content_length = headers.get_content_length() - if self._received_size != content_length: + if self._received_size != self._response_content_length: self._set_failed(HTTPRequestError.INCOMPLETE) return + status = self._message.get_status() if status not in (Soup.Status.OK, Soup.Status.CREATED): self._set_failed(HTTPRequestError.STATUS_NOT_OK) return @@ -426,6 +454,7 @@ class HTTPRequest(GObject.GObject): self._set_complete() def _set_failed(self, error: HTTPRequestError) -> None: + self._log.info('Set Failed: %s', error) self._is_finished = True if self._timeout_reached: self._timeout_id = None @@ -437,6 +466,7 @@ class HTTPRequest(GObject.GObject): self._cleanup() def _set_complete(self) -> None: + self._log.info('Set Complete') self._is_finished = True self._is_complete = True self._close_all_streams() @@ -460,12 +490,14 @@ class HTTPRequest(GObject.GObject): del self._cancellable del self._session - del self._input_stream - del self._output_stream del self._user_data + self._input_stream = None + self._output_stream = None + if self._timeout_id is not None: GLib.source_remove(self._timeout_id) + self._timeout_id = None self.emit('destroy') self.run_dispose() diff --git a/nbxmpp/util.py b/nbxmpp/util.py index 8b5291a..76fffe6 100644 --- a/nbxmpp/util.py +++ b/nbxmpp/util.py @@ -36,9 +36,11 @@ from functools import lru_cache from packaging.version import Version from gi.repository import Gio +from gi.repository import Soup from nbxmpp.protocol import DiscoInfoMalformed from nbxmpp.const import GIO_TLS_ERRORS +from nbxmpp.const import SOUP_ENCODING from nbxmpp.const import GLIB_VERSION from nbxmpp.namespaces import Namespace from nbxmpp.protocol import StanzaMalformed @@ -400,6 +402,10 @@ def convert_tls_error_flags(flags): return set(filter(lambda error: error & flags, GIO_TLS_ERRORS.keys())) +def convert_soup_encoding(flags: int) -> set[Soup.Encoding]: + return set(filter(lambda enc: enc & flags, SOUP_ENCODING)) + + def get_websocket_close_string(websocket: Any) -> str: data = websocket.get_close_data() code = websocket.get_close_code() |