Welcome to mirror list, hosted at ThFree Co, Russian Federation.

git.blender.org/blender-addons.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'uv_magic_uv/addon_updater.py')
-rw-r--r--uv_magic_uv/addon_updater.py1501
1 files changed, 0 insertions, 1501 deletions
diff --git a/uv_magic_uv/addon_updater.py b/uv_magic_uv/addon_updater.py
deleted file mode 100644
index 70b6a287..00000000
--- a/uv_magic_uv/addon_updater.py
+++ /dev/null
@@ -1,1501 +0,0 @@
-# ##### BEGIN GPL LICENSE BLOCK #####
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software Foundation,
-# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-#
-# ##### END GPL LICENSE BLOCK #####
-
-
-"""
-See documentation for usage
-https://github.com/CGCookie/blender-addon-updater
-
-"""
-
-import ssl
-import urllib.request
-import urllib
-import os
-import json
-import zipfile
-import shutil
-import asyncio
-import threading
-import time
-import fnmatch
-from datetime import datetime, timedelta
-
-# blender imports, used in limited cases
-import bpy
-import addon_utils
-
-# -----------------------------------------------------------------------------
-# Define error messages/notices & hard coded globals
-# -----------------------------------------------------------------------------
-
-# currently not used
-DEFAULT_TIMEOUT = 10
-DEFAULT_PER_PAGE = 30
-
-
-# -----------------------------------------------------------------------------
-# The main class
-# -----------------------------------------------------------------------------
-
-class Singleton_updater(object):
- """
- This is the singleton class to reference a copy from,
- it is the shared module level class
- """
- def __init__(self):
-
- self._engine = GithubEngine()
- self._user = None
- self._repo = None
- self._website = None
- self._current_version = None
- self._subfolder_path = None
- self._tags = []
- self._tag_latest = None
- self._tag_names = []
- self._latest_release = None
- self._use_releases = False
- self._include_branches = False
- self._include_branch_list = ['master']
- self._include_branch_autocheck = False
- self._manual_only = False
- self._version_min_update = None
- self._version_max_update = None
-
- # by default, backup current addon if new is being loaded
- self._backup_current = True
- self._backup_ignore_patterns = None
-
- # set patterns for what files to overwrite on update
- self._overwrite_patterns = ["*.py","*.pyc"]
- self._remove_pre_update_patterns = []
-
- # by default, don't auto enable/disable the addon on update
- # as it is slightly less stable/won't always fully reload module
- self._auto_reload_post_update = False
-
- # settings relating to frequency and whether to enable auto background check
- self._check_interval_enable = False
- self._check_interval_months = 0
- self._check_interval_days = 7
- self._check_interval_hours = 0
- self._check_interval_minutes = 0
-
- # runtime variables, initial conditions
- self._verbose = False
- self._fake_install = False
- self._async_checking = False # only true when async daemon started
- self._update_ready = None
- self._update_link = None
- self._update_version = None
- self._source_zip = None
- self._check_thread = None
- self.skip_tag = None
- self.select_link = None
-
- # get from module data
- self._addon = __package__.lower()
- self._addon_package = __package__ # must not change
- self._updater_path = os.path.join(os.path.dirname(__file__),
- self._addon+"_updater")
- self._addon_root = os.path.dirname(__file__)
- self._json = {}
- self._error = None
- self._error_msg = None
- self._prefiltered_tag_count = 0
-
- # UI code only, ie not used within this module but still useful
- # properties to have
-
- # to verify a valid import, in place of placeholder import
- self.showpopups = True # used in UI to show or not show update popups
- self.invalidupdater = False
-
-
- # -------------------------------------------------------------------------
- # Getters and setters
- # -------------------------------------------------------------------------
-
- @property
- def engine(self):
- return self._engine.name
- @engine.setter
- def engine(self, value):
- if value.lower()=="github":
- self._engine = GithubEngine()
- elif value.lower()=="gitlab":
- self._engine = GitlabEngine()
- elif value.lower()=="bitbucket":
- self._engine = BitbucketEngine()
- else:
- raise ValueError("Invalid engine selection")
-
- @property
- def private_token(self):
- return self._engine.token
- @private_token.setter
- def private_token(self, value):
- if value==None:
- self._engine.token = None
- else:
- self._engine.token = str(value)
-
- @property
- def addon(self):
- return self._addon
- @addon.setter
- def addon(self, value):
- self._addon = str(value)
-
- @property
- def verbose(self):
- return self._verbose
- @verbose.setter
- def verbose(self, value):
- try:
- self._verbose = bool(value)
- if self._verbose == True:
- print(self._addon+" updater verbose is enabled")
- except:
- raise ValueError("Verbose must be a boolean value")
-
- @property
- def include_branches(self):
- return self._include_branches
- @include_branches.setter
- def include_branches(self, value):
- try:
- self._include_branches = bool(value)
- except:
- raise ValueError("include_branches must be a boolean value")
-
- @property
- def use_releases(self):
- return self._use_releases
- @use_releases.setter
- def use_releases(self, value):
- try:
- self._use_releases = bool(value)
- except:
- raise ValueError("use_releases must be a boolean value")
-
- @property
- def include_branch_list(self):
- return self._include_branch_list
- @include_branch_list.setter
- def include_branch_list(self, value):
- try:
- if value == None:
- self._include_branch_list = ['master']
- elif type(value) != type(['master']) or value==[]:
- raise ValueError("include_branch_list should be a list of valid branches")
- else:
- self._include_branch_list = value
- except:
- raise ValueError("include_branch_list should be a list of valid branches")
-
- @property
- def overwrite_patterns(self):
- return self._overwrite_patterns
- @overwrite_patterns.setter
- def overwrite_patterns(self, value):
- if value == None:
- self._overwrite_patterns = ["*.py","*.pyc"]
- elif type(value) != type(['']):
- raise ValueError("overwrite_patterns needs to be in a list format")
- else:
- self._overwrite_patterns = value
-
- @property
- def remove_pre_update_patterns(self):
- return self._remove_pre_update_patterns
- @remove_pre_update_patterns.setter
- def remove_pre_update_patterns(self, value):
- if value == None:
- self._remove_pre_update_patterns = []
- elif type(value) != type(['']):
- raise ValueError("remove_pre_update_patterns needs to be in a list format")
- else:
- self._remove_pre_update_patterns = value
-
- # not currently used
- @property
- def include_branch_autocheck(self):
- return self._include_branch_autocheck
- @include_branch_autocheck.setter
- def include_branch_autocheck(self, value):
- try:
- self._include_branch_autocheck = bool(value)
- except:
- raise ValueError("include_branch_autocheck must be a boolean value")
-
- @property
- def manual_only(self):
- return self._manual_only
- @manual_only.setter
- def manual_only(self, value):
- try:
- self._manual_only = bool(value)
- except:
- raise ValueError("manual_only must be a boolean value")
-
- @property
- def auto_reload_post_update(self):
- return self._auto_reload_post_update
- @auto_reload_post_update.setter
- def auto_reload_post_update(self, value):
- try:
- self._auto_reload_post_update = bool(value)
- except:
- raise ValueError("Must be a boolean value")
-
- @property
- def fake_install(self):
- return self._fake_install
- @fake_install.setter
- def fake_install(self, value):
- if type(value) != type(False):
- raise ValueError("fake_install must be a boolean value")
- self._fake_install = bool(value)
-
- @property
- def user(self):
- return self._user
- @user.setter
- def user(self, value):
- try:
- self._user = str(value)
- except:
- raise ValueError("User must be a string value")
-
- @property
- def json(self):
- if self._json == {}:
- self.set_updater_json()
- return self._json
-
- @property
- def repo(self):
- return self._repo
- @repo.setter
- def repo(self, value):
- try:
- self._repo = str(value)
- except:
- raise ValueError("User must be a string")
-
- @property
- def website(self):
- return self._website
- @website.setter
- def website(self, value):
- if self.check_is_url(value) == False:
- raise ValueError("Not a valid URL: " + value)
- self._website = value
-
- @property
- def async_checking(self):
- return self._async_checking
-
- @property
- def api_url(self):
- return self._engine.api_url
- @api_url.setter
- def api_url(self, value):
- if self.check_is_url(value) == False:
- raise ValueError("Not a valid URL: " + value)
- self._engine.api_url = value
-
- @property
- def stage_path(self):
- return self._updater_path
- @stage_path.setter
- def stage_path(self, value):
- if value == None:
- if self._verbose: print("Aborting assigning stage_path, it's null")
- return
- elif value != None and not os.path.exists(value):
- try:
- os.makedirs(value)
- except:
- if self._verbose: print("Error trying to staging path")
- return
- self._updater_path = value
-
- @property
- def tags(self):
- if self._tags == []:
- return []
- tag_names = []
- for tag in self._tags:
- tag_names.append(tag["name"])
- return tag_names
-
- @property
- def tag_latest(self):
- if self._tag_latest == None:
- return None
- return self._tag_latest["name"]
-
- @property
- def latest_release(self):
- if self._releases_latest == None:
- return None
- return self._latest_release
-
- @property
- def current_version(self):
- return self._current_version
-
- @property
- def subfolder_path(self):
- return self._subfolder_path
-
- @subfolder_path.setter
- def subfolder_path(self, value):
- self._subfolder_path = value
-
- @property
- def update_ready(self):
- return self._update_ready
-
- @property
- def update_version(self):
- return self._update_version
-
- @property
- def update_link(self):
- return self._update_link
-
- @current_version.setter
- def current_version(self, tuple_values):
- if tuple_values==None:
- self._current_version = None
- return
- elif type(tuple_values) is not tuple:
- try:
- tuple(tuple_values)
- except:
- raise ValueError(
- "Not a tuple! current_version must be a tuple of integers")
- for i in tuple_values:
- if type(i) is not int:
- raise ValueError(
- "Not an integer! current_version must be a tuple of integers")
- self._current_version = tuple(tuple_values)
-
- def set_check_interval(self,enable=False,months=0,days=14,hours=0,minutes=0):
- # enabled = False, default initially will not check against frequency
- # if enabled, default is then 2 weeks
-
- if type(enable) is not bool:
- raise ValueError("Enable must be a boolean value")
- if type(months) is not int:
- raise ValueError("Months must be an integer value")
- if type(days) is not int:
- raise ValueError("Days must be an integer value")
- if type(hours) is not int:
- raise ValueError("Hours must be an integer value")
- if type(minutes) is not int:
- raise ValueError("Minutes must be an integer value")
-
- if enable==False:
- self._check_interval_enable = False
- else:
- self._check_interval_enable = True
-
- self._check_interval_months = months
- self._check_interval_days = days
- self._check_interval_hours = hours
- self._check_interval_minutes = minutes
-
- @property
- def check_interval(self):
- return (self._check_interval_enable,
- self._check_interval_months,
- self._check_interval_days,
- self._check_interval_hours,
- self._check_interval_minutes)
-
- @property
- def error(self):
- return self._error
-
- @property
- def error_msg(self):
- return self._error_msg
-
- @property
- def version_min_update(self):
- return self._version_min_update
- @version_min_update.setter
- def version_min_update(self, value):
- if value == None:
- self._version_min_update = None
- return
- if type(value) != type((1,2,3)):
- raise ValueError("Version minimum must be a tuple")
- else:
- # potentially check entries are integers
- self._version_min_update = value
-
- @property
- def version_max_update(self):
- return self._version_max_update
- @version_max_update.setter
- def version_max_update(self, value):
- if value == None:
- self._version_max_update = None
- return
- if type(value) != type((1,2,3)):
- raise ValueError("Version maximum must be a tuple")
- else:
- # potentially check entries are integers
- self._version_max_update = value
-
- @property
- def backup_current(self):
- return self._backup_current
- @backup_current.setter
- def backup_current(self, value):
- if value == None:
- self._backup_current = False
- return
- else:
- self._backup_current = value
-
- @property
- def backup_ignore_patterns(self):
- return self._backup_ignore_patterns
- @backup_ignore_patterns.setter
- def backup_ignore_patterns(self, value):
- if value == None:
- self._backup_ignore_patterns = None
- return
- elif type(value) != type(['list']):
- raise ValueError("Backup pattern must be in list format")
- else:
- self._backup_ignore_patterns = value
-
- # -------------------------------------------------------------------------
- # Parameter validation related functions
- # -------------------------------------------------------------------------
-
-
- def check_is_url(self, url):
- if not ("http://" in url or "https://" in url):
- return False
- if "." not in url:
- return False
- return True
-
- def get_tag_names(self):
- tag_names = []
- self.get_tags(self)
- for tag in self._tags:
- tag_names.append(tag["name"])
- return tag_names
-
-
- # declare how the class gets printed
-
- def __repr__(self):
- return "<Module updater from {a}>".format(a=__file__)
-
- def __str__(self):
- return "Updater, with user: {a}, repository: {b}, url: {c}".format(
- a=self._user,
- b=self._repo, c=self.form_repo_url())
-
-
- # -------------------------------------------------------------------------
- # API-related functions
- # -------------------------------------------------------------------------
-
- def form_repo_url(self):
- return self._engine.form_repo_url(self)
-
- def form_tags_url(self):
- return self._engine.form_tags_url(self)
-
- def form_branch_url(self, branch):
- return self._engine.form_branch_url(branch, self)
-
- def get_tags(self):
- request = self.form_tags_url()
- if self._verbose: print("Getting tags from server")
-
- # get all tags, internet call
- all_tags = self._engine.parse_tags(self.get_api(request), self)
- if all_tags is not None:
- self._prefiltered_tag_count = len(all_tags)
- else:
- self._prefiltered_tag_count = 0
- all_tags = []
-
- # pre-process to skip tags
- if self.skip_tag != None:
- self._tags = [tg for tg in all_tags if self.skip_tag(self, tg)==False]
- else:
- self._tags = all_tags
-
- # get additional branches too, if needed, and place in front
- # Does NO checking here whether branch is valid
- if self._include_branches == True:
- temp_branches = self._include_branch_list.copy()
- temp_branches.reverse()
- for branch in temp_branches:
- request = self.form_branch_url(branch)
- include = {
- "name":branch.title(),
- "zipball_url":request
- }
- self._tags = [include] + self._tags # append to front
-
- if self._tags == None:
- # some error occurred
- self._tag_latest = None
- self._tags = []
- return
- elif self._prefiltered_tag_count == 0 and self._include_branches == False:
- self._tag_latest = None
- if self._error == None: # if not None, could have had no internet
- self._error = "No releases found"
- self._error_msg = "No releases or tags found on this repository"
- if self._verbose: print("No releases or tags found on this repository")
- elif self._prefiltered_tag_count == 0 and self._include_branches == True:
- if not self._error: self._tag_latest = self._tags[0]
- if self._verbose:
- branch = self._include_branch_list[0]
- print("{} branch found, no releases".format(branch), self._tags[0])
- elif (len(self._tags)-len(self._include_branch_list)==0 and self._include_branches==True) \
- or (len(self._tags)==0 and self._include_branches==False) \
- and self._prefiltered_tag_count > 0:
- self._tag_latest = None
- self._error = "No releases available"
- self._error_msg = "No versions found within compatible version range"
- if self._verbose: print("No versions found within compatible version range")
- else:
- if self._include_branches == False:
- self._tag_latest = self._tags[0]
- if self._verbose: print("Most recent tag found:",self._tags[0]['name'])
- else:
- # don't return branch if in list
- n = len(self._include_branch_list)
- self._tag_latest = self._tags[n] # guaranteed at least len()=n+1
- if self._verbose: print("Most recent tag found:",self._tags[n]['name'])
-
-
- # all API calls to base url
- def get_raw(self, url):
- # print("Raw request:", url)
- request = urllib.request.Request(url)
- context = ssl._create_unverified_context()
-
- # setup private request headers if appropriate
- if self._engine.token != None:
- if self._engine.name == "gitlab":
- request.add_header('PRIVATE-TOKEN',self._engine.token)
- else:
- if self._verbose: print("Tokens not setup for engine yet")
-
- # run the request
- try:
- result = urllib.request.urlopen(request,context=context)
- except urllib.error.HTTPError as e:
- self._error = "HTTP error"
- self._error_msg = str(e.code)
- self._update_ready = None
- except urllib.error.URLError as e:
- reason = str(e.reason)
- if "TLSV1_ALERT" in reason or "SSL" in reason:
- self._error = "Connection rejected, download manually"
- self._error_msg = reason
- else:
- self._error = "URL error, check internet connection"
- self._error_msg = reason
- self._update_ready = None
- return None
- else:
- result_string = result.read()
- result.close()
- return result_string.decode()
-
-
- # result of all api calls, decoded into json format
- def get_api(self, url):
- # return the json version
- get = None
- get = self.get_raw(url)
- if get != None:
- try:
- return json.JSONDecoder().decode(get)
- except Exception as e:
- self._error = "API response has invalid JSON format"
- self._error_msg = str(e.reason)
- self._update_ready = None
- return None
- else:
- return None
-
-
- # create a working directory and download the new files
- def stage_repository(self, url):
-
- local = os.path.join(self._updater_path,"update_staging")
- error = None
-
- # make/clear the staging folder
- # ensure the folder is always "clean"
- if self._verbose: print("Preparing staging folder for download:\n",local)
- if os.path.isdir(local) == True:
- try:
- shutil.rmtree(local)
- os.makedirs(local)
- except:
- error = "failed to remove existing staging directory"
- else:
- try:
- os.makedirs(local)
- except:
- error = "failed to create staging directory"
-
- if error != None:
- if self._verbose: print("Error: Aborting update, "+error)
- self._error = "Update aborted, staging path error"
- self._error_msg = "Error: {}".format(error)
- return False
-
- if self._backup_current==True:
- self.create_backup()
- if self._verbose: print("Now retrieving the new source zip")
-
- self._source_zip = os.path.join(local,"source.zip")
-
- if self._verbose: print("Starting download update zip")
- try:
- request = urllib.request.Request(url)
- context = ssl._create_unverified_context()
-
- # setup private token if appropriate
- if self._engine.token != None:
- if self._engine.name == "gitlab":
- request.add_header('PRIVATE-TOKEN',self._engine.token)
- else:
- if self._verbose: print("Tokens not setup for selected engine yet")
- self.urlretrieve(urllib.request.urlopen(request,context=context), self._source_zip)
- # add additional checks on file size being non-zero
- if self._verbose: print("Successfully downloaded update zip")
- return True
- except Exception as e:
- self._error = "Error retrieving download, bad link?"
- self._error_msg = "Error: {}".format(e)
- if self._verbose:
- print("Error retrieving download, bad link?")
- print("Error: {}".format(e))
- return False
-
-
- def create_backup(self):
- if self._verbose: print("Backing up current addon folder")
- local = os.path.join(self._updater_path,"backup")
- tempdest = os.path.join(self._addon_root,
- os.pardir,
- self._addon+"_updater_backup_temp")
-
- if self._verbose: print("Backup destination path: ",local)
-
- if os.path.isdir(local):
- try:
- shutil.rmtree(local)
- except:
- if self._verbose:print("Failed to removed previous backup folder, contininuing")
-
- # remove the temp folder; shouldn't exist but could if previously interrupted
- if os.path.isdir(tempdest):
- try:
- shutil.rmtree(tempdest)
- except:
- if self._verbose:print("Failed to remove existing temp folder, contininuing")
- # make the full addon copy, which temporarily places outside the addon folder
- if self._backup_ignore_patterns != None:
- shutil.copytree(
- self._addon_root,tempdest,
- ignore=shutil.ignore_patterns(*self._backup_ignore_patterns))
- else:
- shutil.copytree(self._addon_root,tempdest)
- shutil.move(tempdest,local)
-
- # save the date for future ref
- now = datetime.now()
- self._json["backup_date"] = "{m}-{d}-{yr}".format(
- m=now.strftime("%B"),d=now.day,yr=now.year)
- self.save_updater_json()
-
- def restore_backup(self):
- if self._verbose: print("Restoring backup")
-
- if self._verbose: print("Backing up current addon folder")
- backuploc = os.path.join(self._updater_path,"backup")
- tempdest = os.path.join(self._addon_root,
- os.pardir,
- self._addon+"_updater_backup_temp")
- tempdest = os.path.abspath(tempdest)
-
- # make the copy
- shutil.move(backuploc,tempdest)
- shutil.rmtree(self._addon_root)
- os.rename(tempdest,self._addon_root)
-
- self._json["backup_date"] = ""
- self._json["just_restored"] = True
- self._json["just_updated"] = True
- self.save_updater_json()
-
- self.reload_addon()
-
- def unpack_staged_zip(self,clean=False):
-
- if os.path.isfile(self._source_zip) == False:
- if self._verbose: print("Error, update zip not found")
- return -1
-
- # clear the existing source folder in case previous files remain
- try:
- shutil.rmtree(os.path.join(self._updater_path,"source"))
- os.makedirs(os.path.join(self._updater_path,"source"))
- if self._verbose: print("Source folder cleared and recreated")
- except:
- pass
-
- if self._verbose: print("Begin extracting source")
- if zipfile.is_zipfile(self._source_zip):
- with zipfile.ZipFile(self._source_zip) as zf:
- # extractall is no longer a security hazard, below is safe
- zf.extractall(os.path.join(self._updater_path,"source"))
- else:
- if self._verbose:
- print("Not a zip file, future add support for just .py files")
- raise ValueError("Resulting file is not a zip")
- if self._verbose: print("Extracted source")
-
- # either directly in root of zip, or one folder level deep
- unpath = os.path.join(self._updater_path,"source")
- if os.path.isfile(os.path.join(unpath,"__init__.py")) == False:
- dirlist = os.listdir(unpath)
- if len(dirlist)>0:
- if self._subfolder_path == "" or self._subfolder_path == None:
- unpath = os.path.join(unpath,dirlist[0])
- else:
- unpath = os.path.join(unpath,dirlist[0],self._subfolder_path)
-
- # smarter check for additional sub folders for a single folder
- # containing __init__.py
- if os.path.isfile(os.path.join(unpath,"__init__.py")) == False:
- if self._verbose:
- print("not a valid addon found")
- print("Paths:")
- print(dirlist)
-
- raise ValueError("__init__ file not found in new source")
-
- # now commence merging in the two locations:
- # note this MAY not be accurate, as updater files could be placed elsewhere
- origpath = os.path.dirname(__file__)
-
- # merge code with running addon directory, using blender default behavior
- # plus any modifiers indicated by user (e.g. force remove/keep)
- self.deepMergeDirectory(origpath,unpath,clean)
-
- # Now save the json state
- # Change to True, to trigger the handler on other side
- # if allowing reloading within same blender instance
- self._json["just_updated"] = True
- self.save_updater_json()
- self.reload_addon()
- self._update_ready = False
-
-
- # merge folder 'merger' into folder 'base' without deleting existing
- def deepMergeDirectory(self,base,merger,clean=False):
- if not os.path.exists(base):
- if self._verbose: print("Base path does not exist")
- return -1
- elif not os.path.exists(merger):
- if self._verbose: print("Merger path does not exist")
- return -1
-
- # paths to be aware of and not overwrite/remove/etc
- staging_path = os.path.join(self._updater_path,"update_staging")
- backup_path = os.path.join(self._updater_path,"backup")
- json_path = os.path.join(self._updater_path,"updater_status.json")
-
- # If clean install is enabled, clear existing files ahead of time
- # note: will not delete the update.json, update folder, staging, or staging
- # but will delete all other folders/files in addon directory
- error = None
- if clean==True:
- try:
- # implement clearing of all folders/files, except the
- # updater folder and updater json
- # Careful, this deletes entire subdirectories recursively...
- # make sure that base is not a high level shared folder, but
- # is dedicated just to the addon itself
- if self._verbose: print("clean=True, clearing addon folder to fresh install state")
-
- # remove root files and folders (except update folder)
- files = [f for f in os.listdir(base) if os.path.isfile(os.path.join(base,f))]
- folders = [f for f in os.listdir(base) if os.path.isdir(os.path.join(base,f))]
-
- for f in files:
- os.remove(os.path.join(base,f))
- print("Clean removing file {}".format(os.path.join(base,f)))
- for f in folders:
- if os.path.join(base,f)==self._updater_path: continue
- shutil.rmtree(os.path.join(base,f))
- print("Clean removing folder and contents {}".format(os.path.join(base,f)))
-
- except error:
- error = "failed to create clean existing addon folder"
- print(error,str(e))
-
- # Walk through the base addon folder for rules on pre-removing
- # but avoid removing/altering backup and updater file
- for path, dirs, files in os.walk(base):
- # prune ie skip updater folder
- dirs[:] = [d for d in dirs if os.path.join(path,d) not in [self._updater_path]]
- for file in files:
- for ptrn in self.remove_pre_update_patterns:
- if fnmatch.filter([file],ptrn):
- try:
- fl = os.path.join(path,file)
- os.remove(fl)
- if self._verbose: print("Pre-removed file "+file)
- except OSError:
- print("Failed to pre-remove "+file)
-
- # Walk through the temp addon sub folder for replacements
- # this implements the overwrite rules, which apply after
- # the above pre-removal rules. This also performs the
- # actual file copying/replacements
- for path, dirs, files in os.walk(merger):
- # verify this structure works to prune updater sub folder overwriting
- dirs[:] = [d for d in dirs if os.path.join(path,d) not in [self._updater_path]]
- relPath = os.path.relpath(path, merger)
- destPath = os.path.join(base, relPath)
- if not os.path.exists(destPath):
- os.makedirs(destPath)
- for file in files:
- # bring in additional logic around copying/replacing
- # Blender default: overwrite .py's, don't overwrite the rest
- destFile = os.path.join(destPath, file)
- srcFile = os.path.join(path, file)
-
- # decide whether to replace if file already exists, and copy new over
- if os.path.isfile(destFile):
- # otherwise, check each file to see if matches an overwrite pattern
- replaced=False
- for ptrn in self._overwrite_patterns:
- if fnmatch.filter([destFile],ptrn):
- replaced=True
- break
- if replaced:
- os.remove(destFile)
- os.rename(srcFile, destFile)
- if self._verbose: print("Overwrote file "+os.path.basename(destFile))
- else:
- if self._verbose: print("Pattern not matched to "+os.path.basename(destFile)+", not overwritten")
- else:
- # file did not previously exist, simply move it over
- os.rename(srcFile, destFile)
- if self._verbose: print("New file "+os.path.basename(destFile))
-
- # now remove the temp staging folder and downloaded zip
- try:
- shutil.rmtree(staging_path)
- except:
- error = "Error: Failed to remove existing staging directory, consider manually removing "+staging_path
- if self._verbose: print(error)
-
-
- def reload_addon(self):
- # if post_update false, skip this function
- # else, unload/reload addon & trigger popup
- if self._auto_reload_post_update == False:
- print("Restart blender to reload addon and complete update")
- return
-
- if self._verbose: print("Reloading addon...")
- addon_utils.modules(refresh=True)
- bpy.utils.refresh_script_paths()
-
- # not allowed in restricted context, such as register module
- # toggle to refresh
- bpy.ops.wm.addon_disable(module=self._addon_package)
- bpy.ops.wm.addon_refresh()
- bpy.ops.wm.addon_enable(module=self._addon_package)
-
-
- # -------------------------------------------------------------------------
- # Other non-api functions and setups
- # -------------------------------------------------------------------------
-
- def clear_state(self):
- self._update_ready = None
- self._update_link = None
- self._update_version = None
- self._source_zip = None
- self._error = None
- self._error_msg = None
-
- # custom urlretrieve implementation
- def urlretrieve(self, urlfile, filepath):
- chunk = 1024*8
- f = open(filepath, "wb")
- while 1:
- data = urlfile.read(chunk)
- if not data:
- #print("done.")
- break
- f.write(data)
- #print("Read %s bytes"%len(data))
- f.close()
-
-
- def version_tuple_from_text(self,text):
- if text == None: return ()
-
- # should go through string and remove all non-integers,
- # and for any given break split into a different section
- segments = []
- tmp = ''
- for l in str(text):
- if l.isdigit()==False:
- if len(tmp)>0:
- segments.append(int(tmp))
- tmp = ''
- else:
- tmp+=l
- if len(tmp)>0:
- segments.append(int(tmp))
-
- if len(segments)==0:
- if self._verbose: print("No version strings found text: ",text)
- if self._include_branches == False:
- return ()
- else:
- return (text)
- return tuple(segments)
-
- # called for running check in a background thread
- def check_for_update_async(self, callback=None):
-
- if self._json != None and "update_ready" in self._json and self._json["version_text"]!={}:
- if self._json["update_ready"] == True:
- self._update_ready = True
- self._update_link = self._json["version_text"]["link"]
- self._update_version = str(self._json["version_text"]["version"])
- # cached update
- callback(True)
- return
-
- # do the check
- if self._check_interval_enable == False:
- return
- elif self._async_checking == True:
- if self._verbose: print("Skipping async check, already started")
- return # already running the bg thread
- elif self._update_ready == None:
- self.start_async_check_update(False, callback)
-
-
- def check_for_update_now(self, callback=None):
-
- self._error = None
- self._error_msg = None
-
- if self._verbose:
- print("Check update pressed, first getting current status")
- if self._async_checking == True:
- if self._verbose: print("Skipping async check, already started")
- return # already running the bg thread
- elif self._update_ready == None:
- self.start_async_check_update(True, callback)
- else:
- self._update_ready = None
- self.start_async_check_update(True, callback)
-
-
- # this function is not async, will always return in sequential fashion
- # but should have a parent which calls it in another thread
- def check_for_update(self, now=False):
- if self._verbose: print("Checking for update function")
-
- # clear the errors if any
- self._error = None
- self._error_msg = None
-
- # avoid running again in, just return past result if found
- # but if force now check, then still do it
- if self._update_ready != None and now == False:
- return (self._update_ready,self._update_version,self._update_link)
-
- if self._current_version == None:
- raise ValueError("current_version not yet defined")
- if self._repo == None:
- raise ValueError("repo not yet defined")
- if self._user == None:
- raise ValueError("username not yet defined")
-
- self.set_updater_json() # self._json
-
- if now == False and self.past_interval_timestamp()==False:
- if self._verbose:
- print("Aborting check for updated, check interval not reached")
- return (False, None, None)
-
- # check if using tags or releases
- # note that if called the first time, this will pull tags from online
- if self._fake_install == True:
- if self._verbose:
- print("fake_install = True, setting fake version as ready")
- self._update_ready = True
- self._update_version = "(999,999,999)"
- self._update_link = "http://127.0.0.1"
-
- return (self._update_ready, self._update_version, self._update_link)
-
- # primary internet call
- self.get_tags() # sets self._tags and self._tag_latest
-
- self._json["last_check"] = str(datetime.now())
- self.save_updater_json()
-
- # can be () or ('master') in addition to branches, and version tag
- new_version = self.version_tuple_from_text(self.tag_latest)
-
- if len(self._tags)==0:
- self._update_ready = False
- self._update_version = None
- self._update_link = None
- return (False, None, None)
- if self._include_branches == False:
- link = self.select_link(self, self._tags[0])
- else:
- n = len(self._include_branch_list)
- if len(self._tags)==n:
- # effectively means no tags found on repo
- # so provide the first one as default
- link = self.select_link(self, self._tags[0])
- else:
- link = self.select_link(self, self._tags[n])
-
- if new_version == ():
- self._update_ready = False
- self._update_version = None
- self._update_link = None
- return (False, None, None)
- elif str(new_version).lower() in self._include_branch_list:
- # handle situation where master/whichever branch is included
- # however, this code effectively is not triggered now
- # as new_version will only be tag names, not branch names
- if self._include_branch_autocheck == False:
- # don't offer update as ready,
- # but set the link for the default
- # branch for installing
- self._update_ready = True
- self._update_version = new_version
- self._update_link = link
- self.save_updater_json()
- return (True, new_version, link)
- else:
- raise ValueError("include_branch_autocheck: NOT YET DEVELOPED")
- # bypass releases and look at timestamp of last update
- # from a branch compared to now, see if commit values
- # match or not.
-
- else:
- # situation where branches not included
-
- if new_version > self._current_version:
-
- self._update_ready = True
- self._update_version = new_version
- self._update_link = link
- self.save_updater_json()
- return (True, new_version, link)
-
- # elif new_version != self._current_version:
- # self._update_ready = False
- # self._update_version = new_version
- # self._update_link = link
- # self.save_updater_json()
- # return (True, new_version, link)
-
- # if no update, set ready to False from None
- self._update_ready = False
- self._update_version = None
- self._update_link = None
- return (False, None, None)
-
-
- def set_tag(self,name):
- tg = None
- for tag in self._tags:
- if name == tag["name"]:
- tg = tag
- break
- if tg == None:
- raise ValueError("Version tag not found: "+revert_tag)
- new_version = self.version_tuple_from_text(self.tag_latest)
- self._update_version = new_version
- self._update_link = self.select_link(self, tg)
-
-
- def run_update(self,force=False,revert_tag=None,clean=False,callback=None):
- # revert_tag: could e.g. get from drop down list
- # different versions of the addon to revert back to
- # clean: not used, but in future could use to totally refresh addon
- self._json["update_ready"] = False
- self._json["ignore"] = False # clear ignore flag
- self._json["version_text"] = {}
-
- if revert_tag != None:
- self.set_tag(revert_tag)
- self._update_ready = True
-
- # clear the errors if any
- self._error = None
- self._error_msg = None
-
- if self._verbose: print("Running update")
-
- if self._fake_install == True:
- # change to True, to trigger the reload/"update installed" handler
- if self._verbose:
- print("fake_install=True")
- print("Just reloading and running any handler triggers")
- self._json["just_updated"] = True
- self.save_updater_json()
- if self._backup_current == True:
- self.create_backup()
- self.reload_addon()
- self._update_ready = False
- res = True # fake "success" zip download flag
-
- elif force==False:
- if self._update_ready != True:
- if self._verbose: print("Update stopped, new version not ready")
- return "Update stopped, new version not ready"
- elif self._update_link == None:
- # this shouldn't happen if update is ready
- if self._verbose: print("Update stopped, update link unavailable")
- return "Update stopped, update link unavailable"
-
- if self._verbose and revert_tag==None:
- print("Staging update")
- elif self._verbose:
- print("Staging install")
-
- res = self.stage_repository(self._update_link)
- if res !=True:
- print("Error in staging repository: "+str(res))
- if callback != None: callback(self._error_msg)
- return self._error_msg
- self.unpack_staged_zip(clean)
-
- else:
- if self._update_link == None:
- if self._verbose: print("Update stopped, could not get link")
- return "Update stopped, could not get link"
- if self._verbose: print("Forcing update")
-
- res = self.stage_repository(self._update_link)
- if res !=True:
- print("Error in staging repository: "+str(res))
- if callback != None: callback(self._error_msg)
- return self._error_msg
- self.unpack_staged_zip(clean)
- # would need to compare against other versions held in tags
-
- # run the front-end's callback if provided
- if callback != None: callback()
-
- # return something meaningful, 0 means it worked
- return 0
-
-
- def past_interval_timestamp(self):
- if self._check_interval_enable == False:
- return True # ie this exact feature is disabled
-
- if "last_check" not in self._json or self._json["last_check"] == "":
- return True
- else:
- now = datetime.now()
- last_check = datetime.strptime(self._json["last_check"],
- "%Y-%m-%d %H:%M:%S.%f")
- next_check = last_check
- offset = timedelta(
- days=self._check_interval_days + 30*self._check_interval_months,
- hours=self._check_interval_hours,
- minutes=self._check_interval_minutes
- )
-
- delta = (now - offset) - last_check
- if delta.total_seconds() > 0:
- if self._verbose:
- print("{} Updater: Time to check for updates!".format(self._addon))
- return True
- else:
- if self._verbose:
- print("{} Updater: Determined it's not yet time to check for updates".format(self._addon))
- return False
-
-
- def set_updater_json(self):
- if self._updater_path == None:
- raise ValueError("updater_path is not defined")
- elif os.path.isdir(self._updater_path) == False:
- os.makedirs(self._updater_path)
-
- jpath = os.path.join(self._updater_path,"updater_status.json")
- if os.path.isfile(jpath):
- with open(jpath) as data_file:
- self._json = json.load(data_file)
- if self._verbose: print("{} Updater: Read in json settings from file".format(self._addon))
- else:
- # set data structure
- self._json = {
- "last_check":"",
- "backup_date":"",
- "update_ready":False,
- "ignore":False,
- "just_restored":False,
- "just_updated":False,
- "version_text":{}
- }
- self.save_updater_json()
-
-
- def save_updater_json(self):
- # first save the state
- if self._update_ready == True:
- if type(self._update_version) == type((0,0,0)):
- self._json["update_ready"] = True
- self._json["version_text"]["link"]=self._update_link
- self._json["version_text"]["version"]=self._update_version
- else:
- self._json["update_ready"] = False
- self._json["version_text"] = {}
- else:
- self._json["update_ready"] = False
- self._json["version_text"] = {}
-
- jpath = os.path.join(self._updater_path,"updater_status.json")
- outf = open(jpath,'w')
- data_out = json.dumps(self._json, indent=4)
- outf.write(data_out)
- outf.close()
- if self._verbose:
- print(self._addon+": Wrote out updater json settings to file, with the contents:")
- print(self._json)
-
- def json_reset_postupdate(self):
- self._json["just_updated"] = False
- self._json["update_ready"] = False
- self._json["version_text"] = {}
- self.save_updater_json()
-
- def json_reset_restore(self):
- self._json["just_restored"] = False
- self._json["update_ready"] = False
- self._json["version_text"] = {}
- self.save_updater_json()
- self._update_ready = None # reset so you could check update again
-
- def ignore_update(self):
- self._json["ignore"] = True
- self.save_updater_json()
-
-
- # -------------------------------------------------------------------------
- # ASYNC stuff
- # -------------------------------------------------------------------------
-
- def start_async_check_update(self, now=False, callback=None):
- if self._async_checking == True:
- return
- if self._verbose: print("{} updater: Starting background checking thread".format(self._addon))
- check_thread = threading.Thread(target=self.async_check_update,
- args=(now,callback,))
- check_thread.daemon = True
- self._check_thread = check_thread
- check_thread.start()
-
- return True
-
- def async_check_update(self, now, callback=None):
- self._async_checking = True
- if self._verbose: print("{} BG thread: Checking for update now in background".format(self._addon))
- # time.sleep(3) # to test background, in case internet too fast to tell
- # try:
- self.check_for_update(now=now)
- # except Exception as exception:
- # print("Checking for update error:")
- # print(exception)
- # self._update_ready = False
- # self._update_version = None
- # self._update_link = None
- # self._error = "Error occurred"
- # self._error_msg = "Encountered an error while checking for updates"
-
- self._async_checking = False
- self._check_thread = None
-
- if self._verbose:
- print("{} BG thread: Finished checking for update, doing callback".format(self._addon))
- if callback != None: callback(self._update_ready)
-
-
- def stop_async_check_update(self):
- if self._check_thread != None:
- try:
- if self._verbose: print("Thread will end in normal course.")
- # however, "There is no direct kill method on a thread object."
- # better to let it run its course
- #self._check_thread.stop()
- except:
- pass
- self._async_checking = False
- self._error = None
- self._error_msg = None
-
-
-# -----------------------------------------------------------------------------
-# Updater Engines
-# -----------------------------------------------------------------------------
-
-
-class BitbucketEngine(object):
-
- def __init__(self):
- self.api_url = 'https://api.bitbucket.org'
- self.token = None
- self.name = "bitbucket"
-
- def form_repo_url(self, updater):
- return self.api_url+"/2.0/repositories/"+updater.user+"/"+updater.repo
-
- def form_tags_url(self, updater):
- return self.form_repo_url(updater) + "/refs/tags?sort=-name"
-
- def form_branch_url(self, branch, updater):
- return self.get_zip_url(branch, updater)
-
- def get_zip_url(self, name, updater):
- return "https://bitbucket.org/{user}/{repo}/get/{name}.zip".format(
- user=updater.user,
- repo=updater.repo,
- name=name)
-
- def parse_tags(self, response, updater):
- if response == None:
- return []
- return [{"name": tag["name"], "zipball_url": self.get_zip_url(tag["name"], updater)} for tag in response["values"]]
-
-
-class GithubEngine(object):
-
- def __init__(self):
- self.api_url = 'https://api.github.com'
- self.token = None
- self.name = "github"
-
- def form_repo_url(self, updater):
- return "{}{}{}{}{}".format(self.api_url,"/repos/",updater.user,
- "/",updater.repo)
-
- def form_tags_url(self, updater):
- if updater.use_releases:
- return "{}{}".format(self.form_repo_url(updater),"/releases")
- else:
- return "{}{}".format(self.form_repo_url(updater),"/tags")
-
- def form_branch_list_url(self, updater):
- return "{}{}".format(self.form_repo_url(updater),"/branches")
-
- def form_branch_url(self, branch, updater):
- return "{}{}{}".format(self.form_repo_url(updater),
- "/zipball/",branch)
-
- def parse_tags(self, response, updater):
- if response == None:
- return []
- return response
-
-
-class GitlabEngine(object):
-
- def __init__(self):
- self.api_url = 'https://gitlab.com'
- self.token = None
- self.name = "gitlab"
-
- def form_repo_url(self, updater):
- return "{}{}{}".format(self.api_url,"/api/v3/projects/",updater.repo)
-
- def form_tags_url(self, updater):
- return "{}{}".format(self.form_repo_url(updater),"/repository/tags")
-
- def form_branch_list_url(self, updater):
- # does not validate branch name.
- return "{}{}".format(
- self.form_repo_url(updater),
- "/repository/branches")
-
- def form_branch_url(self, branch, updater):
- # Could clash with tag names and if it does, it will
- # download TAG zip instead of branch zip to get
- # direct path, would need.
- return "{}{}{}".format(
- self.form_repo_url(updater),
- "/repository/archive.zip?sha=",
- branch)
-
- def get_zip_url(self, sha, updater):
- return "{base}/repository/archive.zip?sha:{sha}".format(
- base=self.form_repo_url(updater),
- sha=sha)
-
- # def get_commit_zip(self, id, updater):
- # return self.form_repo_url(updater)+"/repository/archive.zip?sha:"+id
-
- def parse_tags(self, response, updater):
- if response == None:
- return []
- return [{"name": tag["name"], "zipball_url": self.get_zip_url(tag["commit"]["id"], updater)} for tag in response]
-
-
-# -----------------------------------------------------------------------------
-# The module-shared class instance,
-# should be what's imported to other files
-# -----------------------------------------------------------------------------
-
-Updater = Singleton_updater()