diff options
Diffstat (limited to 'magic_uv/utils/addon_updater.py')
-rw-r--r-- | magic_uv/utils/addon_updater.py | 372 |
1 files changed, 0 insertions, 372 deletions
diff --git a/magic_uv/utils/addon_updater.py b/magic_uv/utils/addon_updater.py deleted file mode 100644 index 8c1601b8..00000000 --- a/magic_uv/utils/addon_updater.py +++ /dev/null @@ -1,372 +0,0 @@ -# <pep8-80 compliant> - -# ##### BEGIN GPL LICENSE BLOCK ##### -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software Foundation, -# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -# -# ##### END GPL LICENSE BLOCK ##### - -__author__ = "Nutti <nutti.metro@gmail.com>" -__status__ = "production" -__version__ = "6.5" -__date__ = "6 Mar 2021" - -from threading import Lock -import urllib -import urllib.request -import ssl -import json -import os -import zipfile -import shutil -import datetime - - -def get_separator(): - if os.name == "nt": - return "\\" - return "/" - - -def _request(url, json_decode=True): - # pylint: disable=W0212 - ssl._create_default_https_context = ssl._create_unverified_context - req = urllib.request.Request(url) - - try: - result = urllib.request.urlopen(req) - except urllib.error.HTTPError as e: - raise RuntimeError("HTTP error ({})".format(str(e.code))) - except urllib.error.URLError as e: - raise RuntimeError("URL error ({})".format(str(e.reason))) - - data = result.read() - result.close() - - if json_decode: - try: - return json.JSONDecoder().decode(data.decode()) - except Exception as e: - raise RuntimeError("API response has invalid JSON format ({})" - .format(str(e))) - - return data.decode() - - -def _download(url, path): - try: - urllib.request.urlretrieve(url, path) - except urllib.error.HTTPError as e: - raise RuntimeError("HTTP error ({})".format(str(e.code))) - except urllib.error.URLError as e: - raise RuntimeError("URL error ({})".format(str(e.reason))) - - -def _make_workspace_path(addon_dir): - return addon_dir + get_separator() + "addon_updater_workspace" - - -def _make_workspace(addon_dir): - dir_path = _make_workspace_path(addon_dir) - os.mkdir(dir_path) - - -def _make_temp_addon_path(addon_dir, url): - filename = url.split("/")[-1] - filepath = _make_workspace_path(addon_dir) + get_separator() + filename - return filepath - - -def _download_addon(addon_dir, url): - filepath = _make_temp_addon_path(addon_dir, url) - _download(url, filepath) - - -def _replace_addon(addon_dir, info, current_addon_path, offset_path=""): - # remove current add-on - if os.path.isfile(current_addon_path): - os.remove(current_addon_path) - elif os.path.isdir(current_addon_path): - shutil.rmtree(current_addon_path) - - # replace to the new add-on - workspace_path = _make_workspace_path(addon_dir) - tmp_addon_path = _make_temp_addon_path(addon_dir, info.url) - _, ext = os.path.splitext(tmp_addon_path) - if ext == ".zip": - with zipfile.ZipFile(tmp_addon_path) as zf: - zf.extractall(workspace_path) - if offset_path != "": - src = workspace_path + get_separator() + offset_path - dst = addon_dir - shutil.move(src, dst) - elif ext == ".py": - shutil.move(tmp_addon_path, addon_dir) - else: - raise RuntimeError("Unsupported file extension. (ext: {})".format(ext)) - - -def _get_all_releases_data(owner, repository): - url = "https://api.github.com/repos/{}/{}/releases"\ - .format(owner, repository) - data = _request(url) - - return data - - -def _get_all_branches_data(owner, repository): - url = "https://api.github.com/repos/{}/{}/branches"\ - .format(owner, repository) - data = _request(url) - - return data - - -def _parse_release_version(version): - return [int(c) for c in version[1:].split(".")] - - -# ver1 > ver2 : > 0 -# ver1 == ver2 : == 0 -# ver1 < ver2 : < 0 -def _compare_version(ver1, ver2): - if len(ver1) < len(ver2): - ver1.extend([-1 for _ in range(len(ver2) - len(ver1))]) - elif len(ver1) > len(ver2): - ver2.extend([-1 for _ in range(len(ver1) - len(ver2))]) - - def comp(v1, v2, idx): - if len(v1) == idx: - return 0 # v1 == v2 - - if v1[idx] > v2[idx]: - return 1 # v1 > v2 - if v1[idx] < v2[idx]: - return -1 # v1 < v2 - - return comp(v1, v2, idx + 1) - - return comp(ver1, ver2, 0) - - -class AddonUpdaterConfig: - def __init__(self): - # Name of owner - self.owner = "" - - # Name of repository - self.repository = "" - - # Additional branch for update candidate - self.branches = [] - - # Set minimum release version for update candidate. - # e.g. (5, 2) if your release tag name is "v5.2" - # If you specify (-1, -1), ignore versions less than current add-on - # version specified in bl_info. - self.min_release_version = (-1, -1) - - # Target add-on path - # {"branch/tag": "add-on path"} - self.target_addon_path = {} - - # Default target add-on path. - # Search this path if branch/tag is not found in - # self.target_addon_path. - self.default_target_addon_path = "" - - # Current add-on path - self.current_addon_path = "" - - # Blender add-on directory - self.addon_directory = "" - - -class UpdateCandidateInfo: - def __init__(self): - self.name = "" - self.url = "" - self.group = "" # BRANCH|RELEASE - - -class AddonUpdaterManager: - __inst = None - __lock = Lock() - - __initialized = False - __bl_info = None - __config = None - __update_candidate = [] - __candidate_checked = False - __error = "" - __info = "" - - def __init__(self): - raise NotImplementedError("Not allowed to call constructor") - - @classmethod - def __internal_new(cls): - return super().__new__(cls) - - @classmethod - def get_instance(cls): - if not cls.__inst: - with cls.__lock: - if not cls.__inst: - cls.__inst = cls.__internal_new() - - return cls.__inst - - def init(self, bl_info, config): - self.__bl_info = bl_info - self.__config = config - self.__update_candidate = [] - self.__candidate_checked = False - self.__error = "" - self.__info = "" - self.__initialized = True - - def initialized(self): - return self.__initialized - - def candidate_checked(self): - return self.__candidate_checked - - def check_update_candidate(self): - if not self.initialized(): - raise RuntimeError("AddonUpdaterManager must be initialized") - - self.__update_candidate = [] - self.__candidate_checked = False - - try: - # setup branch information - branches = _get_all_branches_data(self.__config.owner, - self.__config.repository) - for b in branches: - if b["name"] in self.__config.branches: - info = UpdateCandidateInfo() - info.name = b["name"] - info.url = "https://github.com/{}/{}/archive/{}.zip"\ - .format(self.__config.owner, - self.__config.repository, b["name"]) - info.group = 'BRANCH' - self.__update_candidate.append(info) - - # setup release information - releases = _get_all_releases_data(self.__config.owner, - self.__config.repository) - for r in releases: - if _compare_version(_parse_release_version(r["tag_name"]), - self.__config.min_release_version) > 0: - info = UpdateCandidateInfo() - info.name = r["tag_name"] - info.url = r["assets"][0]["browser_download_url"] - info.group = 'RELEASE' - self.__update_candidate.append(info) - except RuntimeError as e: - self.__error = "Failed to check update {}. ({})"\ - .format(str(e), datetime.datetime.now()) - - self.__info = "Checked update. ({})"\ - .format(datetime.datetime.now()) - - self.__candidate_checked = True - - def has_error(self): - return self.__error != "" - - def error(self): - return self.__error - - def has_info(self): - return self.__info != "" - - def info(self): - return self.__info - - def update(self, version_name): - if not self.initialized(): - raise RuntimeError("AddonUpdaterManager must be initialized.") - - if not self.candidate_checked(): - raise RuntimeError("Update candidate is not checked.") - - info = None - for info in self.__update_candidate: - if info.name == version_name: - break - else: - raise RuntimeError("{} is not found in update candidate" - .format(version_name)) - - if info is None: - raise RuntimeError("Not found any update candidates") - - try: - # create workspace - _make_workspace(self.__config.addon_directory) - # download add-on - _download_addon(self.__config.addon_directory, info.url) - - # get add-on path - if info.name in self.__config.target_addon_path: - addon_path = self.__config.target_addon_path[info.name] - else: - addon_path = self.__config.default_target_addon_path - - # replace add-on - offset_path = "" - if info.group == 'BRANCH': - offset_path = "{}-{}{}{}".format( - self.__config.repository, info.name, get_separator(), - addon_path) - elif info.group == 'RELEASE': - offset_path = addon_path - _replace_addon(self.__config.addon_directory, - info, self.__config.current_addon_path, - offset_path) - - self.__info = "Updated to {}. ({})" \ - .format(info.name, datetime.datetime.now()) - except RuntimeError as e: - self.__error = "Failed to update {}. ({})"\ - .format(str(e), datetime.datetime.now()) - - shutil.rmtree(_make_workspace_path(self.__config.addon_directory)) - - def get_candidate_branch_names(self): - if not self.initialized(): - raise RuntimeError("AddonUpdaterManager must be initialized.") - - if not self.candidate_checked(): - raise RuntimeError("Update candidate is not checked.") - - return [info.name for info in self.__update_candidate] - - def latest_version(self): - release_versions = [info.name - for info in self.__update_candidate - if info.group == 'RELEASE'] - - latest = "" - for version in release_versions: - if latest == "": - latest = version - elif _compare_version(_parse_release_version(version), - _parse_release_version(latest)) > 0: - latest = version - - return latest |