diff options
Diffstat (limited to 'uv_magic_uv/addon_updater.py')
-rw-r--r-- | uv_magic_uv/addon_updater.py | 1501 |
1 files changed, 1501 insertions, 0 deletions
diff --git a/uv_magic_uv/addon_updater.py b/uv_magic_uv/addon_updater.py new file mode 100644 index 00000000..70b6a287 --- /dev/null +++ b/uv_magic_uv/addon_updater.py @@ -0,0 +1,1501 @@ +# ##### BEGIN GPL LICENSE BLOCK ##### +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# +# ##### END GPL LICENSE BLOCK ##### + + +""" +See documentation for usage +https://github.com/CGCookie/blender-addon-updater + +""" + +import ssl +import urllib.request +import urllib +import os +import json +import zipfile +import shutil +import asyncio +import threading +import time +import fnmatch +from datetime import datetime, timedelta + +# blender imports, used in limited cases +import bpy +import addon_utils + +# ----------------------------------------------------------------------------- +# Define error messages/notices & hard coded globals +# ----------------------------------------------------------------------------- + +# currently not used +DEFAULT_TIMEOUT = 10 +DEFAULT_PER_PAGE = 30 + + +# ----------------------------------------------------------------------------- +# The main class +# ----------------------------------------------------------------------------- + +class Singleton_updater(object): + """ + This is the singleton class to reference a copy from, + it is the shared module level class + """ + def __init__(self): + + self._engine = GithubEngine() + self._user = None + self._repo = None + self._website = None + self._current_version = None + self._subfolder_path = None + self._tags = [] + self._tag_latest = None + self._tag_names = [] + self._latest_release = None + self._use_releases = False + self._include_branches = False + self._include_branch_list = ['master'] + self._include_branch_autocheck = False + self._manual_only = False + self._version_min_update = None + self._version_max_update = None + + # by default, backup current addon if new is being loaded + self._backup_current = True + self._backup_ignore_patterns = None + + # set patterns for what files to overwrite on update + self._overwrite_patterns = ["*.py","*.pyc"] + self._remove_pre_update_patterns = [] + + # by default, don't auto enable/disable the addon on update + # as it is slightly less stable/won't always fully reload module + self._auto_reload_post_update = False + + # settings relating to frequency and whether to enable auto background check + self._check_interval_enable = False + self._check_interval_months = 0 + self._check_interval_days = 7 + self._check_interval_hours = 0 + self._check_interval_minutes = 0 + + # runtime variables, initial conditions + self._verbose = False + self._fake_install = False + self._async_checking = False # only true when async daemon started + self._update_ready = None + self._update_link = None + self._update_version = None + self._source_zip = None + self._check_thread = None + self.skip_tag = None + self.select_link = None + + # get from module data + self._addon = __package__.lower() + self._addon_package = __package__ # must not change + self._updater_path = os.path.join(os.path.dirname(__file__), + self._addon+"_updater") + self._addon_root = os.path.dirname(__file__) + self._json = {} + self._error = None + self._error_msg = None + self._prefiltered_tag_count = 0 + + # UI code only, ie not used within this module but still useful + # properties to have + + # to verify a valid import, in place of placeholder import + self.showpopups = True # used in UI to show or not show update popups + self.invalidupdater = False + + + # ------------------------------------------------------------------------- + # Getters and setters + # ------------------------------------------------------------------------- + + @property + def engine(self): + return self._engine.name + @engine.setter + def engine(self, value): + if value.lower()=="github": + self._engine = GithubEngine() + elif value.lower()=="gitlab": + self._engine = GitlabEngine() + elif value.lower()=="bitbucket": + self._engine = BitbucketEngine() + else: + raise ValueError("Invalid engine selection") + + @property + def private_token(self): + return self._engine.token + @private_token.setter + def private_token(self, value): + if value==None: + self._engine.token = None + else: + self._engine.token = str(value) + + @property + def addon(self): + return self._addon + @addon.setter + def addon(self, value): + self._addon = str(value) + + @property + def verbose(self): + return self._verbose + @verbose.setter + def verbose(self, value): + try: + self._verbose = bool(value) + if self._verbose == True: + print(self._addon+" updater verbose is enabled") + except: + raise ValueError("Verbose must be a boolean value") + + @property + def include_branches(self): + return self._include_branches + @include_branches.setter + def include_branches(self, value): + try: + self._include_branches = bool(value) + except: + raise ValueError("include_branches must be a boolean value") + + @property + def use_releases(self): + return self._use_releases + @use_releases.setter + def use_releases(self, value): + try: + self._use_releases = bool(value) + except: + raise ValueError("use_releases must be a boolean value") + + @property + def include_branch_list(self): + return self._include_branch_list + @include_branch_list.setter + def include_branch_list(self, value): + try: + if value == None: + self._include_branch_list = ['master'] + elif type(value) != type(['master']) or value==[]: + raise ValueError("include_branch_list should be a list of valid branches") + else: + self._include_branch_list = value + except: + raise ValueError("include_branch_list should be a list of valid branches") + + @property + def overwrite_patterns(self): + return self._overwrite_patterns + @overwrite_patterns.setter + def overwrite_patterns(self, value): + if value == None: + self._overwrite_patterns = ["*.py","*.pyc"] + elif type(value) != type(['']): + raise ValueError("overwrite_patterns needs to be in a list format") + else: + self._overwrite_patterns = value + + @property + def remove_pre_update_patterns(self): + return self._remove_pre_update_patterns + @remove_pre_update_patterns.setter + def remove_pre_update_patterns(self, value): + if value == None: + self._remove_pre_update_patterns = [] + elif type(value) != type(['']): + raise ValueError("remove_pre_update_patterns needs to be in a list format") + else: + self._remove_pre_update_patterns = value + + # not currently used + @property + def include_branch_autocheck(self): + return self._include_branch_autocheck + @include_branch_autocheck.setter + def include_branch_autocheck(self, value): + try: + self._include_branch_autocheck = bool(value) + except: + raise ValueError("include_branch_autocheck must be a boolean value") + + @property + def manual_only(self): + return self._manual_only + @manual_only.setter + def manual_only(self, value): + try: + self._manual_only = bool(value) + except: + raise ValueError("manual_only must be a boolean value") + + @property + def auto_reload_post_update(self): + return self._auto_reload_post_update + @auto_reload_post_update.setter + def auto_reload_post_update(self, value): + try: + self._auto_reload_post_update = bool(value) + except: + raise ValueError("Must be a boolean value") + + @property + def fake_install(self): + return self._fake_install + @fake_install.setter + def fake_install(self, value): + if type(value) != type(False): + raise ValueError("fake_install must be a boolean value") + self._fake_install = bool(value) + + @property + def user(self): + return self._user + @user.setter + def user(self, value): + try: + self._user = str(value) + except: + raise ValueError("User must be a string value") + + @property + def json(self): + if self._json == {}: + self.set_updater_json() + return self._json + + @property + def repo(self): + return self._repo + @repo.setter + def repo(self, value): + try: + self._repo = str(value) + except: + raise ValueError("User must be a string") + + @property + def website(self): + return self._website + @website.setter + def website(self, value): + if self.check_is_url(value) == False: + raise ValueError("Not a valid URL: " + value) + self._website = value + + @property + def async_checking(self): + return self._async_checking + + @property + def api_url(self): + return self._engine.api_url + @api_url.setter + def api_url(self, value): + if self.check_is_url(value) == False: + raise ValueError("Not a valid URL: " + value) + self._engine.api_url = value + + @property + def stage_path(self): + return self._updater_path + @stage_path.setter + def stage_path(self, value): + if value == None: + if self._verbose: print("Aborting assigning stage_path, it's null") + return + elif value != None and not os.path.exists(value): + try: + os.makedirs(value) + except: + if self._verbose: print("Error trying to staging path") + return + self._updater_path = value + + @property + def tags(self): + if self._tags == []: + return [] + tag_names = [] + for tag in self._tags: + tag_names.append(tag["name"]) + return tag_names + + @property + def tag_latest(self): + if self._tag_latest == None: + return None + return self._tag_latest["name"] + + @property + def latest_release(self): + if self._releases_latest == None: + return None + return self._latest_release + + @property + def current_version(self): + return self._current_version + + @property + def subfolder_path(self): + return self._subfolder_path + + @subfolder_path.setter + def subfolder_path(self, value): + self._subfolder_path = value + + @property + def update_ready(self): + return self._update_ready + + @property + def update_version(self): + return self._update_version + + @property + def update_link(self): + return self._update_link + + @current_version.setter + def current_version(self, tuple_values): + if tuple_values==None: + self._current_version = None + return + elif type(tuple_values) is not tuple: + try: + tuple(tuple_values) + except: + raise ValueError( + "Not a tuple! current_version must be a tuple of integers") + for i in tuple_values: + if type(i) is not int: + raise ValueError( + "Not an integer! current_version must be a tuple of integers") + self._current_version = tuple(tuple_values) + + def set_check_interval(self,enable=False,months=0,days=14,hours=0,minutes=0): + # enabled = False, default initially will not check against frequency + # if enabled, default is then 2 weeks + + if type(enable) is not bool: + raise ValueError("Enable must be a boolean value") + if type(months) is not int: + raise ValueError("Months must be an integer value") + if type(days) is not int: + raise ValueError("Days must be an integer value") + if type(hours) is not int: + raise ValueError("Hours must be an integer value") + if type(minutes) is not int: + raise ValueError("Minutes must be an integer value") + + if enable==False: + self._check_interval_enable = False + else: + self._check_interval_enable = True + + self._check_interval_months = months + self._check_interval_days = days + self._check_interval_hours = hours + self._check_interval_minutes = minutes + + @property + def check_interval(self): + return (self._check_interval_enable, + self._check_interval_months, + self._check_interval_days, + self._check_interval_hours, + self._check_interval_minutes) + + @property + def error(self): + return self._error + + @property + def error_msg(self): + return self._error_msg + + @property + def version_min_update(self): + return self._version_min_update + @version_min_update.setter + def version_min_update(self, value): + if value == None: + self._version_min_update = None + return + if type(value) != type((1,2,3)): + raise ValueError("Version minimum must be a tuple") + else: + # potentially check entries are integers + self._version_min_update = value + + @property + def version_max_update(self): + return self._version_max_update + @version_max_update.setter + def version_max_update(self, value): + if value == None: + self._version_max_update = None + return + if type(value) != type((1,2,3)): + raise ValueError("Version maximum must be a tuple") + else: + # potentially check entries are integers + self._version_max_update = value + + @property + def backup_current(self): + return self._backup_current + @backup_current.setter + def backup_current(self, value): + if value == None: + self._backup_current = False + return + else: + self._backup_current = value + + @property + def backup_ignore_patterns(self): + return self._backup_ignore_patterns + @backup_ignore_patterns.setter + def backup_ignore_patterns(self, value): + if value == None: + self._backup_ignore_patterns = None + return + elif type(value) != type(['list']): + raise ValueError("Backup pattern must be in list format") + else: + self._backup_ignore_patterns = value + + # ------------------------------------------------------------------------- + # Parameter validation related functions + # ------------------------------------------------------------------------- + + + def check_is_url(self, url): + if not ("http://" in url or "https://" in url): + return False + if "." not in url: + return False + return True + + def get_tag_names(self): + tag_names = [] + self.get_tags(self) + for tag in self._tags: + tag_names.append(tag["name"]) + return tag_names + + + # declare how the class gets printed + + def __repr__(self): + return "<Module updater from {a}>".format(a=__file__) + + def __str__(self): + return "Updater, with user: {a}, repository: {b}, url: {c}".format( + a=self._user, + b=self._repo, c=self.form_repo_url()) + + + # ------------------------------------------------------------------------- + # API-related functions + # ------------------------------------------------------------------------- + + def form_repo_url(self): + return self._engine.form_repo_url(self) + + def form_tags_url(self): + return self._engine.form_tags_url(self) + + def form_branch_url(self, branch): + return self._engine.form_branch_url(branch, self) + + def get_tags(self): + request = self.form_tags_url() + if self._verbose: print("Getting tags from server") + + # get all tags, internet call + all_tags = self._engine.parse_tags(self.get_api(request), self) + if all_tags is not None: + self._prefiltered_tag_count = len(all_tags) + else: + self._prefiltered_tag_count = 0 + all_tags = [] + + # pre-process to skip tags + if self.skip_tag != None: + self._tags = [tg for tg in all_tags if self.skip_tag(self, tg)==False] + else: + self._tags = all_tags + + # get additional branches too, if needed, and place in front + # Does NO checking here whether branch is valid + if self._include_branches == True: + temp_branches = self._include_branch_list.copy() + temp_branches.reverse() + for branch in temp_branches: + request = self.form_branch_url(branch) + include = { + "name":branch.title(), + "zipball_url":request + } + self._tags = [include] + self._tags # append to front + + if self._tags == None: + # some error occurred + self._tag_latest = None + self._tags = [] + return + elif self._prefiltered_tag_count == 0 and self._include_branches == False: + self._tag_latest = None + if self._error == None: # if not None, could have had no internet + self._error = "No releases found" + self._error_msg = "No releases or tags found on this repository" + if self._verbose: print("No releases or tags found on this repository") + elif self._prefiltered_tag_count == 0 and self._include_branches == True: + if not self._error: self._tag_latest = self._tags[0] + if self._verbose: + branch = self._include_branch_list[0] + print("{} branch found, no releases".format(branch), self._tags[0]) + elif (len(self._tags)-len(self._include_branch_list)==0 and self._include_branches==True) \ + or (len(self._tags)==0 and self._include_branches==False) \ + and self._prefiltered_tag_count > 0: + self._tag_latest = None + self._error = "No releases available" + self._error_msg = "No versions found within compatible version range" + if self._verbose: print("No versions found within compatible version range") + else: + if self._include_branches == False: + self._tag_latest = self._tags[0] + if self._verbose: print("Most recent tag found:",self._tags[0]['name']) + else: + # don't return branch if in list + n = len(self._include_branch_list) + self._tag_latest = self._tags[n] # guaranteed at least len()=n+1 + if self._verbose: print("Most recent tag found:",self._tags[n]['name']) + + + # all API calls to base url + def get_raw(self, url): + # print("Raw request:", url) + request = urllib.request.Request(url) + context = ssl._create_unverified_context() + + # setup private request headers if appropriate + if self._engine.token != None: + if self._engine.name == "gitlab": + request.add_header('PRIVATE-TOKEN',self._engine.token) + else: + if self._verbose: print("Tokens not setup for engine yet") + + # run the request + try: + result = urllib.request.urlopen(request,context=context) + except urllib.error.HTTPError as e: + self._error = "HTTP error" + self._error_msg = str(e.code) + self._update_ready = None + except urllib.error.URLError as e: + reason = str(e.reason) + if "TLSV1_ALERT" in reason or "SSL" in reason: + self._error = "Connection rejected, download manually" + self._error_msg = reason + else: + self._error = "URL error, check internet connection" + self._error_msg = reason + self._update_ready = None + return None + else: + result_string = result.read() + result.close() + return result_string.decode() + + + # result of all api calls, decoded into json format + def get_api(self, url): + # return the json version + get = None + get = self.get_raw(url) + if get != None: + try: + return json.JSONDecoder().decode(get) + except Exception as e: + self._error = "API response has invalid JSON format" + self._error_msg = str(e.reason) + self._update_ready = None + return None + else: + return None + + + # create a working directory and download the new files + def stage_repository(self, url): + + local = os.path.join(self._updater_path,"update_staging") + error = None + + # make/clear the staging folder + # ensure the folder is always "clean" + if self._verbose: print("Preparing staging folder for download:\n",local) + if os.path.isdir(local) == True: + try: + shutil.rmtree(local) + os.makedirs(local) + except: + error = "failed to remove existing staging directory" + else: + try: + os.makedirs(local) + except: + error = "failed to create staging directory" + + if error != None: + if self._verbose: print("Error: Aborting update, "+error) + self._error = "Update aborted, staging path error" + self._error_msg = "Error: {}".format(error) + return False + + if self._backup_current==True: + self.create_backup() + if self._verbose: print("Now retrieving the new source zip") + + self._source_zip = os.path.join(local,"source.zip") + + if self._verbose: print("Starting download update zip") + try: + request = urllib.request.Request(url) + context = ssl._create_unverified_context() + + # setup private token if appropriate + if self._engine.token != None: + if self._engine.name == "gitlab": + request.add_header('PRIVATE-TOKEN',self._engine.token) + else: + if self._verbose: print("Tokens not setup for selected engine yet") + self.urlretrieve(urllib.request.urlopen(request,context=context), self._source_zip) + # add additional checks on file size being non-zero + if self._verbose: print("Successfully downloaded update zip") + return True + except Exception as e: + self._error = "Error retrieving download, bad link?" + self._error_msg = "Error: {}".format(e) + if self._verbose: + print("Error retrieving download, bad link?") + print("Error: {}".format(e)) + return False + + + def create_backup(self): + if self._verbose: print("Backing up current addon folder") + local = os.path.join(self._updater_path,"backup") + tempdest = os.path.join(self._addon_root, + os.pardir, + self._addon+"_updater_backup_temp") + + if self._verbose: print("Backup destination path: ",local) + + if os.path.isdir(local): + try: + shutil.rmtree(local) + except: + if self._verbose:print("Failed to removed previous backup folder, contininuing") + + # remove the temp folder; shouldn't exist but could if previously interrupted + if os.path.isdir(tempdest): + try: + shutil.rmtree(tempdest) + except: + if self._verbose:print("Failed to remove existing temp folder, contininuing") + # make the full addon copy, which temporarily places outside the addon folder + if self._backup_ignore_patterns != None: + shutil.copytree( + self._addon_root,tempdest, + ignore=shutil.ignore_patterns(*self._backup_ignore_patterns)) + else: + shutil.copytree(self._addon_root,tempdest) + shutil.move(tempdest,local) + + # save the date for future ref + now = datetime.now() + self._json["backup_date"] = "{m}-{d}-{yr}".format( + m=now.strftime("%B"),d=now.day,yr=now.year) + self.save_updater_json() + + def restore_backup(self): + if self._verbose: print("Restoring backup") + + if self._verbose: print("Backing up current addon folder") + backuploc = os.path.join(self._updater_path,"backup") + tempdest = os.path.join(self._addon_root, + os.pardir, + self._addon+"_updater_backup_temp") + tempdest = os.path.abspath(tempdest) + + # make the copy + shutil.move(backuploc,tempdest) + shutil.rmtree(self._addon_root) + os.rename(tempdest,self._addon_root) + + self._json["backup_date"] = "" + self._json["just_restored"] = True + self._json["just_updated"] = True + self.save_updater_json() + + self.reload_addon() + + def unpack_staged_zip(self,clean=False): + + if os.path.isfile(self._source_zip) == False: + if self._verbose: print("Error, update zip not found") + return -1 + + # clear the existing source folder in case previous files remain + try: + shutil.rmtree(os.path.join(self._updater_path,"source")) + os.makedirs(os.path.join(self._updater_path,"source")) + if self._verbose: print("Source folder cleared and recreated") + except: + pass + + if self._verbose: print("Begin extracting source") + if zipfile.is_zipfile(self._source_zip): + with zipfile.ZipFile(self._source_zip) as zf: + # extractall is no longer a security hazard, below is safe + zf.extractall(os.path.join(self._updater_path,"source")) + else: + if self._verbose: + print("Not a zip file, future add support for just .py files") + raise ValueError("Resulting file is not a zip") + if self._verbose: print("Extracted source") + + # either directly in root of zip, or one folder level deep + unpath = os.path.join(self._updater_path,"source") + if os.path.isfile(os.path.join(unpath,"__init__.py")) == False: + dirlist = os.listdir(unpath) + if len(dirlist)>0: + if self._subfolder_path == "" or self._subfolder_path == None: + unpath = os.path.join(unpath,dirlist[0]) + else: + unpath = os.path.join(unpath,dirlist[0],self._subfolder_path) + + # smarter check for additional sub folders for a single folder + # containing __init__.py + if os.path.isfile(os.path.join(unpath,"__init__.py")) == False: + if self._verbose: + print("not a valid addon found") + print("Paths:") + print(dirlist) + + raise ValueError("__init__ file not found in new source") + + # now commence merging in the two locations: + # note this MAY not be accurate, as updater files could be placed elsewhere + origpath = os.path.dirname(__file__) + + # merge code with running addon directory, using blender default behavior + # plus any modifiers indicated by user (e.g. force remove/keep) + self.deepMergeDirectory(origpath,unpath,clean) + + # Now save the json state + # Change to True, to trigger the handler on other side + # if allowing reloading within same blender instance + self._json["just_updated"] = True + self.save_updater_json() + self.reload_addon() + self._update_ready = False + + + # merge folder 'merger' into folder 'base' without deleting existing + def deepMergeDirectory(self,base,merger,clean=False): + if not os.path.exists(base): + if self._verbose: print("Base path does not exist") + return -1 + elif not os.path.exists(merger): + if self._verbose: print("Merger path does not exist") + return -1 + + # paths to be aware of and not overwrite/remove/etc + staging_path = os.path.join(self._updater_path,"update_staging") + backup_path = os.path.join(self._updater_path,"backup") + json_path = os.path.join(self._updater_path,"updater_status.json") + + # If clean install is enabled, clear existing files ahead of time + # note: will not delete the update.json, update folder, staging, or staging + # but will delete all other folders/files in addon directory + error = None + if clean==True: + try: + # implement clearing of all folders/files, except the + # updater folder and updater json + # Careful, this deletes entire subdirectories recursively... + # make sure that base is not a high level shared folder, but + # is dedicated just to the addon itself + if self._verbose: print("clean=True, clearing addon folder to fresh install state") + + # remove root files and folders (except update folder) + files = [f for f in os.listdir(base) if os.path.isfile(os.path.join(base,f))] + folders = [f for f in os.listdir(base) if os.path.isdir(os.path.join(base,f))] + + for f in files: + os.remove(os.path.join(base,f)) + print("Clean removing file {}".format(os.path.join(base,f))) + for f in folders: + if os.path.join(base,f)==self._updater_path: continue + shutil.rmtree(os.path.join(base,f)) + print("Clean removing folder and contents {}".format(os.path.join(base,f))) + + except error: + error = "failed to create clean existing addon folder" + print(error,str(e)) + + # Walk through the base addon folder for rules on pre-removing + # but avoid removing/altering backup and updater file + for path, dirs, files in os.walk(base): + # prune ie skip updater folder + dirs[:] = [d for d in dirs if os.path.join(path,d) not in [self._updater_path]] + for file in files: + for ptrn in self.remove_pre_update_patterns: + if fnmatch.filter([file],ptrn): + try: + fl = os.path.join(path,file) + os.remove(fl) + if self._verbose: print("Pre-removed file "+file) + except OSError: + print("Failed to pre-remove "+file) + + # Walk through the temp addon sub folder for replacements + # this implements the overwrite rules, which apply after + # the above pre-removal rules. This also performs the + # actual file copying/replacements + for path, dirs, files in os.walk(merger): + # verify this structure works to prune updater sub folder overwriting + dirs[:] = [d for d in dirs if os.path.join(path,d) not in [self._updater_path]] + relPath = os.path.relpath(path, merger) + destPath = os.path.join(base, relPath) + if not os.path.exists(destPath): + os.makedirs(destPath) + for file in files: + # bring in additional logic around copying/replacing + # Blender default: overwrite .py's, don't overwrite the rest + destFile = os.path.join(destPath, file) + srcFile = os.path.join(path, file) + + # decide whether to replace if file already exists, and copy new over + if os.path.isfile(destFile): + # otherwise, check each file to see if matches an overwrite pattern + replaced=False + for ptrn in self._overwrite_patterns: + if fnmatch.filter([destFile],ptrn): + replaced=True + break + if replaced: + os.remove(destFile) + os.rename(srcFile, destFile) + if self._verbose: print("Overwrote file "+os.path.basename(destFile)) + else: + if self._verbose: print("Pattern not matched to "+os.path.basename(destFile)+", not overwritten") + else: + # file did not previously exist, simply move it over + os.rename(srcFile, destFile) + if self._verbose: print("New file "+os.path.basename(destFile)) + + # now remove the temp staging folder and downloaded zip + try: + shutil.rmtree(staging_path) + except: + error = "Error: Failed to remove existing staging directory, consider manually removing "+staging_path + if self._verbose: print(error) + + + def reload_addon(self): + # if post_update false, skip this function + # else, unload/reload addon & trigger popup + if self._auto_reload_post_update == False: + print("Restart blender to reload addon and complete update") + return + + if self._verbose: print("Reloading addon...") + addon_utils.modules(refresh=True) + bpy.utils.refresh_script_paths() + + # not allowed in restricted context, such as register module + # toggle to refresh + bpy.ops.wm.addon_disable(module=self._addon_package) + bpy.ops.wm.addon_refresh() + bpy.ops.wm.addon_enable(module=self._addon_package) + + + # ------------------------------------------------------------------------- + # Other non-api functions and setups + # ------------------------------------------------------------------------- + + def clear_state(self): + self._update_ready = None + self._update_link = None + self._update_version = None + self._source_zip = None + self._error = None + self._error_msg = None + + # custom urlretrieve implementation + def urlretrieve(self, urlfile, filepath): + chunk = 1024*8 + f = open(filepath, "wb") + while 1: + data = urlfile.read(chunk) + if not data: + #print("done.") + break + f.write(data) + #print("Read %s bytes"%len(data)) + f.close() + + + def version_tuple_from_text(self,text): + if text == None: return () + + # should go through string and remove all non-integers, + # and for any given break split into a different section + segments = [] + tmp = '' + for l in str(text): + if l.isdigit()==False: + if len(tmp)>0: + segments.append(int(tmp)) + tmp = '' + else: + tmp+=l + if len(tmp)>0: + segments.append(int(tmp)) + + if len(segments)==0: + if self._verbose: print("No version strings found text: ",text) + if self._include_branches == False: + return () + else: + return (text) + return tuple(segments) + + # called for running check in a background thread + def check_for_update_async(self, callback=None): + + if self._json != None and "update_ready" in self._json and self._json["version_text"]!={}: + if self._json["update_ready"] == True: + self._update_ready = True + self._update_link = self._json["version_text"]["link"] + self._update_version = str(self._json["version_text"]["version"]) + # cached update + callback(True) + return + + # do the check + if self._check_interval_enable == False: + return + elif self._async_checking == True: + if self._verbose: print("Skipping async check, already started") + return # already running the bg thread + elif self._update_ready == None: + self.start_async_check_update(False, callback) + + + def check_for_update_now(self, callback=None): + + self._error = None + self._error_msg = None + + if self._verbose: + print("Check update pressed, first getting current status") + if self._async_checking == True: + if self._verbose: print("Skipping async check, already started") + return # already running the bg thread + elif self._update_ready == None: + self.start_async_check_update(True, callback) + else: + self._update_ready = None + self.start_async_check_update(True, callback) + + + # this function is not async, will always return in sequential fashion + # but should have a parent which calls it in another thread + def check_for_update(self, now=False): + if self._verbose: print("Checking for update function") + + # clear the errors if any + self._error = None + self._error_msg = None + + # avoid running again in, just return past result if found + # but if force now check, then still do it + if self._update_ready != None and now == False: + return (self._update_ready,self._update_version,self._update_link) + + if self._current_version == None: + raise ValueError("current_version not yet defined") + if self._repo == None: + raise ValueError("repo not yet defined") + if self._user == None: + raise ValueError("username not yet defined") + + self.set_updater_json() # self._json + + if now == False and self.past_interval_timestamp()==False: + if self._verbose: + print("Aborting check for updated, check interval not reached") + return (False, None, None) + + # check if using tags or releases + # note that if called the first time, this will pull tags from online + if self._fake_install == True: + if self._verbose: + print("fake_install = True, setting fake version as ready") + self._update_ready = True + self._update_version = "(999,999,999)" + self._update_link = "http://127.0.0.1" + + return (self._update_ready, self._update_version, self._update_link) + + # primary internet call + self.get_tags() # sets self._tags and self._tag_latest + + self._json["last_check"] = str(datetime.now()) + self.save_updater_json() + + # can be () or ('master') in addition to branches, and version tag + new_version = self.version_tuple_from_text(self.tag_latest) + + if len(self._tags)==0: + self._update_ready = False + self._update_version = None + self._update_link = None + return (False, None, None) + if self._include_branches == False: + link = self.select_link(self, self._tags[0]) + else: + n = len(self._include_branch_list) + if len(self._tags)==n: + # effectively means no tags found on repo + # so provide the first one as default + link = self.select_link(self, self._tags[0]) + else: + link = self.select_link(self, self._tags[n]) + + if new_version == (): + self._update_ready = False + self._update_version = None + self._update_link = None + return (False, None, None) + elif str(new_version).lower() in self._include_branch_list: + # handle situation where master/whichever branch is included + # however, this code effectively is not triggered now + # as new_version will only be tag names, not branch names + if self._include_branch_autocheck == False: + # don't offer update as ready, + # but set the link for the default + # branch for installing + self._update_ready = True + self._update_version = new_version + self._update_link = link + self.save_updater_json() + return (True, new_version, link) + else: + raise ValueError("include_branch_autocheck: NOT YET DEVELOPED") + # bypass releases and look at timestamp of last update + # from a branch compared to now, see if commit values + # match or not. + + else: + # situation where branches not included + + if new_version > self._current_version: + + self._update_ready = True + self._update_version = new_version + self._update_link = link + self.save_updater_json() + return (True, new_version, link) + + # elif new_version != self._current_version: + # self._update_ready = False + # self._update_version = new_version + # self._update_link = link + # self.save_updater_json() + # return (True, new_version, link) + + # if no update, set ready to False from None + self._update_ready = False + self._update_version = None + self._update_link = None + return (False, None, None) + + + def set_tag(self,name): + tg = None + for tag in self._tags: + if name == tag["name"]: + tg = tag + break + if tg == None: + raise ValueError("Version tag not found: "+revert_tag) + new_version = self.version_tuple_from_text(self.tag_latest) + self._update_version = new_version + self._update_link = self.select_link(self, tg) + + + def run_update(self,force=False,revert_tag=None,clean=False,callback=None): + # revert_tag: could e.g. get from drop down list + # different versions of the addon to revert back to + # clean: not used, but in future could use to totally refresh addon + self._json["update_ready"] = False + self._json["ignore"] = False # clear ignore flag + self._json["version_text"] = {} + + if revert_tag != None: + self.set_tag(revert_tag) + self._update_ready = True + + # clear the errors if any + self._error = None + self._error_msg = None + + if self._verbose: print("Running update") + + if self._fake_install == True: + # change to True, to trigger the reload/"update installed" handler + if self._verbose: + print("fake_install=True") + print("Just reloading and running any handler triggers") + self._json["just_updated"] = True + self.save_updater_json() + if self._backup_current == True: + self.create_backup() + self.reload_addon() + self._update_ready = False + res = True # fake "success" zip download flag + + elif force==False: + if self._update_ready != True: + if self._verbose: print("Update stopped, new version not ready") + return "Update stopped, new version not ready" + elif self._update_link == None: + # this shouldn't happen if update is ready + if self._verbose: print("Update stopped, update link unavailable") + return "Update stopped, update link unavailable" + + if self._verbose and revert_tag==None: + print("Staging update") + elif self._verbose: + print("Staging install") + + res = self.stage_repository(self._update_link) + if res !=True: + print("Error in staging repository: "+str(res)) + if callback != None: callback(self._error_msg) + return self._error_msg + self.unpack_staged_zip(clean) + + else: + if self._update_link == None: + if self._verbose: print("Update stopped, could not get link") + return "Update stopped, could not get link" + if self._verbose: print("Forcing update") + + res = self.stage_repository(self._update_link) + if res !=True: + print("Error in staging repository: "+str(res)) + if callback != None: callback(self._error_msg) + return self._error_msg + self.unpack_staged_zip(clean) + # would need to compare against other versions held in tags + + # run the front-end's callback if provided + if callback != None: callback() + + # return something meaningful, 0 means it worked + return 0 + + + def past_interval_timestamp(self): + if self._check_interval_enable == False: + return True # ie this exact feature is disabled + + if "last_check" not in self._json or self._json["last_check"] == "": + return True + else: + now = datetime.now() + last_check = datetime.strptime(self._json["last_check"], + "%Y-%m-%d %H:%M:%S.%f") + next_check = last_check + offset = timedelta( + days=self._check_interval_days + 30*self._check_interval_months, + hours=self._check_interval_hours, + minutes=self._check_interval_minutes + ) + + delta = (now - offset) - last_check + if delta.total_seconds() > 0: + if self._verbose: + print("{} Updater: Time to check for updates!".format(self._addon)) + return True + else: + if self._verbose: + print("{} Updater: Determined it's not yet time to check for updates".format(self._addon)) + return False + + + def set_updater_json(self): + if self._updater_path == None: + raise ValueError("updater_path is not defined") + elif os.path.isdir(self._updater_path) == False: + os.makedirs(self._updater_path) + + jpath = os.path.join(self._updater_path,"updater_status.json") + if os.path.isfile(jpath): + with open(jpath) as data_file: + self._json = json.load(data_file) + if self._verbose: print("{} Updater: Read in json settings from file".format(self._addon)) + else: + # set data structure + self._json = { + "last_check":"", + "backup_date":"", + "update_ready":False, + "ignore":False, + "just_restored":False, + "just_updated":False, + "version_text":{} + } + self.save_updater_json() + + + def save_updater_json(self): + # first save the state + if self._update_ready == True: + if type(self._update_version) == type((0,0,0)): + self._json["update_ready"] = True + self._json["version_text"]["link"]=self._update_link + self._json["version_text"]["version"]=self._update_version + else: + self._json["update_ready"] = False + self._json["version_text"] = {} + else: + self._json["update_ready"] = False + self._json["version_text"] = {} + + jpath = os.path.join(self._updater_path,"updater_status.json") + outf = open(jpath,'w') + data_out = json.dumps(self._json, indent=4) + outf.write(data_out) + outf.close() + if self._verbose: + print(self._addon+": Wrote out updater json settings to file, with the contents:") + print(self._json) + + def json_reset_postupdate(self): + self._json["just_updated"] = False + self._json["update_ready"] = False + self._json["version_text"] = {} + self.save_updater_json() + + def json_reset_restore(self): + self._json["just_restored"] = False + self._json["update_ready"] = False + self._json["version_text"] = {} + self.save_updater_json() + self._update_ready = None # reset so you could check update again + + def ignore_update(self): + self._json["ignore"] = True + self.save_updater_json() + + + # ------------------------------------------------------------------------- + # ASYNC stuff + # ------------------------------------------------------------------------- + + def start_async_check_update(self, now=False, callback=None): + if self._async_checking == True: + return + if self._verbose: print("{} updater: Starting background checking thread".format(self._addon)) + check_thread = threading.Thread(target=self.async_check_update, + args=(now,callback,)) + check_thread.daemon = True + self._check_thread = check_thread + check_thread.start() + + return True + + def async_check_update(self, now, callback=None): + self._async_checking = True + if self._verbose: print("{} BG thread: Checking for update now in background".format(self._addon)) + # time.sleep(3) # to test background, in case internet too fast to tell + # try: + self.check_for_update(now=now) + # except Exception as exception: + # print("Checking for update error:") + # print(exception) + # self._update_ready = False + # self._update_version = None + # self._update_link = None + # self._error = "Error occurred" + # self._error_msg = "Encountered an error while checking for updates" + + self._async_checking = False + self._check_thread = None + + if self._verbose: + print("{} BG thread: Finished checking for update, doing callback".format(self._addon)) + if callback != None: callback(self._update_ready) + + + def stop_async_check_update(self): + if self._check_thread != None: + try: + if self._verbose: print("Thread will end in normal course.") + # however, "There is no direct kill method on a thread object." + # better to let it run its course + #self._check_thread.stop() + except: + pass + self._async_checking = False + self._error = None + self._error_msg = None + + +# ----------------------------------------------------------------------------- +# Updater Engines +# ----------------------------------------------------------------------------- + + +class BitbucketEngine(object): + + def __init__(self): + self.api_url = 'https://api.bitbucket.org' + self.token = None + self.name = "bitbucket" + + def form_repo_url(self, updater): + return self.api_url+"/2.0/repositories/"+updater.user+"/"+updater.repo + + def form_tags_url(self, updater): + return self.form_repo_url(updater) + "/refs/tags?sort=-name" + + def form_branch_url(self, branch, updater): + return self.get_zip_url(branch, updater) + + def get_zip_url(self, name, updater): + return "https://bitbucket.org/{user}/{repo}/get/{name}.zip".format( + user=updater.user, + repo=updater.repo, + name=name) + + def parse_tags(self, response, updater): + if response == None: + return [] + return [{"name": tag["name"], "zipball_url": self.get_zip_url(tag["name"], updater)} for tag in response["values"]] + + +class GithubEngine(object): + + def __init__(self): + self.api_url = 'https://api.github.com' + self.token = None + self.name = "github" + + def form_repo_url(self, updater): + return "{}{}{}{}{}".format(self.api_url,"/repos/",updater.user, + "/",updater.repo) + + def form_tags_url(self, updater): + if updater.use_releases: + return "{}{}".format(self.form_repo_url(updater),"/releases") + else: + return "{}{}".format(self.form_repo_url(updater),"/tags") + + def form_branch_list_url(self, updater): + return "{}{}".format(self.form_repo_url(updater),"/branches") + + def form_branch_url(self, branch, updater): + return "{}{}{}".format(self.form_repo_url(updater), + "/zipball/",branch) + + def parse_tags(self, response, updater): + if response == None: + return [] + return response + + +class GitlabEngine(object): + + def __init__(self): + self.api_url = 'https://gitlab.com' + self.token = None + self.name = "gitlab" + + def form_repo_url(self, updater): + return "{}{}{}".format(self.api_url,"/api/v3/projects/",updater.repo) + + def form_tags_url(self, updater): + return "{}{}".format(self.form_repo_url(updater),"/repository/tags") + + def form_branch_list_url(self, updater): + # does not validate branch name. + return "{}{}".format( + self.form_repo_url(updater), + "/repository/branches") + + def form_branch_url(self, branch, updater): + # Could clash with tag names and if it does, it will + # download TAG zip instead of branch zip to get + # direct path, would need. + return "{}{}{}".format( + self.form_repo_url(updater), + "/repository/archive.zip?sha=", + branch) + + def get_zip_url(self, sha, updater): + return "{base}/repository/archive.zip?sha:{sha}".format( + base=self.form_repo_url(updater), + sha=sha) + + # def get_commit_zip(self, id, updater): + # return self.form_repo_url(updater)+"/repository/archive.zip?sha:"+id + + def parse_tags(self, response, updater): + if response == None: + return [] + return [{"name": tag["name"], "zipball_url": self.get_zip_url(tag["commit"]["id"], updater)} for tag in response] + + +# ----------------------------------------------------------------------------- +# The module-shared class instance, +# should be what's imported to other files +# ----------------------------------------------------------------------------- + +Updater = Singleton_updater() |