Welcome to mirror list, hosted at ThFree Co, Russian Federation.

AuthorizationHelpers.py « OAuth2 « cura - github.com/Ultimaker/Cura.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: a654ee4bdba3ed01c3535dc56ccff66879856880 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# Copyright (c) 2021 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.

from base64 import b64encode
from datetime import datetime
from hashlib import sha512
from PyQt6.QtNetwork import QNetworkReply
import secrets
from typing import Callable, Optional
import urllib.parse

from cura.OAuth2.Models import AuthenticationResponse, UserProfile, OAuth2Settings
from UM.i18n import i18nCatalog
from UM.Logger import Logger
from UM.TaskManagement.HttpRequestManager import HttpRequestManager  # To download log-in tokens.

catalog = i18nCatalog("cura")
TOKEN_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


class AuthorizationHelpers:
    """Class containing several helpers to deal with the authorization flow."""

    def __init__(self, settings: "OAuth2Settings") -> None:
        self._settings = settings
        self._token_url = "{}/token".format(self._settings.OAUTH_SERVER_URL)

    @property
    def settings(self) -> "OAuth2Settings":
        """The OAuth2 settings object."""

        return self._settings

    def getAccessTokenUsingAuthorizationCode(self, authorization_code: str, verification_code: str, callback: Callable[[AuthenticationResponse], None]) -> None:
        """
        Request the access token from the authorization server.
        :param authorization_code: The authorization code from the 1st step.
        :param verification_code: The verification code needed for the PKCE extension.
        :param callback: Once the token has been obtained, this function will be called with the response.
        """
        data = {
            "client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
            "redirect_uri": self._settings.CALLBACK_URL if self._settings.CALLBACK_URL is not None else "",
            "grant_type": "authorization_code",
            "code": authorization_code,
            "code_verifier": verification_code,
            "scope": self._settings.CLIENT_SCOPES if self._settings.CLIENT_SCOPES is not None else "",
            }
        headers = {"Content-type": "application/x-www-form-urlencoded"}
        HttpRequestManager.getInstance().post(
            self._token_url,
            data = urllib.parse.urlencode(data).encode("UTF-8"),
            headers_dict = headers,
            callback = lambda response: self.parseTokenResponse(response, callback),
            error_callback = lambda response, _: self.parseTokenResponse(response, callback)
        )

    def getAccessTokenUsingRefreshToken(self, refresh_token: str, callback: Callable[[AuthenticationResponse], None]) -> None:
        """
        Request the access token from the authorization server using a refresh token.
        :param refresh_token: A long-lived token used to refresh the authentication token.
        :param callback: Once the token has been obtained, this function will be called with the response.
        """
        Logger.log("d", "Refreshing the access token for [%s]", self._settings.OAUTH_SERVER_URL)
        data = {
            "client_id": self._settings.CLIENT_ID if self._settings.CLIENT_ID is not None else "",
            "redirect_uri": self._settings.CALLBACK_URL if self._settings.CALLBACK_URL is not None else "",
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
            "scope": self._settings.CLIENT_SCOPES if self._settings.CLIENT_SCOPES is not None else "",
        }
        headers = {"Content-type": "application/x-www-form-urlencoded"}
        HttpRequestManager.getInstance().post(
            self._token_url,
            data = urllib.parse.urlencode(data).encode("UTF-8"),
            headers_dict = headers,
            callback = lambda response: self.parseTokenResponse(response, callback),
            error_callback = lambda response, _: self.parseTokenResponse(response, callback)
        )

    def parseTokenResponse(self, token_response: QNetworkReply, callback: Callable[[AuthenticationResponse], None]) -> None:
        """Parse the token response from the authorization server into an AuthenticationResponse object.

        :param token_response: The JSON string data response from the authorization server.
        :return: An AuthenticationResponse object.
        """
        token_data = HttpRequestManager.readJSON(token_response)
        if not token_data:
            callback(AuthenticationResponse(success = False, err_message = catalog.i18nc("@message", "Could not read response.")))
            return

        if token_response.error() != QNetworkReply.NetworkError.NoError:
            callback(AuthenticationResponse(success = False, err_message = token_data["error_description"]))
            return

        callback(AuthenticationResponse(success = True,
                                        token_type = token_data["token_type"],
                                        access_token = token_data["access_token"],
                                        refresh_token = token_data["refresh_token"],
                                        expires_in = token_data["expires_in"],
                                        scope = token_data["scope"],
                                        received_at = datetime.now().strftime(TOKEN_TIMESTAMP_FORMAT)))
        return

    def checkToken(self, access_token: str, success_callback: Optional[Callable[[UserProfile], None]] = None, failed_callback: Optional[Callable[[], None]] = None) -> None:
        """Calls the authentication API endpoint to get the token data.

        The API is called asynchronously. When a response is given, the callback is called with the user's profile.
        :param access_token: The encoded JWT token.
        :param success_callback: When a response is given, this function will be called with a user profile. If None,
        there will not be a callback.
        :param failed_callback: When the request failed or the response didn't parse, this function will be called.
        """
        check_token_url = "{}/check-token".format(self._settings.OAUTH_SERVER_URL)
        Logger.log("d", "Checking the access token for [%s]", check_token_url)
        headers = {
            "Authorization": f"Bearer {access_token}"
        }
        HttpRequestManager.getInstance().get(
            check_token_url,
            headers_dict = headers,
            callback = lambda reply: self._parseUserProfile(reply, success_callback, failed_callback),
            error_callback = lambda _, _2: failed_callback() if failed_callback is not None else None
        )

    def _parseUserProfile(self, reply: QNetworkReply, success_callback: Optional[Callable[[UserProfile], None]], failed_callback: Optional[Callable[[], None]] = None) -> None:
        """
        Parses the user profile from a reply to /check-token.

        If the response is valid, the callback will be called to return the user profile to the caller.
        :param reply: A network reply to a request to the /check-token URL.
        :param success_callback: A function to call once a user profile was successfully obtained.
        :param failed_callback: A function to call if parsing the profile failed.
        """
        if reply.error() != QNetworkReply.NetworkError.NoError:
            Logger.warning(f"Could not access account information. QNetworkError {reply.errorString()}")
            if failed_callback is not None:
                failed_callback()
            return

        profile_data = HttpRequestManager.getInstance().readJSON(reply)
        if profile_data is None or "data" not in profile_data:
            Logger.warning("Could not parse user data from token.")
            if failed_callback is not None:
                failed_callback()
            return
        profile_data = profile_data["data"]

        required_fields = {"user_id", "username"}
        if "user_id" not in profile_data or "username" not in profile_data:
            Logger.warning(f"User data missing required field(s): {required_fields - set(profile_data.keys())}")
            if failed_callback is not None:
                failed_callback()
            return

        if success_callback is not None:
            success_callback(UserProfile(
                user_id = profile_data["user_id"],
                username = profile_data["username"],
                profile_image_url = profile_data.get("profile_image_url", ""),
                organization_id = profile_data.get("organization", {}).get("organization_id"),
                subscriptions = profile_data.get("subscriptions", [])
            ))

    @staticmethod
    def generateVerificationCode(code_length: int = 32) -> str:
        """Generate a verification code of arbitrary length.

        :param code_length:: How long should the code be in bytes? This should never be lower than 16, but it's probably
        better to leave it at 32
        """

        return secrets.token_hex(code_length)

    @staticmethod
    def generateVerificationCodeChallenge(verification_code: str) -> str:
        """Generates a base64 encoded sha512 encrypted version of a given string.

        :param verification_code:
        :return: The encrypted code in base64 format.
        """

        encoded = sha512(verification_code.encode()).digest()
        return b64encode(encoded, altchars = b"_-").decode()