diff options
author | Ben Fisher <align.downpoured@gmail.com> | 2017-04-10 03:46:20 +0300 |
---|---|---|
committer | Ben Fisher <align.downpoured@gmail.com> | 2017-04-10 03:46:20 +0300 |
commit | ec079961445dbd79a45afe1cd28acef4419f93a2 (patch) | |
tree | ce5e86e45c1b90d106b17215e20608e3aea54ae4 /Tools | |
parent | 6d39c39a94bd89c6bb635031a329fb0bbf8d76ce (diff) |
restore_from_python.py; codacity's recommendations
no functionality changes
Diffstat (limited to 'Tools')
-rw-r--r-- | Tools/Commandline/ijson.py | 22 | ||||
-rw-r--r-- | Tools/Commandline/pyaescrypt.py | 2 | ||||
-rw-r--r-- | Tools/Commandline/restore_from_python.py | 93 |
3 files changed, 56 insertions, 61 deletions
diff --git a/Tools/Commandline/ijson.py b/Tools/Commandline/ijson.py index 981b08733..d368dbc18 100644 --- a/Tools/Commandline/ijson.py +++ b/Tools/Commandline/ijson.py @@ -46,10 +46,10 @@ class ObjectBuilder(object): if event == 'map_key': self.key = value elif event == 'start_map': - map = {} - self.containers[-1](map) + mapval = {} + self.containers[-1](mapval) def setter(value): - map[self.key] = value + mapval[self.key] = value self.containers.append(setter) elif event == 'start_array': array = [] @@ -118,7 +118,7 @@ BUFSIZE = 16 * 1024 LEXEME_RE = re.compile(r'[a-z0-9eE\.\+-]+|\S') def Lexer(f, buf_size=BUFSIZE): - if type(f.read(0)) == bytetype: + if isinstance(f.read(0), bytetype): f = getreader('utf-8')(f) buf = f.read(buf_size) pos = 0 @@ -281,12 +281,8 @@ def parse(file, buf_size=BUFSIZE): def items(file, prefix): return items_impl(parse(file), prefix) -if sys.version_info[0] < 3: - b2s = lambda s: s - chr = unichr - bytetype = str -else: - def b2s(b): - return b.decode('utf-8') - chr = chr - bytetype = bytes +def b2s(b): + return b.decode('utf-8') + +assert sys.version_info[0] >= 3 +bytetype = bytes diff --git a/Tools/Commandline/pyaescrypt.py b/Tools/Commandline/pyaescrypt.py index df8438764..0b94ae5d3 100644 --- a/Tools/Commandline/pyaescrypt.py +++ b/Tools/Commandline/pyaescrypt.py @@ -153,7 +153,7 @@ def pyAesCryptStretch(passw, iv1): from Crypto.Hash import SHA256 digest=iv1+(16*b"\x00") - for i in range(8192): + for _ in range(8192): passHash=SHA256.new() passHash.update(digest) passHash.update(bytes(passw,"utf_16_le")) diff --git a/Tools/Commandline/restore_from_python.py b/Tools/Commandline/restore_from_python.py index 128dded66..655667ee1 100644 --- a/Tools/Commandline/restore_from_python.py +++ b/Tools/Commandline/restore_from_python.py @@ -25,9 +25,9 @@ from pyaescrypt import pyAesCryptDecrypt, fail_with_msg # increase for faster restores, at the cost of higher RAM usage. maxCacheSizeInMB = 100 -def mainRestore(dir, outdir, passw, scope): +def mainRestore(d, outdir, passw, scope): # locate dlist - dlists = [name for name in os.listdir(dir) if (name.endswith('.dlist.zip') or + dlists = [name for name in os.listdir(d) if (name.endswith('.dlist.zip') or name.endswith('.dlist.zip.aes'))] if dlists: @@ -36,29 +36,28 @@ def mainRestore(dir, outdir, passw, scope): # decrypt dlist file to disk if dlist.endswith('.dlist.zip.aes'): - with open(os.path.join(dir, 'py-restore-dlist-decr.zip'), 'wb') as f: - fn = lambda data: f.write(data) - pyAesCryptDecrypt(os.path.join(dir, dlist), passw, fn) - dlist = os.path.join(dir, 'py-restore-dlist-decr.zip') + with open(os.path.join(d, 'py-restore-dlist-decr.zip'), 'wb') as f: + pyAesCryptDecrypt(os.path.join(d, dlist), passw, f.write) + dlist = os.path.join(d, 'py-restore-dlist-decr.zip') else: fail_with_msg('No .dlist.zip files found.') # create cache - largestDBlock = max(os.path.getsize(os.path.join(dir, name)) - for name in os.listdir(dir) if '.dblock.zip' in name) + largestDBlock = max(os.path.getsize(os.path.join(d, name)) + for name in os.listdir(d) if '.dblock.zip' in name) amountInCache = max(1, (maxCacheSizeInMB * 1024 * 1024) // largestDBlock) cacheDecrypted = MemoizeDecorator(pyAesCryptDecrypt, amountInCache) # read some metadata from the manifest - db, numberToName = createDb(dir, 'py-restore-index.sqlite', passw, cacheDecrypted) + db, numberToName = createDb(d, 'py-restore-index.sqlite', passw, cacheDecrypted) dbopts = (db, numberToName, cacheDecrypted, passw) - opts = getArchiveOptions(dir, dlist) + opts = getArchiveOptions(d, dlist) # restore files i = 0 msgs = 0 print('Restoring files...') - for item in enumerateDlistFiles(dir, dlist): + for item in enumerateDlistFiles(d, dlist): if item['type'] == 'File' and fnmatch.fnmatch(item['path'], scope): # print a dot every 10 files to show we're still working i += 1 @@ -76,7 +75,7 @@ def mainRestore(dir, outdir, passw, scope): outPath = outdir + item['path'] try: - restoreOneFile(dir, dbopts, opts, item, outPath) + restoreOneFile(d, dbopts, opts, item, outPath) except Exception as e: msgs += 1 print(toAscii('\nWhen restoring %s to %s: %s' % @@ -87,7 +86,7 @@ def mainRestore(dir, outdir, passw, scope): db.close() print('\n\n%d warnings/errors seen.' % msgs) -def restoreOneFile(dir, dbopts, opts, listEntry, outPath): +def restoreOneFile(d, dbopts, opts, listEntry, outPath): # create destination directory if not os.path.isdir(os.path.split(outPath)[0]): os.makedirs(os.path.split(outPath)[0]) @@ -96,18 +95,18 @@ def restoreOneFile(dir, dbopts, opts, listEntry, outPath): with open(outPath, 'wb') as f: if 'blocklists' not in listEntry or not listEntry['blocklists']: # small files store data in one block - data = getContentBlock(dir, dbopts, listEntry['hash']) + data = getContentBlock(d, dbopts, listEntry['hash']) f.write(data) else: # large files point to a list of blockids, each of which points # to another list of blockids for blhi, blh in enumerate(listEntry['blocklists']): blockhashoffset = blhi * opts['hashes-per-block'] * opts['blocksize'] - binaryHashes = getContentBlock(dir, dbopts, blh) + binaryHashes = getContentBlock(d, dbopts, blh) for bi, start in enumerate(range(0, len(binaryHashes), opts['hash-size'])): - hash = binaryHashes[start: start + opts['hash-size']] - hash = base64.b64encode(hash) - data = getContentBlock(dir, dbopts, hash) + thehash = binaryHashes[start: start + opts['hash-size']] + thehash = base64.b64encode(thehash) + data = getContentBlock(d, dbopts, thehash) f.seek(blockhashoffset + bi * opts['blocksize']) f.write(data) @@ -125,17 +124,17 @@ def restoreOneFile(dir, dbopts, opts, listEntry, outPath): raise Exception('Restored %s. expected checksum %s and got %s' % (outPath, expected, got)) -def getContentBlock(dir, dbopts, blockId): +def getContentBlock(d, dbopts, blockId): if isinstance(blockId, bytes): blockId = blockId.decode('utf8') db, numberToName, cacheDecrypted, passw = dbopts name = getFilenameFromBlockId(db, numberToName, blockId) - with openAsZipFile(dir, name, passw, cacheDecrypted) as zip: - with zip.open(base64PlainToBase64Url(blockId), 'r') as zipContents: + with openAsZipFile(d, name, passw, cacheDecrypted) as z: + with z.open(base64PlainToBase64Url(blockId), 'r') as zipContents: return zipContents.read() -def openAsZipFile(dir, name, passw, cacheDecrypted): - fullpath = os.path.join(dir, name) +def openAsZipFile(d, name, passw, cacheDecrypted): + fullpath = os.path.join(d, name) assertTrue(os.path.exists(fullpath), 'missing %s' % fullpath) if name.endswith('.zip'): return zipfile.ZipFile(fullpath, 'r') @@ -143,10 +142,10 @@ def openAsZipFile(dir, name, passw, cacheDecrypted): data = io.BytesIO(cacheDecrypted(fullpath, passw)) return zipfile.ZipFile(data, 'r') -def enumerateDlistFiles(dir, dlist): +def enumerateDlistFiles(d, dlist): convertStreamToUtf8 = codecs.getreader('utf-8-sig') - with zipfile.ZipFile(os.path.join(dir, dlist), 'r') as zip: - with zip.open('filelist.json', 'r') as zipentry: + with zipfile.ZipFile(os.path.join(d, dlist), 'r') as z: + with z.open('filelist.json', 'r') as zipentry: with convertStreamToUtf8(zipentry) as zipentryutf8: for item in streamJsonArrayItems(zipentryutf8): yield item @@ -161,7 +160,7 @@ def streamJsonArrayItems(f): assertEqual('start_array', next(parsed)[1]) # construct objects. use level in order to support objects within objects - for path, event, value in parsed: + for _, event, value in parsed: currentObject.event(event, value) if event == 'start_map': level += 1 @@ -173,16 +172,16 @@ def streamJsonArrayItems(f): # ignore the final end_array event. -def createDb(dir, filename, passw, cacheDecrypted): +def createDb(d, filename, passw, cacheDecrypted): # get a summary of the current dblocks - zipfilenames = [s for s in os.listdir(dir) if + zipfilenames = [s for s in os.listdir(d) if s.endswith('.dblock.zip') or s.endswith('.dblock.zip.aes')] zipfilenames.sort() filenamesAndSizes = ';'.join(zipfilenames) filenamesAndSizes += ';'.join(map(str, - [os.path.getsize(os.path.join(dir, s)) for s in zipfilenames])) + [os.path.getsize(os.path.join(d, s)) for s in zipfilenames])) needNew = True - dbpath = os.path.join(dir, filename) + dbpath = os.path.join(d, filename) if os.path.exists(dbpath): # check that the dblocks we have match the dblocks this db has. dbCheckIfComplete = sqlite3.connect(dbpath) @@ -201,14 +200,14 @@ def createDb(dir, filename, passw, cacheDecrypted): numberToName = OrderedDict((n + 1, v) for n, v in enumerate(zipfilenames)) if needNew: print('Creating index, this may take some time...') - createBlockIdsToFilenames(dir, db, passw, cacheDecrypted, + createBlockIdsToFilenames(d, db, passw, cacheDecrypted, numberToName, filenamesAndSizes) else: print('Able to re-use existing index.') return db, numberToName -def createBlockIdsToFilenames(dir, db, passw, cache, numberToName, filenamesAndSizes): +def createBlockIdsToFilenames(d, db, passw, cache, numberToName, filenamesAndSizes): # create an index mapping blockId to filename with db: c = db.cursor() @@ -221,8 +220,8 @@ def createBlockIdsToFilenames(dir, db, passw, cache, numberToName, filenamesAndS name = numberToName[num] sys.stdout.write('.') sys.stdout.flush() - with openAsZipFile(dir, name, passw, cache) as zip: - for entryname in zip.namelist(): + with openAsZipFile(d, name, passw, cache) as z: + for entryname in z.namelist(): entryname = base64UrlToBase64Plain(entryname) c.execute('INSERT INTO BlockIdToFile (BlockId, FileNum) VALUES (?, ?)', [entryname.encode('utf8'), num]) @@ -298,18 +297,18 @@ def MemoizeDecorator(fn, cachesize): def getHasherObject(hashalg): hashalg = hashalg.lower() - if hashalg == 'sha1': return lambda: hashlib.sha1() - elif hashalg == 'md5': return lambda: hashlib.md5() - elif hashalg == 'sha256': return lambda: hashlib.sha256() - elif hashalg == 'sha384': return lambda: hashlib.sha384() - elif hashalg == 'sha512': return lambda: hashlib.sha512() + if hashalg == 'sha1': return hashlib.sha1 + elif hashalg == 'md5': return hashlib.md5 + elif hashalg == 'sha256': return hashlib.sha256 + elif hashalg == 'sha384': return hashlib.sha384 + elif hashalg == 'sha512': return hashlib.sha512 else: assertTrue(False, 'unknown hash algorithm %s' % hashalg) -def getArchiveOptions(dir, dlist): +def getArchiveOptions(d, dlist): opts = {} convertStreamToUtf8 = codecs.getreader('utf-8-sig') - with zipfile.ZipFile(os.path.join(dir, dlist), 'r') as zip: - with zip.open('manifest', 'r') as zipentry: + with zipfile.ZipFile(os.path.join(d, dlist), 'r') as z: + with z.open('manifest', 'r') as zipentry: with convertStreamToUtf8(zipentry) as zipentryutf8: alljson = zipentryutf8.read() manifest = json.loads(alljson) @@ -325,8 +324,8 @@ def getArchiveOptions(dir, dlist): def main(): print('Welcome to Python Duplicati recovery.') - dir = input('Please type the full path to a directory with Duplicati\'s .aes or .zip files:') - assertTrue(os.path.isdir(dir), 'Directory not found') + d = input('Please type the full path to a directory with Duplicati\'s .aes or .zip files:') + assertTrue(os.path.isdir(d), 'Directory not found') scope = input('Please type * to restore all files, or a pattern like /path/to/files/* to ' + 'restore the files in a certain directory)') outdir = input('Please enter the path to an empty destination directory:') @@ -337,10 +336,10 @@ def main(): # get password passw = None - if any(name.endswith('.aes') for name in os.listdir(dir)): + if any(name.endswith('.aes') for name in os.listdir(d)): passw = str(getpass.getpass("Password:")) - mainRestore(dir, outdir, passw, scope) + mainRestore(d, outdir, passw, scope) print('Complete.') if __name__ == '__main__': |