Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dev.gajim.org/gajim/python-nbxmpp.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlovetox <philipp@hoerist.com>2021-10-17 13:15:03 +0300
committerlovetox <philipp@hoerist.com>2021-10-17 13:15:03 +0300
commitf5a9acc056bdeb6047f3fcade088004c486e94df (patch)
treef42bd1d00e75216df393bec8126ee263de6487f4
parentb48f0f6c8dd704fe7d47c023e3dca53b3730665a (diff)
Refactor b64decode()
-rw-r--r--nbxmpp/auth.py10
-rw-r--r--nbxmpp/modules/bits_of_binary.py2
-rw-r--r--nbxmpp/modules/ibb.py2
-rw-r--r--nbxmpp/modules/omemo.py16
-rw-r--r--nbxmpp/modules/openpgp.py6
-rw-r--r--nbxmpp/modules/user_avatar.py2
-rw-r--r--nbxmpp/modules/vcard_temp.py2
-rw-r--r--nbxmpp/util.py10
8 files changed, 24 insertions, 26 deletions
diff --git a/nbxmpp/auth.py b/nbxmpp/auth.py
index 11ee64e..faa00e9 100644
--- a/nbxmpp/auth.py
+++ b/nbxmpp/auth.py
@@ -288,7 +288,7 @@ class GSSAPI:
self._client.send_nonza(node)
def response(self, server_message, *args, **kwargs):
- server_message = b64decode(server_message, bytes)
+ server_message = b64decode(server_message)
try:
if not self.ctx.complete:
output_token = self.ctx.step(server_message)
@@ -350,14 +350,14 @@ class SCRAM:
self._client.send_nonza(node)
def response(self, server_first_message):
- server_first_message = b64decode(server_first_message)
+ server_first_message = b64decode(server_first_message).decode()
challenge = self._scram_parse(server_first_message)
client_nonce = challenge['r'][:self.nonce_length]
if client_nonce != self._client_nonce:
raise AuthFail('Invalid client nonce received from server')
- salt = b64decode(challenge['s'], bytes)
+ salt = b64decode(challenge['s'])
iteration_count = int(challenge['i'])
if iteration_count < 4096:
@@ -397,9 +397,9 @@ class SCRAM:
self._client.send_nonza(node)
def success(self, server_last_message):
- server_last_message = b64decode(server_last_message)
+ server_last_message = b64decode(server_last_message).decode()
success = self._scram_parse(server_last_message)
- server_signature = b64decode(success['v'], bytes)
+ server_signature = b64decode(success['v'])
if server_signature != self._server_signature:
raise AuthFail('Invalid server signature')
diff --git a/nbxmpp/modules/bits_of_binary.py b/nbxmpp/modules/bits_of_binary.py
index 250e9b2..dfe4935 100644
--- a/nbxmpp/modules/bits_of_binary.py
+++ b/nbxmpp/modules/bits_of_binary.py
@@ -60,7 +60,7 @@ def parse_bob_data(stanza: Node) -> Optional[BobData]:
return None
try:
- bob_data = b64decode(bob_data, return_type=bytes)
+ bob_data = b64decode(bob_data)
except Exception:
log.warning('Unable to decode data')
log.exception(stanza)
diff --git a/nbxmpp/modules/ibb.py b/nbxmpp/modules/ibb.py
index c68e524..40fb237 100644
--- a/nbxmpp/modules/ibb.py
+++ b/nbxmpp/modules/ibb.py
@@ -113,7 +113,7 @@ class IBB(BaseModule):
raise NodeProcessed
try:
- decoded_data = b64decode(data.getData(), return_type=bytes)
+ decoded_data = b64decode(data.getData())
except Exception:
self._log.exception('Failed to decode IBB data')
self._client.send_stanza(ErrorStanza(stanza, ERR_BAD_REQUEST))
diff --git a/nbxmpp/modules/omemo.py b/nbxmpp/modules/omemo.py
index 0ecec80..fd77af0 100644
--- a/nbxmpp/modules/omemo.py
+++ b/nbxmpp/modules/omemo.py
@@ -194,7 +194,7 @@ def _parse_omemo_message(stanza):
iv_node = header.getTag('iv')
try:
- iv = b64decode(iv_node.getData(), bytes)
+ iv = b64decode(iv_node.getData())
except Exception as error:
raise MalformedStanzaError('failed to decode iv: %s' % error, stanza)
@@ -202,7 +202,7 @@ def _parse_omemo_message(stanza):
payload_node = encrypted.getTag('payload')
if payload_node is not None:
try:
- payload = b64decode(payload_node.getData(), bytes)
+ payload = b64decode(payload_node.getData())
except Exception as error:
raise MalformedStanzaError('failed to decode payload: %s' % error,
stanza)
@@ -227,7 +227,7 @@ def _parse_omemo_message(stanza):
raise MalformedStanzaError(error, stanza)
try:
- keys[int(rid)] = (b64decode(kn.getData(), bytes), prekey)
+ keys[int(rid)] = (b64decode(kn.getData()), prekey)
except Exception as error:
raise MalformedStanzaError('failed to decode key: %s' % error,
stanza)
@@ -273,8 +273,7 @@ def _parse_bundle(item):
result = {}
signed_prekey_node = bundle.getTag('signedPreKeyPublic')
try:
- result['spk'] = {'key': b64decode(signed_prekey_node.getData(),
- bytes)}
+ result['spk'] = {'key': b64decode(signed_prekey_node.getData())}
except Exception as error:
error = 'Failed to decode signedPreKeyPublic: %s' % error
raise MalformedStanzaError(error, item)
@@ -287,15 +286,14 @@ def _parse_bundle(item):
signed_signature_node = bundle.getTag('signedPreKeySignature')
try:
- result['spk_signature'] = b64decode(signed_signature_node.getData(),
- bytes)
+ result['spk_signature'] = b64decode(signed_signature_node.getData())
except Exception as error:
error = 'Failed to decode signedPreKeySignature: %s' % error
raise MalformedStanzaError(error, item)
identity_key_node = bundle.getTag('identityKey')
try:
- result['ik'] = b64decode(identity_key_node.getData(), bytes)
+ result['ik'] = b64decode(identity_key_node.getData())
except Exception as error:
error = 'Failed to decode IdentityKey: %s' % error
raise MalformedStanzaError(error, item)
@@ -312,7 +310,7 @@ def _parse_bundle(item):
raise MalformedStanzaError('Invalid prekey: %s' % error, item)
try:
- key = b64decode(prekey.getData(), bytes)
+ key = b64decode(prekey.getData())
except Exception as error:
raise MalformedStanzaError(
'Failed to decode preKeyPublic: %s' % error, item)
diff --git a/nbxmpp/modules/openpgp.py b/nbxmpp/modules/openpgp.py
index 5ff8238..ce1abe8 100644
--- a/nbxmpp/modules/openpgp.py
+++ b/nbxmpp/modules/openpgp.py
@@ -74,7 +74,7 @@ class OpenPGP(BaseModule):
self._log.info('Encrypted message received')
try:
- properties.openpgp = b64decode(data, return_type=bytes)
+ properties.openpgp = b64decode(data)
except Exception:
self._log.warning('b64decode failed')
self._log.warning(stanza)
@@ -390,7 +390,7 @@ def _parse_public_key(jid, item):
raise ValueError('data node missing')
try:
- key = b64decode(data.getData(), return_type=bytes)
+ key = b64decode(data.getData())
except Exception as error:
raise ValueError(f'decoding error: {error}')
@@ -432,7 +432,7 @@ def _parse_secret_key(item):
raise ValueError('secretkey data missing')
try:
- key = b64decode(data, return_type=bytes)
+ key = b64decode(data)
except Exception as error:
raise ValueError(f'decoding error: {error}')
diff --git a/nbxmpp/modules/user_avatar.py b/nbxmpp/modules/user_avatar.py
index 2911134..31f7815 100644
--- a/nbxmpp/modules/user_avatar.py
+++ b/nbxmpp/modules/user_avatar.py
@@ -220,7 +220,7 @@ def _get_avatar_data(item, id_):
raise MalformedStanzaError('data node empty', item)
try:
- avatar = b64decode(data, return_type=bytes)
+ avatar = b64decode(data)
except Exception as error:
raise MalformedStanzaError(f'decoding error: {error}', item)
diff --git a/nbxmpp/modules/vcard_temp.py b/nbxmpp/modules/vcard_temp.py
index 7d3483b..26ef002 100644
--- a/nbxmpp/modules/vcard_temp.py
+++ b/nbxmpp/modules/vcard_temp.py
@@ -138,6 +138,6 @@ class VCard:
if not avatar:
return None, None
- avatar = b64decode(avatar, return_type=bytes)
+ avatar = b64decode(avatar)
avatar_sha = hashlib.sha1(avatar).hexdigest()
return avatar, avatar_sha
diff --git a/nbxmpp/util.py b/nbxmpp/util.py
index 25beb90..4dcf1f2 100644
--- a/nbxmpp/util.py
+++ b/nbxmpp/util.py
@@ -16,6 +16,7 @@
# along with this program; If not, see <http://www.gnu.org/licenses/>.
from typing import Literal
+from typing import Union
import base64
import hashlib
@@ -50,15 +51,14 @@ from nbxmpp.third_party.hsluv import hsluv_to_rgb
log = logging.getLogger('nbxmpp.util')
-def b64decode(data, return_type=str):
+def b64decode(data: Union[str, bytes]) -> bytes:
if not data:
raise ValueError('No data to decode')
+
if isinstance(data, str):
data = data.encode()
- result = base64.b64decode(data)
- if return_type == bytes:
- return result
- return result.decode()
+
+ return base64.b64decode(data)
def b64encode(data, return_type=str):