|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from array import array |
|
|
from binascii import b2a_hex |
|
|
from itertools import takewhile |
|
|
import inspect |
|
|
import logging |
|
|
import struct |
|
|
import sys |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Python 2 spellings of the built-in types, bound to their Python 3
# counterparts so the rest of the module can keep using the old names.
long = int
unicode = basestring = str

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
class Eof(Exception):
    ''' Raised when a read finds no more data in the stream.

    All positional arguments are stored on ``args``.
    '''

    def __init__(self, *args):
        Exception.__init__(self, *args)
|
|
|
|
|
|
|
|
class OutOfData(Exception):
    ''' Exception type for "not enough data" conditions.

    It adds nothing to ``Exception``; no code in this file raises it.
    '''
|
|
|
|
|
|
|
|
def readn(f, size):
    ''' Read ``size`` bytes from a file-like object.

    :param f: object with ``read(size)``; ``tell()`` is used, when
        available, only to report the position in the error
    :param size: number of bytes requested
    :returns: whatever ``f.read(size)`` returned; this may be shorter than
        ``size`` when the stream ends part-way, and is empty when
        ``size`` is 0
    :raises Eof: when data was requested but ``read`` returned nothing;
        the exception carries the stream position, or ``'<UNKNOWN>'``
        when ``tell()`` fails with ``IOError``
    '''
    data = f.read(size)
    # An empty result only means end-of-stream when something was actually
    # requested; a zero-byte read is trivially satisfied.
    if data or size == 0:
        return data

    try:
        pos = f.tell()
    except IOError:
        pos = '<UNKNOWN>'
    raise Eof(pos)
|
|
|
|
|
|
|
|
class PrimitiveType(type):
    ''' Metaclass for primitive binary value types.

    While building a class it:

    * records the first base as ``basetype`` and defaults ``__slots__``
      to an empty list;
    * unless ``never_instantiate`` is given as false (the key is removed
      from the namespace either way) or a ``__new__`` is supplied, installs
      a ``__new__`` that returns a plain ``basetype`` value instead of an
      instance of the new class;
    * when ``binfmt`` is present, derives ``fixed_size`` from it with
      ``struct.calcsize`` (asserting agreement with an explicit
      ``fixed_size``) and provides a default ``decode`` classmethod that
      unpacks one value;
    * when ``fixed_size`` is known and no ``read`` is supplied, provides a
      ``read`` classmethod that reads that many bytes and decodes them if
      the class has a ``decode``.
    '''

    def __new__(mcs, name, bases, attrs):
        base = bases[0]
        attrs['basetype'] = base
        if '__slots__' not in attrs:
            attrs['__slots__'] = []

        # 'never_instantiate' is a build-time switch, not a class attribute,
        # so it is always popped.
        if attrs.pop('never_instantiate', True) and '__new__' not in attrs:
            def __new__(cls, *args, **kwargs):
                return base.__new__(base, *args, **kwargs)
            attrs['__new__'] = __new__

        try:
            fmt = attrs['binfmt']
        except KeyError:
            pass
        else:
            packed_size = struct.calcsize(fmt)
            # Fill in the size when missing; otherwise it must agree with
            # what the format string implies.
            declared_size = attrs.setdefault('fixed_size', packed_size)
            assert packed_size == declared_size

            if 'decode' not in attrs:
                def decode(cls, s):
                    return struct.unpack(fmt, s)[0]
                attrs['decode'] = classmethod(decode)

        if 'fixed_size' in attrs and 'read' not in attrs:
            nbytes = attrs['fixed_size']

            def read(cls, f):
                raw = readn(f, nbytes)
                decoder = getattr(cls, 'decode', None)
                return decoder(raw) if decoder else raw
            attrs['read'] = classmethod(read)

        return type.__new__(mcs, str(name), bases, attrs)
|
|
|
|
|
|
|
|
def Primitive(name, basetype, binfmt, **attrs):
    ''' Create a primitive type through ``PrimitiveType``.

    :param name: name of the new type
    :param basetype: the single base class of the new type
    :param binfmt: ``struct`` format string, stored as ``binfmt``
    :param attrs: further entries for the class namespace
    :returns: the new class
    '''
    namespace = dict(attrs, binfmt=binfmt)
    return PrimitiveType(name, (basetype,), namespace)
|
|
|
|
|
|
|
|
# Fixed-width little-endian integers ('<' selects little-endian byte order
# with standard sizes in the struct format mini-language).
UINT32 = Primitive('UINT32', long, '<I')
INT32 = Primitive('INT32', int, '<i')
UINT16 = Primitive('UINT16', int, '<H')
INT16 = Primitive('INT16', int, '<h')
UINT8 = Primitive('UINT8', int, '<B')
INT8 = Primitive('INT8', int, '<b')

# Further names that share the formats above, plus an 8-byte float.
WORD = Primitive('WORD', int, '<H')
BYTE = Primitive('BYTE', int, '<B')
DOUBLE = Primitive('DOUBLE', float, '<d')
WCHAR = Primitive('WCHAR', int, '<H')

# HWP unit quantities: unsigned 32-bit, signed 32-bit and signed 16-bit.
HWPUNIT = Primitive('HWPUNIT', long, '<I')
SHWPUNIT = Primitive('SHWPUNIT', int, '<i')
HWPUNIT16 = Primitive('HWPUNIT16', int, '<h')
|
|
|
|
|
def inch2mm(x):
    ''' Convert inches to millimetres (25.4 per inch), kept to 1/100 mm.

    Adds 0.5 and truncates with ``int`` before scaling back down.
    '''
    hundredths = int(x * 25.4 * 100 + 0.5)
    return float(hundredths) / 100


def hwp2inch(x):
    ''' Convert a HWP unit value to inches by dividing by 7200. '''
    return x / 7200.0


def hwp2mm(x):
    ''' Convert a HWP unit value to millimetres via inches. '''
    return inch2mm(hwp2inch(x))


def hwp2pt(x):
    ''' Divide a HWP unit value by 100, kept to one decimal place.

    Adds 0.5 and truncates with ``int`` before scaling back down.
    '''
    tenths = int((x / 100.0) * 10 + 0.5)
    return tenths / 10.0
|
|
|
|
|
|
|
|
class HexBytes(type):
    ''' Metaclass used as a factory: ``HexBytes(size)`` creates a class.

    The created class is named ``HexBytes(<size>)``, derives from ``str``,
    carries ``fixed_size = size`` and exposes ``binascii.b2a_hex`` as its
    static ``decode``.
    '''

    def __new__(mcs, size):
        namespace = {
            'fixed_size': size,
            'decode': staticmethod(b2a_hex),
        }
        type_name = str('HexBytes(%d)' % size)
        return type.__new__(mcs, type_name, (str,), namespace)
|
|
|
|
|
|
|
|
def decode_uint16le_array_default(bytes):
    ''' Build an ``array('H')`` of code units from little-endian data.

    :param bytes: initializer handed to ``array('H', ...)``
    :returns: the array, byte-swapped when the host is big-endian so the
        values match the little-endian input
    '''
    host_is_big_endian = sys.byteorder == 'big'
    units = array(str('H'), bytes)
    if host_is_big_endian:
        units.byteswap()
    return units
|
|
|
|
|
|
|
|
def decode_uint16le_array_in_jython(bytes):
    ''' Variant of the uint16 array decoder chosen when running on Jython.

    The data is first loaded as 2-byte signed items, byte-swapped, and then
    reloaded as an ``'H'`` array. The assertions encode what this variant
    relies on: a big-endian ``sys.byteorder``, 2-byte ``'h'`` items and
    4-byte ``'H'`` items. It fails with ``AssertionError`` on a platform
    where any of these does not hold.

    :param bytes: initializer handed to ``array('h', ...)``
    :returns: an ``array('H')`` of the decoded values
    '''
    signed_units = array(str('h'), bytes)
    assert signed_units.itemsize == 2
    assert sys.byteorder == 'big'
    signed_units.byteswap()

    # Round-trip through the raw buffer to reinterpret the items as 'H'.
    raw = signed_units.tostring()
    units = array(str('H'), raw)
    assert units.itemsize == 4
    return units
|
|
|
|
|
|
|
|
# True when the interpreter reports a platform name starting with 'java'.
in_jython = sys.platform.startswith('java')

# Pick the uint16 array decoder that suits the running interpreter.
decode_uint16le_array = (decode_uint16le_array_in_jython if in_jython
                         else decode_uint16le_array_default)
|
|
|
|
|
|
|
|
class BSTR(str, metaclass=PrimitiveType):
    ''' Length-prefixed UTF-16LE string. '''

    @staticmethod
    def read(f):
        ''' Read one string from ``f``.

        A ``UINT16`` count is read first; a count of 0 yields an empty
        string, otherwise twice that many bytes are read and decoded.
        '''
        length = UINT16.read(f)
        if length == 0:
            return u''
        return decode_utf16le_with_hypua(readn(f, 2 * length))
|
|
|
|
|
|
|
|
def decode_utf16le_with_hypua(bytes):
    ''' Decode UTF-16LE encoded bytes into a unicode string.

    Despite the name, this implementation is a plain UTF-16LE decode; it
    performs no Hanyang-PUA to Hangul Jamo remapping itself.

    :param bytes: utf-16le encoded bytes
    :returns: the decoded unicode string
    '''
    text = bytes.decode('utf-16le')
    return text
|
|
|
|
|
|
|
|
class BitGroupDescriptor(object):
    ''' Descriptor exposing a group of bits of an integer-like instance.

    :param bitgroup: one of

        * a single bit position (``lsb == msb``),
        * a ``(lsb, msb)`` tuple, or
        * a ``(lsb, msb, valuetype)`` tuple, where ``valuetype`` is called
          with the extracted integer to build the attribute value
          (``int`` by default).
    '''

    def __init__(self, bitgroup):
        valuetype = int
        if isinstance(bitgroup, tuple):
            if len(bitgroup) > 2:
                lsb, msb, valuetype = bitgroup
            else:
                lsb, msb = bitgroup
        else:
            lsb = msb = bitgroup
        self.lsb = lsb
        self.msb = msb
        self.valuetype = valuetype

    def __get__(self, instance, owner=None):
        ''' Return the bit group of ``instance`` converted by ``valuetype``.

        Accessed through the class (``instance is None``) the descriptor
        itself is returned, instead of failing on ``None >> lsb``.
        '''
        if instance is None:
            return self
        return self.valuetype(self.get_int_value(instance))

    def get_int_value(self, instance):
        ''' Return bits ``lsb..msb`` (inclusive) of ``instance`` as an int. '''
        width = self.msb + 1 - self.lsb
        mask = int((2 ** width) - 1)
        return int(instance >> self.lsb) & mask
|
|
|
|
|
|
|
|
class FlagsType(type):
    ''' Metaclass building flag classes out of bit-group definitions.

    The namespace must hold ``basetype``; every other entry is taken as a
    bit-group definition and wrapped in a ``BitGroupDescriptor``. The
    ``bases`` argument is ignored: the class always derives from
    ``basetype.basetype``. The resulting class carries ``basetype``,
    ``bitfields`` (name -> descriptor) and a ``dictvalue()`` method
    returning all field values of an instance as a dict.
    '''

    def __new__(mcs, name, bases, attrs):
        basetype = attrs.pop('basetype')

        # Everything left in the namespace describes a bit group.
        descriptors = {field: BitGroupDescriptor(spec)
                       for field, spec in attrs.items()}

        def dictvalue(self):
            return {field: getattr(self, field) for field in descriptors}

        namespace = dict(descriptors)
        namespace.update(
            __name__=name,
            __slots__=(),
            basetype=basetype,
            bitfields=descriptors,
            dictvalue=dictvalue,
        )
        return type.__new__(mcs, str(name), (basetype.basetype,), namespace)
|
|
|
|
|
|
|
|
def _lex_flags_args(args): |
|
|
for idx, arg in enumerate(args): |
|
|
while True: |
|
|
pushback = (yield idx, arg) |
|
|
if pushback is arg: |
|
|
yield |
|
|
continue |
|
|
break |
|
|
|
|
|
|
|
|
def _parse_flags_args(args):
    ''' Parse the flat argument list of a flags definition.

    Each field is written as ``lsb [msb] [type] name``:

    * ``lsb``: int, the lowest bit of the field;
    * ``msb``: optional int, the highest bit (defaults to ``lsb``);
    * ``type``: optional class used for the value (defaults to ``int``);
    * ``name``: string, the field name.

    :param args: the flat sequence of definition arguments
    :returns: a generator of ``(name, (lsb, msb, type))``
    :raises AssertionError: on a malformed or truncated definition. The
        errors are raised explicitly, so validation also happens when
        Python runs with ``-O``.
    '''
    tokens = _lex_flags_args(args)
    idx = -1
    try:
        while True:
            # lsb -- running out of arguments here is the normal end.
            try:
                idx, lsb = next(tokens)
            except StopIteration:
                break
            if not isinstance(lsb, int):
                raise AssertionError('#%d arg is expected to be '
                                     'a int: %s' % (idx, repr(lsb)))

            # msb (default: lsb)
            idx, x = next(tokens)
            if isinstance(x, int):
                msb = x
            elif isinstance(x, (type, basestring)):
                tokens.send(x)  # not an msb: push it back
                msb = lsb
            else:
                raise AssertionError('#%d arg is unexpected type: %s'
                                     % (idx, repr(x)))

            # type (default: int)
            idx, x = next(tokens)
            if isinstance(x, int):
                raise AssertionError('#%d args is expected to be a type '
                                     'or name: %s' % (idx, repr(x)))
            if isinstance(x, type):
                t = x
            elif isinstance(x, basestring):
                tokens.send(x)  # not a type: push it back
                t = int
            else:
                raise AssertionError('#%d arg is unexpected type: %s'
                                     % (idx, repr(x)))

            # name
            idx, name = next(tokens)
            if not isinstance(name, basestring):
                raise AssertionError('#%d args is expected to be '
                                     'a name: %s' % (idx, repr(name)))

            yield name, (lsb, msb, t)

    except StopIteration:
        # The arguments ended in the middle of a field definition.
        raise AssertionError('#%d arg is expected' % (idx + 1))
|
|
|
|
|
|
|
|
def Flags(basetype, *args):
    ''' Create a flags class over ``basetype``.

    :param basetype: type whose ``basetype`` attribute becomes the base
        class of the result
    :param args: flat field definitions, parsed by ``_parse_flags_args``
    :returns: a new class created by ``FlagsType``
    '''
    namespace = {name: spec for name, spec in _parse_flags_args(args)}
    namespace['basetype'] = basetype
    return FlagsType('Flags', (), namespace)
|
|
|
|
|
|
|
|
# Every class created by EnumType is recorded here.
enum_type_instances = set()


class EnumType(type):
    ''' Metaclass for int-based enumerations.

    The namespace must contain ``items`` (names numbered from 0 in order)
    and ``moreitems`` (a name -> value mapping). Each named value becomes a
    class attribute holding a unique instance; a second name for an already
    defined value shares the first instance. The class also gets

    * ``names``: the set of all defined names,
    * ``instances``: the set of defined instances,
    * ``name``: on an instance, the name it was first defined with (or
      None); on the class, the class name,
    * ``scoping_struct``: initially None.

    Calling the class with a value returns the defined instance for that
    value; for an undefined value a warning is logged and a new unnamed
    instance is returned. Named members cannot be added once the class has
    been built (``TypeError``).
    '''

    def __new__(mcs, enum_type_name, bases, attrs):
        items = attrs.pop('items')
        moreitems = attrs.pop('moreitems')

        # Non-empty only while the members are registered below.
        populate_state = [1]

        names_by_instance = {}
        instances_by_name = {}
        instances_by_value = {}

        def __new__(cls, value, name=None):
            if isinstance(value, cls):
                return value

            if name is None:
                # Plain lookup by value.
                try:
                    return instances_by_value[value]
                except KeyError:
                    pass
                logger.warning('undefined %s value: %s',
                               cls.__name__, value)
                logger.warning('defined name/values: %s',
                               str(instances_by_name))
                return int.__new__(cls, value)

            # Defining a named member: only allowed during population.
            if not populate_state:
                raise TypeError()

            assert name not in instances_by_name

            try:
                member = instances_by_value[value]
            except KeyError:
                member = int.__new__(cls, value)
                instances_by_value[value] = member
                names_by_instance[member] = name

            instances_by_name[name] = member
            return member

        class NameDescriptor(object):
            def __get__(self, instance, owner):
                if instance is None:
                    return owner.__name__
                return names_by_instance.get(instance)

        def __repr__(self):
            enum_name = type(self).__name__
            item_name = self.name
            if item_name is None:
                return '%s(%d)' % (enum_name, self)
            return enum_name + '.' + item_name

        attrs['__new__'] = __new__
        attrs['__slots__'] = []
        attrs['scoping_struct'] = None
        attrs['name'] = NameDescriptor()
        attrs['__repr__'] = __repr__

        cls = type.__new__(mcs, str(enum_type_name), bases, attrs)

        for value, item_name in enumerate(items):
            setattr(cls, item_name, cls(value, item_name))
        for item_name, value in moreitems.items():
            setattr(cls, item_name, cls(value, item_name))

        cls.names = set(instances_by_name)
        cls.instances = set(names_by_instance)

        # The class is populated: no more named members from here on.
        populate_state.pop()

        enum_type_instances.add(cls)
        return cls

    def __init__(cls, *args, **kwargs):
        ''' Accept and ignore any arguments; nothing is initialised here. '''
|
|
|
|
|
|
|
|
def Enum(*items, **moreitems):
    ''' Create an int-based enumeration class through ``EnumType``.

    :param items: names, numbered from 0 in the order given
    :param moreitems: additional ``name=value`` definitions
    :returns: the new enumeration class
    '''
    namespace = {'items': items, 'moreitems': moreitems}
    return EnumType('Enum', (int,), namespace)
|
|
|
|
|
|
|
|
class CompoundType(type):
    ''' Base metaclass of the compound (array and struct) types.

    Class creation is the same as for ``type``, except that the name is
    passed through ``str`` first.
    '''

    def __new__(mcs, name, bases, attrs):
        type_name = str(name)
        return super().__new__(mcs, type_name, bases, attrs)
|
|
|
|
|
|
|
|
class ArrayType(CompoundType):
    ''' Common metaclass of the array types. '''

    def __init__(self, *args, **kwargs):
        ''' Accept and ignore any arguments; nothing is initialised here. '''
|
|
|
|
|
|
|
|
class FixedArrayType(ArrayType):
    ''' Factory of fixed-length array types.

    ``FixedArrayType(itemtype, size)`` returns a ``tuple`` subclass named
    ``ARRAY(<itemtype name>,<size>)`` with ``itemtype`` and ``size``
    attributes. Classes are cached, so the same arguments always give the
    same class object.
    '''

    # (itemtype, size) -> class created for that combination
    classes = dict()

    def __new__(mcs, itemtype, size):
        key = itemtype, size
        try:
            return mcs.classes[key]
        except KeyError:
            pass

        name = 'ARRAY(%s,%s)' % (itemtype.__name__, size)
        namespace = dict(itemtype=itemtype, size=size)
        created = ArrayType.__new__(mcs, str(name), (tuple,), namespace)
        mcs.classes[key] = created
        return created


# Short alias used when declaring members.
ARRAY = FixedArrayType
|
|
|
|
|
|
|
|
class VariableLengthArrayType(ArrayType):
    ''' Factory of count-prefixed array types.

    ``VariableLengthArrayType(counttype, itemtype)`` returns a ``list``
    subclass named ``N_ARRAY(<counttype name>,<itemtype name>)`` with
    ``counttype`` and ``itemtype`` attributes. Classes are cached, so the
    same arguments always give the same class object.
    '''

    # (counttype, itemtype) -> class created for that combination
    classes = dict()

    def __new__(mcs, counttype, itemtype):
        key = counttype, itemtype
        try:
            return mcs.classes[key]
        except KeyError:
            pass

        name = 'N_ARRAY(%s,%s)' % (counttype.__name__, itemtype.__name__)
        namespace = dict(itemtype=itemtype, counttype=counttype)
        created = ArrayType.__new__(mcs, str(name), (list,), namespace)
        mcs.classes[key] = created
        return created


# Short alias used when declaring members.
N_ARRAY = VariableLengthArrayType
|
|
|
|
|
|
|
|
def ref_member(member_name):
    ''' Make a reference to the value of an already parsed member.

    :param member_name: name of the member to look up
    :returns: a function ``fn(context, values)`` returning
        ``values[member_name]``; its ``__doc__`` is the member name
    '''
    def fn(context, values):
        referenced_value = values[member_name]
        return referenced_value

    fn.__doc__ = member_name
    return fn
|
|
|
|
|
|
|
|
def ref_member_flag(member_name, bitfield_name):
    ''' Make a reference to an attribute of an already parsed member.

    :param member_name: name of the member to look up
    :param bitfield_name: attribute to read from that member's value
    :returns: a function ``fn(context, values)`` returning
        ``getattr(values[member_name], bitfield_name)``; its ``__doc__``
        is ``'<member_name>.<bitfield_name>'``
    '''
    def fn(context, values):
        member_value = values[member_name]
        return getattr(member_value, bitfield_name)

    fn.__doc__ = '%s.%s' % (member_name, bitfield_name)
    return fn
|
|
|
|
|
|
|
|
class X_ARRAY(object):
    ''' Array type whose length is only known while parsing.

    :param itemtype: type of the array items
    :param count_reference: callable ``(context, values) -> count``; its
        ``__doc__`` is used in the generated name

    Calling an instance with ``(context, values)`` resolves the count and
    returns the matching ``ARRAY(itemtype, count)`` type.
    '''

    def __init__(self, itemtype, count_reference):
        self.itemtype = itemtype
        self.count_reference = count_reference
        label = 'ARRAY(%s, \'%s\')' % (itemtype.__name__,
                                       count_reference.__doc__)
        self.__name__ = label
        self.__doc__ = label

    def __call__(self, context, values):
        resolved_count = self.count_reference(context, values)
        return ARRAY(self.itemtype, resolved_count)
|
|
|
|
|
|
|
|
class SelectiveType(object):
    ''' Member type that is chosen at parse time.

    :param selector_reference: callable ``(context, values) -> key``
    :param selections: mapping from key to the type to use

    Calling an instance with ``(context, values)`` returns the type
    registered for the resolved key, or ``Struct`` when there is none.
    '''

    def __init__(self, selector_reference, selections):
        self.selector_reference = selector_reference
        self.selections = selections
        self.__name__ = 'SelectiveType'

    def __call__(self, context, values):
        key = self.selector_reference(context, values)
        return self.selections.get(key, Struct)
|
|
|
|
|
|
|
|
class ParseError(Exception):
    ''' Error raised while parsing, enriched with diagnostic context.

    The attributes below start out empty; ``print_to_logger`` reports
    whatever has been filled in:

    * ``cause``, ``path``, ``record``, ``binevents``: set to None by
      ``__init__``;
    * ``parse_stack_traces``: list of tracepoints, innermost first;
    * ``treegroup``, ``offset``: class-level defaults of None.
    '''

    treegroup = None

    # Position of the failure. Defaults to None so that print_to_logger
    # works on an error whose offset was never filled in.
    offset = None

    def __init__(self, *args, **kwargs):
        Exception.__init__(self, *args, **kwargs)
        self.cause = None
        self.path = None
        self.record = None
        self.binevents = None
        self.parse_stack_traces = []

    def print_to_logger(self, logger):
        ''' Write a report of this error through ``logger.error``.

        :param logger: object with an ``error(fmt, *args)`` method
        '''
        e = self
        logger.error('ParseError: %s', e)
        logger.error('Caused by: %s', repr(e.cause))
        logger.error('Path: %s', e.path)
        if e.treegroup is not None:
            logger.error('Treegroup: %s', e.treegroup)
        if e.record:
            logger.error('Record: %s', e.record['seqno'])
            logger.error('Record Payload:')
            for line in dumpbytes(e.record['payload'], True):
                logger.error(' %s', line)
        # The offset is optional; '%d' cannot format a missing one.
        if e.offset is not None:
            logger.error('Problem Offset: at %d (=0x%x)', e.offset, e.offset)
        if self.binevents:
            logger.error('Binary Parse Events:')
            from hwp5.bintype import log_events
            # log_events does the logging; the loop only drives it.
            for ev, item in log_events(self.binevents, logger.error):
                pass
        logger.error('Model Stack:')
        for level, c in enumerate(reversed(e.parse_stack_traces)):
            model = c['model']
            if isinstance(model, StructType):
                logger.error(' %s', model)
                # Tracepoints do not always carry 'parsed' / 'offset';
                # report what is there instead of failing in the reporter.
                for member in c.get('parsed', ()):
                    offset = member.get('offset', 0)
                    offset_end = member.get('offset_end', 1)
                    name = member['name']
                    value = member['value']
                    logger.error(' %06x:%06x: %s = %s',
                                 offset, offset_end - 1, name, value)
                logger.error(' %06x: : %s', c.get('offset', 0), c['member'])
            else:
                logger.error(' %s%s', ' ' * level, c)
|
|
|
|
|
|
|
|
def typed_struct_attributes(struct, attributes, context):
    ''' Pair attribute values with the members declared by a struct type.

    :param struct: object providing
        ``parse_members_with_inherited(context, getvalue)``
    :param attributes: mapping (or iterable of pairs) of name -> value;
        it is copied, not modified
    :param context: passed through to ``parse_members_with_inherited``
    :returns: a generator yielding first the members produced by
        ``struct``, each valued from ``attributes`` or, when the name is
        missing there, by calling the member's type without arguments;
        then one ``dict(name=, type=, value=)`` for every attribute no
        member consumed
    '''
    remaining = dict(attributes)

    def popvalue(member):
        key = member['name']
        if key not in remaining:
            return member['type']()
        return remaining.pop(key)

    for member in struct.parse_members_with_inherited(context, popvalue):
        yield member

    # Whatever was not claimed by a declared member.
    for key, value in remaining.items():
        yield dict(name=key, type=type(value), value=value)
|
|
|
|
|
|
|
|
class StructType(CompoundType):
    ''' Metaclass of struct models.

    A class that defines ``attributes`` in its own body gets a ``members``
    list built from what ``attributes()`` produces: ``(type, name)`` tuples
    are turned into ``dict(type=, name=)``, anything else is kept as is.
    Enum and flags classes found in the class body are renamed after the
    attribute they are bound to; enum classes also get ``scoping_struct``
    set to the new class.
    '''

    def __init__(cls, name, bases, attrs):
        super(StructType, cls).__init__(name, bases, attrs)

        if 'attributes' in cls.__dict__:
            cls.members = [
                dict(type=spec[0], name=spec[1])
                if isinstance(spec, tuple) else spec
                for spec in cls.attributes()
            ]

        for attr_name, attr_value in attrs.items():
            if isinstance(attr_value, EnumType):
                attr_value.__name__ = attr_name
                attr_value.scoping_struct = cls
            elif isinstance(attr_value, FlagsType):
                attr_value.__name__ = attr_name

    def parse_members(cls, context, getvalue):
        ''' Generate the members declared by this class itself, with values.

        :param context: mapping; ``context['version']`` is compared with a
            member's ``version`` when the member has one
        :param getvalue: callable taking the member dict and returning its
            value
        :returns: a generator of member dicts (copies) with ``value`` set.
            Members whose ``version`` is newer than the context's, or
            whose ``condition(context, values)`` is false, are skipped.
        :raises ParseError: propagated from ``getvalue`` after a tracepoint
            for this class and member has been appended to it
        '''
        if 'attributes' not in cls.__dict__:
            return

        parsed_values = {}
        for template in cls.members:
            member = dict(template)

            # Types that depend on earlier members are resolved first.
            member_type = member['type']
            if isinstance(member_type, X_ARRAY):
                member['type'] = member_type(context, parsed_values)
            elif isinstance(member_type, SelectiveType):
                member['type'] = member_type(context, parsed_values)

            required_version = member.get('version')
            if (required_version is not None and
                    not context['version'] >= required_version):
                continue

            condition = member.get('condition')
            if condition is not None and not condition(context, parsed_values):
                continue

            try:
                value = getvalue(member)
            except ParseError as e:
                e.parse_stack_traces.append(dict(model=cls,
                                                 member=member['name']))
                raise
            parsed_values[member['name']] = member['value'] = value
            yield member

    def parse_members_with_inherited(cls, context, getvalue, up_to_cls=None):
        ''' Generate members of this class and its bases, bases first.

        The method resolution order is followed until ``up_to_cls`` is
        reached (exclusive); only classes defining ``attributes`` in their
        own body contribute.
        '''
        lineage = []
        for klass in inspect.getmro(cls):
            if klass is up_to_cls:
                break
            if 'attributes' in klass.__dict__:
                lineage.append(klass)

        for klass in reversed(lineage):
            for member in klass.parse_members(context, getvalue):
                yield member
|
|
|
|
|
|
|
|
class Struct(metaclass=StructType):
    ''' Base class of struct models; declares no members itself. '''
|
|
|
|
|
|
|
|
def dumpbytes(data, crust=False):
    ''' Generate hex dump lines, 16 bytes per line.

    :param data: ``bytes`` (items are ints) or a string (items are single
        characters); both are accepted without consulting a module-level
        version flag
    :param crust: when true, emit a column header line first and prefix
        every data line with its offset
    :returns: a generator of text lines without line terminators; empty
        ``data`` still produces one (empty) data line
    '''
    def _ord(ch):
        # Iterating bytes gives ints already; characters need ord().
        return ch if isinstance(ch, int) else ord(ch)

    if crust:
        # Column labels, three characters apart like the '%02x ' cells.
        yield '\t' + ' '.join('%2X' % column for column in range(16))

    # max(..., 1) keeps the single empty line for empty input.
    row_starts = range(0, max(len(data), 1), 16)
    for row, start in enumerate(row_starts):
        prefix = '%05x0: ' % row if crust else ''
        chunk = data[start:start + 16]
        yield prefix + ' '.join('%02x' % _ord(ch) for ch in chunk)
|
|
|
|
|
|
|
|
def hexdump(data, crust=False):
    ''' Return the lines of ``dumpbytes(data, crust)`` joined by newlines. '''
    lines = dumpbytes(data, crust)
    return '\n'.join(lines)
|
|
|
|
|
|
|
|
class IndentedOutput:
    ''' Writer that indents every non-empty line with tabs.

    :param base: underlying object with a ``write`` method
    :param level: number of tab characters put before each line
    '''

    def __init__(self, base, level):
        self.base = base
        self.level = level

    def write(self, x):
        ''' Write the non-empty lines of ``x``, indented and terminated. '''
        for line in x.split('\n'):
            if not line:
                continue
            self.base.write('\t' * self.level)
            self.base.write(line)
            self.base.write('\n')
|
|
|
|
|
|
|
|
class Printer:
    ''' Minimal print helper writing to a file-like object.

    :param baseout: object with a ``write`` method
    '''

    def __init__(self, baseout):
        self.baseout = baseout

    def prints(self, *args):
        ''' Write each argument as text followed by a space, then a newline. '''
        for item in args:
            text = str(item) + ' '
            self.baseout.write(text)
        self.baseout.write('\n')
|
|
|