# ***** BEGIN GPL LICENSE BLOCK ***** # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # # ***** END GPL LICENCE BLOCK ***** # # (c) 2009, At Mind B.V. - Jeroen Bakker # (c) 2014, Blender Foundation - Campbell Barton import gzip import logging import os import struct import tempfile log = logging.getLogger("blendfile") FILE_BUFFER_SIZE = 1024 * 1024 class BlendFileError(Exception): """Raised when there was an error reading/parsing a blend file.""" # ----------------------------------------------------------------------------- # module global routines # # read routines # open a filename # determine if the file is compressed # and returns a handle def open_blend(filename, access="rb"): """Opens a blend file for reading or writing pending on the access supports 2 kind of blend files. Uncompressed and compressed. Known issue: does not support packaged blend files """ handle = open(filename, access) magic_test = b"BLENDER" magic = handle.read(len(magic_test)) if magic == magic_test: log.debug("normal blendfile detected") handle.seek(0, os.SEEK_SET) bfile = BlendFile(handle) bfile.is_compressed = False bfile.filepath_orig = filename return bfile elif magic[:2] == b'\x1f\x8b': log.debug("gzip blendfile detected") handle.close() log.debug("decompressing started") fs = gzip.open(filename, "rb") data = fs.read(FILE_BUFFER_SIZE) magic = data[:len(magic_test)] if magic == magic_test: handle = tempfile.TemporaryFile() while data: handle.write(data) data = fs.read(FILE_BUFFER_SIZE) log.debug("decompressing finished") fs.close() log.debug("resetting decompressed file") handle.seek(os.SEEK_SET, 0) bfile = BlendFile(handle) bfile.is_compressed = True bfile.filepath_orig = filename return bfile else: raise BlendFileError("filetype inside gzip not a blend") else: raise BlendFileError("filetype not a blend or a gzip blend") def pad_up_4(offset): return (offset + 3) & ~3 # ----------------------------------------------------------------------------- # module classes class BlendFile: """ Blend file. """ __slots__ = ( # file (result of open()) "handle", # str (original name of the file path) "filepath_orig", # BlendFileHeader "header", # struct.Struct "block_header_struct", # BlendFileBlock "blocks", # [DNAStruct, ...] "structs", # dict {b'StructName': sdna_index} # (where the index is an index into 'structs') "sdna_index_from_id", # dict {addr_old: block} "block_from_offset", # int "code_index", # bool (did we make a change) "is_modified", # bool (is file gzipped) "is_compressed", ) def __init__(self, handle): log.debug("initializing reading blend-file") self.handle = handle self.header = BlendFileHeader(handle) self.block_header_struct = self.header.create_block_header_struct() self.blocks = [] self.code_index = {} self.structs = [] self.sdna_index_from_id = {} block = BlendFileBlock(handle, self) while block.code != b'ENDB': if block.code == b'DNA1': (self.structs, self.sdna_index_from_id, ) = BlendFile.decode_structs(self.header, block, handle) else: handle.seek(block.size, os.SEEK_CUR) self.blocks.append(block) self.code_index.setdefault(block.code, []).append(block) block = BlendFileBlock(handle, self) self.is_modified = False self.blocks.append(block) if not self.structs: raise BlendFileError("No DNA1 block in file, this is not a valid .blend file!") # cache (could lazy init, incase we never use?) self.block_from_offset = {block.addr_old: block for block in self.blocks if block.code != b'ENDB'} def __repr__(self): return '<%s %r>' % (self.__class__.__qualname__, self.handle) def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() def find_blocks_from_code(self, code): assert(type(code) == bytes) if code not in self.code_index: return [] return self.code_index[code] def find_block_from_offset(self, offset): # same as looking looping over all blocks, # then checking ``block.addr_old == offset`` assert(type(offset) is int) return self.block_from_offset.get(offset) def close(self): """ Close the blend file writes the blend file to disk if changes has happened """ handle = self.handle if self.is_modified: if self.is_compressed: log.debug("close compressed blend file") handle.seek(os.SEEK_SET, 0) log.debug("compressing started") fs = gzip.open(self.filepath_orig, "wb") data = handle.read(FILE_BUFFER_SIZE) while data: fs.write(data) data = handle.read(FILE_BUFFER_SIZE) fs.close() log.debug("compressing finished") handle.close() def ensure_subtype_smaller(self, sdna_index_curr, sdna_index_next): # never refine to a smaller type if (self.structs[sdna_index_curr].size > self.structs[sdna_index_next].size): raise RuntimeError("cant refine to smaller type (%s -> %s)" % (self.structs[sdna_index_curr].dna_type_id.decode('ascii'), self.structs[sdna_index_next].dna_type_id.decode('ascii'))) @staticmethod def decode_structs(header, block, handle): """ DNACatalog is a catalog of all information in the DNA1 file-block """ log.debug("building DNA catalog") shortstruct = DNA_IO.USHORT[header.endian_index] shortstruct2 = struct.Struct(header.endian_str + b'HH') intstruct = DNA_IO.UINT[header.endian_index] data = handle.read(block.size) types = [] names = [] structs = [] sdna_index_from_id = {} offset = 8 names_len = intstruct.unpack_from(data, offset)[0] offset += 4 log.debug("building #%d names" % names_len) for i in range(names_len): tName = DNA_IO.read_data0_offset(data, offset) offset = offset + len(tName) + 1 names.append(DNAName(tName)) del names_len offset = pad_up_4(offset) offset += 4 types_len = intstruct.unpack_from(data, offset)[0] offset += 4 log.debug("building #%d types" % types_len) for i in range(types_len): dna_type_id = DNA_IO.read_data0_offset(data, offset) # None will be replaced by the DNAStruct, below types.append(DNAStruct(dna_type_id)) offset += len(dna_type_id) + 1 offset = pad_up_4(offset) offset += 4 log.debug("building #%d type-lengths" % types_len) for i in range(types_len): tLen = shortstruct.unpack_from(data, offset)[0] offset = offset + 2 types[i].size = tLen del types_len offset = pad_up_4(offset) offset += 4 structs_len = intstruct.unpack_from(data, offset)[0] offset += 4 log.debug("building #%d structures" % structs_len) for sdna_index in range(structs_len): d = shortstruct2.unpack_from(data, offset) struct_type_index = d[0] offset += 4 dna_struct = types[struct_type_index] sdna_index_from_id[dna_struct.dna_type_id] = sdna_index structs.append(dna_struct) fields_len = d[1] dna_offset = 0 for field_index in range(fields_len): d2 = shortstruct2.unpack_from(data, offset) field_type_index = d2[0] field_name_index = d2[1] offset += 4 dna_type = types[field_type_index] dna_name = names[field_name_index] if dna_name.is_pointer or dna_name.is_method_pointer: dna_size = header.pointer_size * dna_name.array_size else: dna_size = dna_type.size * dna_name.array_size field = DNAField(dna_type, dna_name, dna_size, dna_offset) dna_struct.fields.append(field) dna_struct.field_from_name[dna_name.name_only] = field dna_offset += dna_size return structs, sdna_index_from_id class BlendFileBlock: """ Instance of a struct. """ __slots__ = ( # BlendFile "file", "code", "size", "addr_old", "sdna_index", "count", "file_offset", "user_data", ) def __str__(self): return ("<%s.%s (%s), size=%d at %s>" % # fields=[%s] (self.__class__.__name__, self.dna_type_name, self.code.decode(), self.size, # b", ".join(f.dna_name.name_only for f in self.dna_type.fields).decode('ascii'), hex(self.addr_old), )) def __init__(self, handle, bfile): OLDBLOCK = struct.Struct(b'4sI') self.file = bfile self.user_data = None data = handle.read(bfile.block_header_struct.size) if len(data) != bfile.block_header_struct.size: print("WARNING! Blend file seems to be badly truncated!") self.code = b'ENDB' self.size = 0 self.addr_old = 0 self.sdna_index = 0 self.count = 0 self.file_offset = 0 return # header size can be 8, 20, or 24 bytes long # 8: old blend files ENDB block (exception) # 20: normal headers 32 bit platform # 24: normal headers 64 bit platform if len(data) > 15: blockheader = bfile.block_header_struct.unpack(data) self.code = blockheader[0].partition(b'\0')[0] if self.code != b'ENDB': self.size = blockheader[1] self.addr_old = blockheader[2] self.sdna_index = blockheader[3] self.count = blockheader[4] self.file_offset = handle.tell() else: self.size = 0 self.addr_old = 0 self.sdna_index = 0 self.count = 0 self.file_offset = 0 else: blockheader = OLDBLOCK.unpack(data) self.code = blockheader[0].partition(b'\0')[0] self.code = DNA_IO.read_data0(blockheader[0]) self.size = 0 self.addr_old = 0 self.sdna_index = 0 self.count = 0 self.file_offset = 0 @property def dna_type(self): return self.file.structs[self.sdna_index] @property def dna_type_name(self): return self.dna_type.dna_type_id.decode('ascii') def refine_type_from_index(self, sdna_index_next): assert(type(sdna_index_next) is int) sdna_index_curr = self.sdna_index self.file.ensure_subtype_smaller(sdna_index_curr, sdna_index_next) self.sdna_index = sdna_index_next def refine_type(self, dna_type_id): assert(type(dna_type_id) is bytes) self.refine_type_from_index(self.file.sdna_index_from_id[dna_type_id]) def get_file_offset(self, path, default=..., sdna_index_refine=None, base_index=0, ): """ Return (offset, length) """ assert(type(path) is bytes) ofs = self.file_offset if base_index != 0: assert(base_index < self.count) ofs += (self.size // self.count) * base_index self.file.handle.seek(ofs, os.SEEK_SET) if sdna_index_refine is None: sdna_index_refine = self.sdna_index else: self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine) dna_struct = self.file.structs[sdna_index_refine] field = dna_struct.field_from_path( self.file.header, self.file.handle, path) return (self.file.handle.tell(), field.dna_name.array_size) def get(self, path, default=..., sdna_index_refine=None, use_nil=True, use_str=True, base_index=0, ): ofs = self.file_offset if base_index != 0: assert(base_index < self.count) ofs += (self.size // self.count) * base_index self.file.handle.seek(ofs, os.SEEK_SET) if sdna_index_refine is None: sdna_index_refine = self.sdna_index else: self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine) dna_struct = self.file.structs[sdna_index_refine] return dna_struct.field_get( self.file.header, self.file.handle, path, default=default, use_nil=use_nil, use_str=use_str, ) def get_recursive_iter(self, path, path_root=b"", default=..., sdna_index_refine=None, use_nil=True, use_str=True, base_index=0, ): if path_root: path_full = ( (path_root if type(path_root) is tuple else (path_root, )) + (path if type(path) is tuple else (path, ))) else: path_full = path try: yield (path_full, self.get(path_full, default, sdna_index_refine, use_nil, use_str, base_index)) except NotImplementedError as ex: msg, dna_name, dna_type = ex.args struct_index = self.file.sdna_index_from_id.get(dna_type.dna_type_id, None) if struct_index is None: yield (path_full, "<%s>" % dna_type.dna_type_id.decode('ascii')) else: struct = self.file.structs[struct_index] for f in struct.fields: yield from self.get_recursive_iter( f.dna_name.name_only, path_full, default, None, use_nil, use_str, 0) def items_recursive_iter(self): for k in self.keys(): yield from self.get_recursive_iter(k, use_str=False) def get_data_hash(self): """ Generates a 'hash' that can be used instead of addr_old as block id, and that should be 'stable' across .blend file load & save (i.e. it does not changes due to pointer addresses variations). """ # TODO This implementation is most likely far from optimal... and CRC32 is not renown as the best hashing # algo either. But for now does the job! import zlib def _is_pointer(self, k): return self.file.structs[self.sdna_index].field_from_path( self.file.header, self.file.handle, k).dna_name.is_pointer hsh = 1 for k, v in self.items_recursive_iter(): if not _is_pointer(self, k): hsh = zlib.adler32(str(v).encode(), hsh) return hsh def set(self, path, value, sdna_index_refine=None, ): if sdna_index_refine is None: sdna_index_refine = self.sdna_index else: self.file.ensure_subtype_smaller(self.sdna_index, sdna_index_refine) dna_struct = self.file.structs[sdna_index_refine] self.file.handle.seek(self.file_offset, os.SEEK_SET) self.file.is_modified = True return dna_struct.field_set( self.file.header, self.file.handle, path, value) # --------------- # Utility get/set # # avoid inline pointer casting def get_pointer( self, path, default=..., sdna_index_refine=None, base_index=0, ): if sdna_index_refine is None: sdna_index_refine = self.sdna_index result = self.get(path, default, sdna_index_refine=sdna_index_refine, base_index=base_index) # default if type(result) is not int: return result assert(self.file.structs[sdna_index_refine].field_from_path( self.file.header, self.file.handle, path).dna_name.is_pointer) if result != 0: # possible (but unlikely) # that this fails and returns None # maybe we want to raise some exception in this case return self.file.find_block_from_offset(result) else: return None # ---------------------- # Python convenience API # dict like access def __getitem__(self, item): return self.get(item, use_str=False) def __setitem__(self, item, value): self.set(item, value) def keys(self): return (f.dna_name.name_only for f in self.dna_type.fields) def values(self): for k in self.keys(): try: yield self[k] except NotImplementedError as ex: msg, dna_name, dna_type = ex.args yield "<%s>" % dna_type.dna_type_id.decode('ascii') def items(self): for k in self.keys(): try: yield (k, self[k]) except NotImplementedError as ex: msg, dna_name, dna_type = ex.args yield (k, "<%s>" % dna_type.dna_type_id.decode('ascii')) # ----------------------------------------------------------------------------- # Read Magic # # magic = str # pointer_size = int # is_little_endian = bool # version = int class BlendFileHeader: """ BlendFileHeader allocates the first 12 bytes of a blend file it contains information about the hardware architecture """ __slots__ = ( # str "magic", # int 4/8 "pointer_size", # bool "is_little_endian", # int "version", # str, used to pass to 'struct' "endian_str", # int, used to index common types "endian_index", ) def __init__(self, handle): FILEHEADER = struct.Struct(b'7s1s1s3s') log.debug("reading blend-file-header") values = FILEHEADER.unpack(handle.read(FILEHEADER.size)) self.magic = values[0] pointer_size_id = values[1] if pointer_size_id == b'-': self.pointer_size = 8 elif pointer_size_id == b'_': self.pointer_size = 4 else: assert(0) endian_id = values[2] if endian_id == b'v': self.is_little_endian = True self.endian_str = b'<' self.endian_index = 0 elif endian_id == b'V': self.is_little_endian = False self.endian_index = 1 self.endian_str = b'>' else: assert(0) version_id = values[3] self.version = int(version_id) def create_block_header_struct(self): return struct.Struct(b''.join(( self.endian_str, b'4sI', b'I' if self.pointer_size == 4 else b'Q', b'II', ))) class DNAName: """ DNAName is a C-type name stored in the DNA """ __slots__ = ( "name_full", "name_only", "is_pointer", "is_method_pointer", "array_size", ) def __init__(self, name_full): self.name_full = name_full self.name_only = self.calc_name_only() self.is_pointer = self.calc_is_pointer() self.is_method_pointer = self.calc_is_method_pointer() self.array_size = self.calc_array_size() def __repr__(self): return '%s(%r)' % (type(self).__qualname__, self.name_full) def as_reference(self, parent): if parent is None: result = b'' else: result = parent + b'.' result = result + self.name_only return result def calc_name_only(self): result = self.name_full.strip(b'*()') index = result.find(b'[') if index != -1: result = result[:index] return result def calc_is_pointer(self): return (b'*' in self.name_full) def calc_is_method_pointer(self): return (b'(*' in self.name_full) def calc_array_size(self): result = 1 temp = self.name_full index = temp.find(b'[') while index != -1: index_2 = temp.find(b']') result *= int(temp[index + 1:index_2]) temp = temp[index_2 + 1:] index = temp.find(b'[') return result class DNAField: """ DNAField is a coupled DNAStruct and DNAName and cache offset for reuse """ __slots__ = ( # DNAName "dna_name", # tuple of 3 items # [bytes (struct name), int (struct size), DNAStruct] "dna_type", # size on-disk "dna_size", # cached info (avoid looping over fields each time) "dna_offset", ) def __init__(self, dna_type, dna_name, dna_size, dna_offset): self.dna_type = dna_type self.dna_name = dna_name self.dna_size = dna_size self.dna_offset = dna_offset class DNAStruct: """ DNAStruct is a C-type structure stored in the DNA """ __slots__ = ( "dna_type_id", "size", "fields", "field_from_name", "user_data", ) def __init__(self, dna_type_id): self.dna_type_id = dna_type_id self.fields = [] self.field_from_name = {} self.user_data = None def __repr__(self): return '%s(%r)' % (type(self).__qualname__, self.dna_type_id) def field_from_path(self, header, handle, path): """ Support lookups as bytes or a tuple of bytes and optional index. C style 'id.name' --> (b'id', b'name') C style 'array[4]' --> ('array', 4) """ if type(path) is tuple: name = path[0] if len(path) >= 2 and type(path[1]) is not bytes: name_tail = path[2:] index = path[1] assert(type(index) is int) else: name_tail = path[1:] index = 0 else: name = path name_tail = None index = 0 assert(type(name) is bytes) field = self.field_from_name.get(name) if field is not None: handle.seek(field.dna_offset, os.SEEK_CUR) if index != 0: if field.dna_name.is_pointer: index_offset = header.pointer_size * index else: index_offset = field.dna_type.size * index assert(index_offset < field.dna_size) handle.seek(index_offset, os.SEEK_CUR) if not name_tail: # None or () return field else: return field.dna_type.field_from_path(header, handle, name_tail) def field_get(self, header, handle, path, default=..., use_nil=True, use_str=True, ): field = self.field_from_path(header, handle, path) if field is None: if default is not ...: return default else: raise KeyError("%r not found in %r (%r)" % (path, [f.dna_name.name_only for f in self.fields], self.dna_type_id)) dna_type = field.dna_type dna_name = field.dna_name dna_size = field.dna_size if dna_name.is_pointer: return DNA_IO.read_pointer(handle, header) elif dna_type.dna_type_id == b'int': if dna_name.array_size > 1: return [DNA_IO.read_int(handle, header) for i in range(dna_name.array_size)] return DNA_IO.read_int(handle, header) elif dna_type.dna_type_id == b'short': if dna_name.array_size > 1: return [DNA_IO.read_short(handle, header) for i in range(dna_name.array_size)] return DNA_IO.read_short(handle, header) elif dna_type.dna_type_id == b'uint64_t': if dna_name.array_size > 1: return [DNA_IO.read_ulong(handle, header) for i in range(dna_name.array_size)] return DNA_IO.read_ulong(handle, header) elif dna_type.dna_type_id == b'float': if dna_name.array_size > 1: return [DNA_IO.read_float(handle, header) for i in range(dna_name.array_size)] return DNA_IO.read_float(handle, header) elif dna_type.dna_type_id == b'char': if dna_size == 1: # Single char, assume it's bitflag or int value, and not a string/bytes data... return DNA_IO.read_char(handle, header) if use_str: if use_nil: return DNA_IO.read_string0(handle, dna_name.array_size) else: return DNA_IO.read_string(handle, dna_name.array_size) else: if use_nil: return DNA_IO.read_bytes0(handle, dna_name.array_size) else: return DNA_IO.read_bytes(handle, dna_name.array_size) else: raise NotImplementedError("%r exists but isn't pointer, can't resolve field %r" % (path, dna_name.name_only), dna_name, dna_type) def field_set(self, header, handle, path, value): assert(type(path) == bytes) field = self.field_from_path(header, handle, path) if field is None: raise KeyError("%r not found in %r" % (path, [f.dna_name.name_only for f in self.fields])) dna_type = field.dna_type dna_name = field.dna_name if dna_type.dna_type_id == b'char': if type(value) is str: return DNA_IO.write_string(handle, value, dna_name.array_size) else: return DNA_IO.write_bytes(handle, value, dna_name.array_size) else: raise NotImplementedError("Setting %r is not yet supported for %r" % (dna_type, dna_name), dna_name, dna_type) class DNA_IO: """ Module like class, for read-write utility functions. Only stores static methods & constants. """ __slots__ = () def __new__(cls, *args, **kwargs): raise RuntimeError("%s should not be instantiated" % cls) @staticmethod def write_string(handle, astring, fieldlen): assert(isinstance(astring, str)) if len(astring) >= fieldlen: stringw = astring[0:fieldlen] else: stringw = astring + '\0' handle.write(stringw.encode('utf-8')) @staticmethod def write_bytes(handle, astring, fieldlen): assert(isinstance(astring, (bytes, bytearray))) if len(astring) >= fieldlen: stringw = astring[0:fieldlen] else: stringw = astring + b'\0' handle.write(stringw) @staticmethod def read_bytes(handle, length): data = handle.read(length) return data @staticmethod def read_bytes0(handle, length): data = handle.read(length) return DNA_IO.read_data0(data) @staticmethod def read_string(handle, length): return DNA_IO.read_bytes(handle, length).decode('utf-8') @staticmethod def read_string0(handle, length): return DNA_IO.read_bytes0(handle, length).decode('utf-8') @staticmethod def read_data0_offset(data, offset): add = data.find(b'\0', offset) - offset return data[offset:offset + add] @staticmethod def read_data0(data): add = data.find(b'\0') return data[:add] UCHAR = struct.Struct(b'b') @staticmethod def read_char(handle, fileheader): st = DNA_IO.UCHAR[fileheader.endian_index] return st.unpack(handle.read(st.size))[0] USHORT = struct.Struct(b'H') @staticmethod def read_ushort(handle, fileheader): st = DNA_IO.USHORT[fileheader.endian_index] return st.unpack(handle.read(st.size))[0] SSHORT = struct.Struct(b'h') @staticmethod def read_short(handle, fileheader): st = DNA_IO.SSHORT[fileheader.endian_index] return st.unpack(handle.read(st.size))[0] UINT = struct.Struct(b'I') @staticmethod def read_uint(handle, fileheader): st = DNA_IO.UINT[fileheader.endian_index] return st.unpack(handle.read(st.size))[0] SINT = struct.Struct(b'i') @staticmethod def read_int(handle, fileheader): st = DNA_IO.SINT[fileheader.endian_index] return st.unpack(handle.read(st.size))[0] FLOAT = struct.Struct(b'f') @staticmethod def read_float(handle, fileheader): st = DNA_IO.FLOAT[fileheader.endian_index] return st.unpack(handle.read(st.size))[0] ULONG = struct.Struct(b'Q') @staticmethod def read_ulong(handle, fileheader): st = DNA_IO.ULONG[fileheader.endian_index] return st.unpack(handle.read(st.size))[0] @staticmethod def read_pointer(handle, header): """ reads an pointer from a file handle the pointer size is given by the header (BlendFileHeader) """ if header.pointer_size == 4: st = DNA_IO.UINT[header.endian_index] return st.unpack(handle.read(st.size))[0] if header.pointer_size == 8: st = DNA_IO.ULONG[header.endian_index] return st.unpack(handle.read(st.size))[0]