Welcome to mirror list, hosted at ThFree Co, Russian Federation.

DriveApiService.py « src « CuraDrive « plugins - github.com/Ultimaker/Cura.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: d8349ccc291aedd8a34d74652cf21aac894ec4e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Copyright (c) 2018 Ultimaker B.V.
# Cura is released under the terms of the LGPLv3 or higher.

import base64
import hashlib
import os
from datetime import datetime
from tempfile import NamedTemporaryFile
from typing import Any, Optional, List, Dict

import requests

from UM.Logger import Logger
from UM.Message import Message
from UM.Signal import Signal, signalemitter
from cura.CuraApplication import CuraApplication

from .UploadBackupJob import UploadBackupJob
from .Settings import Settings

from UM.i18n import i18nCatalog
catalog = i18nCatalog("cura")


## The DriveApiService is responsible for interacting with the CuraDrive API and Cura's backup handling.
#
#   It lists, creates, restores and deletes backups with HTTP calls (made through "requests") against BACKUP_URL.
#   The progress of creating and restoring a backup is reported through the two signals declared below.
@signalemitter
class DriveApiService:
    BACKUP_URL = "{}/backups".format(Settings.DRIVE_API_URL)

    # Emit signal when restoring backup started or finished.
    # Emitted with the keyword arguments "is_restoring" and, on failure, "error_message".
    restoringStateChanged = Signal()

    # Emit signal when creating backup started or finished.
    # Emitted with the keyword arguments "is_creating" and, on failure, "error_message".
    creatingStateChanged = Signal()

    def __init__(self) -> None:
        self._cura_api = CuraApplication.getInstance().getCuraAPI()

    ##  Get the list of backups known to the remote API.
    #   \return The content of the "data" field of the response. An empty list is returned when there is no access
    #   token, the server can't be reached, the server answers with an error status or the body is unusable.
    def getBackups(self) -> List[Dict[str, Any]]:
        access_token = self._cura_api.account.accessToken
        if not access_token:
            Logger.log("w", "Could not get access token.")
            return []
        try:
            backup_list_request = requests.get(self.BACKUP_URL, headers = {
                "Authorization": "Bearer {}".format(access_token)
            })
        except requests.exceptions.ConnectionError:
            Logger.logException("w", "Unable to connect with the server.")
            return []

        # HTTP status 300s mean redirection. 400s and 500s are errors.
        # Technically 300s are not errors, but the use case here relies on "requests" to handle redirects automatically.
        if backup_list_request.status_code >= 300:
            Logger.log("w", "Could not get backups list from remote: %s", backup_list_request.text)
            Message(catalog.i18nc("@info:backup_status", "There was an error listing your backups."), title = catalog.i18nc("@info:title", "Backup")).show()
            return []

        # A successful status code does not guarantee a JSON body, so guard the parsing as well.
        try:
            backup_list_response = backup_list_request.json()
        except ValueError:
            Logger.logException("w", "Could not parse the backups list received from remote.")
            return []

        # Only a JSON object can carry the "data" field; anything else (list, null, ...) is treated as a failure.
        if not isinstance(backup_list_response, dict) or "data" not in backup_list_response:
            Logger.log("w", "Could not get backups from remote, actual response body was: %s", str(backup_list_response))
            return []

        return backup_list_response["data"]

    ##  Create a backup through the Cura API and upload it.
    #   Emits creatingStateChanged with is_creating = True at the start. On failure to create the backup or to obtain
    #   an upload URL it emits is_creating = False together with an error message. Otherwise an UploadBackupJob is
    #   started and the final state is emitted from _onUploadFinished.
    def createBackup(self) -> None:
        self.creatingStateChanged.emit(is_creating = True)

        # Create the backup.
        backup_zip_file, backup_meta_data = self._cura_api.backups.createBackup()
        if not backup_zip_file or not backup_meta_data:
            self.creatingStateChanged.emit(is_creating = False, error_message ="Could not create backup.")
            return

        # Create an upload entry for the backup.
        timestamp = datetime.now().isoformat()
        backup_meta_data["description"] = "{}.backup.{}.cura.zip".format(timestamp, backup_meta_data["cura_release"])
        backup_upload_url = self._requestBackupUpload(backup_meta_data, len(backup_zip_file))
        if not backup_upload_url:
            self.creatingStateChanged.emit(is_creating = False, error_message ="Could not upload backup.")
            return

        # Upload the backup to storage.
        upload_backup_job = UploadBackupJob(backup_upload_url, backup_zip_file)
        upload_backup_job.finished.connect(self._onUploadFinished)
        upload_backup_job.start()

    ##  Callback for when the upload job has finished.
    #   \param job The finished upload job; its backup_upload_error_message is empty when the upload succeeded.
    def _onUploadFinished(self, job: "UploadBackupJob") -> None:
        if job.backup_upload_error_message != "":
            # If the job contains an error message we pass it along so the UI can display it.
            self.creatingStateChanged.emit(is_creating = False, error_message = job.backup_upload_error_message)
        else:
            self.creatingStateChanged.emit(is_creating = False)

    ##  Download a backup, verify it and hand it to the Cura API to be restored.
    #   Emits restoringStateChanged with is_restoring = True at the start and is_restoring = False at the end; on
    #   failure the latter is accompanied by an error message (see _emitRestoreError).
    #   \param backup The backup entry to restore. The keys "download_url", "md5_hash" and "metadata" are read.
    def restoreBackup(self, backup: Dict[str, Any]) -> None:
        self.restoringStateChanged.emit(is_restoring = True)
        download_url = backup.get("download_url")
        if not download_url:
            # If there is no download URL, we can't restore the backup.
            return self._emitRestoreError()

        try:
            download_package = requests.get(download_url, stream = True)
        except requests.exceptions.ConnectionError:
            Logger.logException("e", "Unable to connect with the server")
            return self._emitRestoreError()

        if download_package.status_code >= 300:
            # Something went wrong when attempting to download the backup.
            Logger.log("w", "Could not download backup from url %s: %s", download_url, download_package.text)
            return self._emitRestoreError()

        # We store the file in a temporary path first to ensure integrity.
        # The file is created with delete = False so it survives being closed and can be re-opened by name;
        # that makes us responsible for removing it, which happens in the "finally" clause below.
        temporary_backup_file = NamedTemporaryFile(delete = False)
        try:
            try:
                with temporary_backup_file as write_backup:
                    for chunk in download_package:
                        write_backup.write(chunk)
            except requests.exceptions.RequestException:
                # The response is streamed, so the connection can still break while the body is being read.
                Logger.logException("w", "The download of the backup was interrupted.")
                return self._emitRestoreError()

            if not self._verifyMd5Hash(temporary_backup_file.name, backup.get("md5_hash", "")):
                # Don't restore the backup if the MD5 hashes do not match.
                # This can happen if the download was interrupted.
                Logger.log("w", "Remote and local MD5 hashes do not match, not restoring backup.")
                return self._emitRestoreError()

            with open(temporary_backup_file.name, "rb") as read_backup:
                backup_data = read_backup.read()
        finally:
            # Never leave the downloaded copy behind, whether the restore goes ahead or not.
            try:
                os.remove(temporary_backup_file.name)
            except OSError:
                Logger.logException("w", "Could not remove the temporary backup file %s", temporary_backup_file.name)

        # Tell Cura to place the backup back in the user data folder.
        self._cura_api.backups.restoreBackup(backup_data, backup.get("metadata", {}))
        self.restoringStateChanged.emit(is_restoring = False)

    ##  Signal that restoring has stopped because of an error, with a translated message for the UI.
    def _emitRestoreError(self) -> None:
        self.restoringStateChanged.emit(is_restoring = False,
                                        error_message = catalog.i18nc("@info:backup_status",
                                                                         "There was an error trying to restore your backup."))

    ##  Verify the MD5 hash of a file.
    #   The file is hashed in chunks so that a large backup does not have to be held in memory as a whole.
    #   \param file_path Full path to the file.
    #   \param known_hash The known MD5 hash of the file, as a base64 string that uses "_" and "-" in place of
    #   "+" and "/".
    #   \return: Success or not.
    @staticmethod
    def _verifyMd5Hash(file_path: str, known_hash: str) -> bool:
        chunk_size = 1024 * 1024
        md5 = hashlib.md5()
        with open(file_path, "rb") as read_backup:
            for chunk in iter(lambda: read_backup.read(chunk_size), b""):
                md5.update(chunk)
        # NOTE(review): altchars "_-" is the reverse order of the standard URL-safe alphabet ("-_").
        # Presumably this matches what the server sends; confirm before changing it.
        local_md5_hash = base64.b64encode(md5.digest(), altchars = b"_-").decode("utf-8")
        return known_hash == local_md5_hash

    ##  Delete a backup through the remote API.
    #   \param backup_id The ID of the backup to delete; it is appended to BACKUP_URL.
    #   \return True if the server answered with a status code below 300, False otherwise (including when there is
    #   no access token or the server can't be reached).
    def deleteBackup(self, backup_id: str) -> bool:
        access_token = self._cura_api.account.accessToken
        if not access_token:
            Logger.log("w", "Could not get access token.")
            return False

        try:
            delete_backup = requests.delete("{}/{}".format(self.BACKUP_URL, backup_id), headers = {
                "Authorization": "Bearer {}".format(access_token)
            })
        except requests.exceptions.ConnectionError:
            Logger.logException("e", "Unable to connect with the server")
            return False

        if delete_backup.status_code >= 300:
            Logger.log("w", "Could not delete backup: %s", delete_backup.text)
            return False
        return True

    ##  Request a backup upload slot from the API.
    #   \param backup_metadata: A dict containing some meta data about the backup.
    #   \param backup_size The size of the backup file in bytes.
    #   \return: The upload URL for the actual backup file if successful, otherwise None.
    def _requestBackupUpload(self, backup_metadata: Dict[str, Any], backup_size: int) -> Optional[str]:
        access_token = self._cura_api.account.accessToken
        if not access_token:
            Logger.log("w", "Could not get access token.")
            return None
        try:
            backup_upload_request = requests.put(
                self.BACKUP_URL,
                json = {"data": {"backup_size": backup_size,
                                 "metadata": backup_metadata
                                 }
                        },
                headers = {
                    "Authorization": "Bearer {}".format(access_token)
                })
        except requests.exceptions.ConnectionError:
            Logger.logException("e", "Unable to connect with the server")
            return None

        # Any status code of 300 or above indicates an error.
        if backup_upload_request.status_code >= 300:
            Logger.log("w", "Could not request backup upload: %s", backup_upload_request.text)
            return None

        # The body may not be JSON (ValueError) or may lack the expected structure (KeyError / TypeError);
        # in both cases there is no upload URL to return.
        try:
            return backup_upload_request.json()["data"]["upload_url"]
        except (ValueError, KeyError, TypeError):
            Logger.logException("w", "Could not find the upload URL in the response: %s", backup_upload_request.text)
            return None