From f5a9acc056bdeb6047f3fcade088004c486e94df Mon Sep 17 00:00:00 2001 From: lovetox Date: Sun, 17 Oct 2021 12:15:03 +0200 Subject: Refactor b64decode() --- nbxmpp/auth.py | 10 +++++----- nbxmpp/modules/bits_of_binary.py | 2 +- nbxmpp/modules/ibb.py | 2 +- nbxmpp/modules/omemo.py | 16 +++++++--------- nbxmpp/modules/openpgp.py | 6 +++--- nbxmpp/modules/user_avatar.py | 2 +- nbxmpp/modules/vcard_temp.py | 2 +- nbxmpp/util.py | 10 +++++----- 8 files changed, 24 insertions(+), 26 deletions(-) diff --git a/nbxmpp/auth.py b/nbxmpp/auth.py index 11ee64e..faa00e9 100644 --- a/nbxmpp/auth.py +++ b/nbxmpp/auth.py @@ -288,7 +288,7 @@ class GSSAPI: self._client.send_nonza(node) def response(self, server_message, *args, **kwargs): - server_message = b64decode(server_message, bytes) + server_message = b64decode(server_message) try: if not self.ctx.complete: output_token = self.ctx.step(server_message) @@ -350,14 +350,14 @@ class SCRAM: self._client.send_nonza(node) def response(self, server_first_message): - server_first_message = b64decode(server_first_message) + server_first_message = b64decode(server_first_message).decode() challenge = self._scram_parse(server_first_message) client_nonce = challenge['r'][:self.nonce_length] if client_nonce != self._client_nonce: raise AuthFail('Invalid client nonce received from server') - salt = b64decode(challenge['s'], bytes) + salt = b64decode(challenge['s']) iteration_count = int(challenge['i']) if iteration_count < 4096: @@ -397,9 +397,9 @@ class SCRAM: self._client.send_nonza(node) def success(self, server_last_message): - server_last_message = b64decode(server_last_message) + server_last_message = b64decode(server_last_message).decode() success = self._scram_parse(server_last_message) - server_signature = b64decode(success['v'], bytes) + server_signature = b64decode(success['v']) if server_signature != self._server_signature: raise AuthFail('Invalid server signature') diff --git a/nbxmpp/modules/bits_of_binary.py b/nbxmpp/modules/bits_of_binary.py index 250e9b2..dfe4935 100644 --- a/nbxmpp/modules/bits_of_binary.py +++ b/nbxmpp/modules/bits_of_binary.py @@ -60,7 +60,7 @@ def parse_bob_data(stanza: Node) -> Optional[BobData]: return None try: - bob_data = b64decode(bob_data, return_type=bytes) + bob_data = b64decode(bob_data) except Exception: log.warning('Unable to decode data') log.exception(stanza) diff --git a/nbxmpp/modules/ibb.py b/nbxmpp/modules/ibb.py index c68e524..40fb237 100644 --- a/nbxmpp/modules/ibb.py +++ b/nbxmpp/modules/ibb.py @@ -113,7 +113,7 @@ class IBB(BaseModule): raise NodeProcessed try: - decoded_data = b64decode(data.getData(), return_type=bytes) + decoded_data = b64decode(data.getData()) except Exception: self._log.exception('Failed to decode IBB data') self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST)) diff --git a/nbxmpp/modules/omemo.py b/nbxmpp/modules/omemo.py index 0ecec80..fd77af0 100644 --- a/nbxmpp/modules/omemo.py +++ b/nbxmpp/modules/omemo.py @@ -194,7 +194,7 @@ def _parse_omemo_message(stanza): iv_node = header.getTag('iv') try: - iv = b64decode(iv_node.getData(), bytes) + iv = b64decode(iv_node.getData()) except Exception as error: raise MalformedStanzaError('failed to decode iv: %s' % error, stanza) @@ -202,7 +202,7 @@ def _parse_omemo_message(stanza): payload_node = encrypted.getTag('payload') if payload_node is not None: try: - payload = b64decode(payload_node.getData(), bytes) + payload = b64decode(payload_node.getData()) except Exception as error: raise MalformedStanzaError('failed to decode payload: %s' % error, stanza) @@ -227,7 +227,7 @@ def _parse_omemo_message(stanza): raise MalformedStanzaError(error, stanza) try: - keys[int(rid)] = (b64decode(kn.getData(), bytes), prekey) + keys[int(rid)] = (b64decode(kn.getData()), prekey) except Exception as error: raise MalformedStanzaError('failed to decode key: %s' % error, stanza) @@ -273,8 +273,7 @@ def _parse_bundle(item): result = {} signed_prekey_node = bundle.getTag('signedPreKeyPublic') try: - result['spk'] = {'key': b64decode(signed_prekey_node.getData(), - bytes)} + result['spk'] = {'key': b64decode(signed_prekey_node.getData())} except Exception as error: error = 'Failed to decode signedPreKeyPublic: %s' % error raise MalformedStanzaError(error, item) @@ -287,15 +286,14 @@ def _parse_bundle(item): signed_signature_node = bundle.getTag('signedPreKeySignature') try: - result['spk_signature'] = b64decode(signed_signature_node.getData(), - bytes) + result['spk_signature'] = b64decode(signed_signature_node.getData()) except Exception as error: error = 'Failed to decode signedPreKeySignature: %s' % error raise MalformedStanzaError(error, item) identity_key_node = bundle.getTag('identityKey') try: - result['ik'] = b64decode(identity_key_node.getData(), bytes) + result['ik'] = b64decode(identity_key_node.getData()) except Exception as error: error = 'Failed to decode IdentityKey: %s' % error raise MalformedStanzaError(error, item) @@ -312,7 +310,7 @@ def _parse_bundle(item): raise MalformedStanzaError('Invalid prekey: %s' % error, item) try: - key = b64decode(prekey.getData(), bytes) + key = b64decode(prekey.getData()) except Exception as error: raise MalformedStanzaError( 'Failed to decode preKeyPublic: %s' % error, item) diff --git a/nbxmpp/modules/openpgp.py b/nbxmpp/modules/openpgp.py index 5ff8238..ce1abe8 100644 --- a/nbxmpp/modules/openpgp.py +++ b/nbxmpp/modules/openpgp.py @@ -74,7 +74,7 @@ class OpenPGP(BaseModule): self._log.info('Encrypted message received') try: - properties.openpgp = b64decode(data, return_type=bytes) + properties.openpgp = b64decode(data) except Exception: self._log.warning('b64decode failed') self._log.warning(stanza) @@ -390,7 +390,7 @@ def _parse_public_key(jid, item): raise ValueError('data node missing') try: - key = b64decode(data.getData(), return_type=bytes) + key = b64decode(data.getData()) except Exception as error: raise ValueError(f'decoding error: {error}') @@ -432,7 +432,7 @@ def _parse_secret_key(item): raise ValueError('secretkey data missing') try: - key = b64decode(data, return_type=bytes) + key = b64decode(data) except Exception as error: raise ValueError(f'decoding error: {error}') diff --git a/nbxmpp/modules/user_avatar.py b/nbxmpp/modules/user_avatar.py index 2911134..31f7815 100644 --- a/nbxmpp/modules/user_avatar.py +++ b/nbxmpp/modules/user_avatar.py @@ -220,7 +220,7 @@ def _get_avatar_data(item, id_): raise MalformedStanzaError('data node empty', item) try: - avatar = b64decode(data, return_type=bytes) + avatar = b64decode(data) except Exception as error: raise MalformedStanzaError(f'decoding error: {error}', item) diff --git a/nbxmpp/modules/vcard_temp.py b/nbxmpp/modules/vcard_temp.py index 7d3483b..26ef002 100644 --- a/nbxmpp/modules/vcard_temp.py +++ b/nbxmpp/modules/vcard_temp.py @@ -138,6 +138,6 @@ class VCard: if not avatar: return None, None - avatar = b64decode(avatar, return_type=bytes) + avatar = b64decode(avatar) avatar_sha = hashlib.sha1(avatar).hexdigest() return avatar, avatar_sha diff --git a/nbxmpp/util.py b/nbxmpp/util.py index 25beb90..4dcf1f2 100644 --- a/nbxmpp/util.py +++ b/nbxmpp/util.py @@ -16,6 +16,7 @@ # along with this program; If not, see . from typing import Literal +from typing import Union import base64 import hashlib @@ -50,15 +51,14 @@ from nbxmpp.third_party.hsluv import hsluv_to_rgb log = logging.getLogger('nbxmpp.util') -def b64decode(data, return_type=str): +def b64decode(data: Union[str, bytes]) -> bytes: if not data: raise ValueError('No data to decode') + if isinstance(data, str): data = data.encode() - result = base64.b64decode(data) - if return_type == bytes: - return result - return result.decode() + + return base64.b64decode(data) def b64encode(data, return_type=str): -- cgit v1.2.3