# -*- coding: utf-8 -*- # # pyhwp : hwp file format parser in python # Copyright (C) 2010-2023 mete0r # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # from __future__ import absolute_import from __future__ import print_function from __future__ import unicode_literals from collections import deque from pprint import pprint import logging import struct import sys from .dataio import BSTR from .dataio import FixedArrayType from .dataio import FlagsType from .dataio import ParseError from .dataio import SelectiveType from .dataio import StructType from .dataio import VariableLengthArrayType from .dataio import X_ARRAY from .dataio import readn from .treeop import STARTEVENT, ENDEVENT from .treeop import iter_subevents logger = logging.getLogger(__name__) def bintype_map_events(bin_item): bin_type = bin_item['type'] if isinstance(bin_type, StructType): yield STARTEVENT, bin_item if hasattr(bin_type, 'members'): for member in bin_type.members: for x in bintype_map_events(member): yield x yield ENDEVENT, bin_item elif isinstance(bin_type, FixedArrayType): yield STARTEVENT, bin_item item = dict(type=bin_type.itemtype) for x in bintype_map_events(item): yield x yield ENDEVENT, bin_item elif isinstance(bin_type, VariableLengthArrayType): yield STARTEVENT, bin_item item = dict(type=bin_type.itemtype) for x in bintype_map_events(item): yield x yield ENDEVENT, bin_item elif isinstance(bin_type, X_ARRAY): yield STARTEVENT, bin_item item = dict(type=bin_type.itemtype) for x in bintype_map_events(item): yield x yield ENDEVENT, bin_item elif isinstance(bin_type, SelectiveType): yield STARTEVENT, bin_item for k, v in bin_type.selections.items(): item = dict(bin_item, select_when=k, type=v) for x in bintype_map_events(item): yield x yield ENDEVENT, bin_item elif isinstance(bin_type, FlagsType): # TODO: this should be done in model definitions # bin_type: used in binary reading # flags_type: binary value to flags type bin_item['bin_type'] = bin_type.basetype bin_item['flags_type'] = bin_type yield None, bin_item else: yield None, bin_item def filter_with_version(events, version): for ev, item in events: required_version = item.get('version') if required_version is not None and version < required_version: # just consume and skip this tree logger.debug('skip following: (required version: %s)', required_version) logger.debug(' %s', (ev, item)) if ev is STARTEVENT: for x in iter_subevents(events): pass continue yield ev, item def make_items_immutable(events): stack = [] for ev, item in events: if ev is None: item = tuple(sorted(item.items())) elif ev is STARTEVENT: item = tuple(sorted(item.items())) stack.append(item) elif ev is ENDEVENT: item = stack.pop() yield ev, item def compile_type_definition(bin_item): events = bintype_map_events(bin_item) events = make_items_immutable(events) return tuple(events) master_typedefs = dict() def get_compiled_typedef(type): if type not in master_typedefs: logger.info('compile typedef of %s', type) typedef_events = compile_type_definition(dict(type=type)) master_typedefs[type] = typedef_events return master_typedefs[type] versioned_typedefs = dict() def get_compiled_typedef_with_version(type, version): if version not in versioned_typedefs: versioned_typedefs[version] = typedefs = dict() typedefs = versioned_typedefs[version] if type not in typedefs: logger.info('filter compiled typedef of %s with version %s', type, version) typedef_events = get_compiled_typedef(type) events = static_to_mutable(typedef_events) events = filter_with_version(events, version) events = make_items_immutable(events) events = tuple(events) typedefs[type] = events return typedefs[type] class ERROREVENT(object): pass def static_to_mutable(events): stack = [] for ev, item in events: if ev is None: item = dict(item) elif ev is STARTEVENT: item = dict(item) stack.append(item) elif ev is ENDEVENT: item = stack.pop() yield ev, item def pop_subevents(events_deque): level = 0 while len(events_deque) > 0: event, item = events_deque.popleft() yield event, item if event is STARTEVENT: level += 1 elif event is ENDEVENT: if level > 0: level -= 1 else: return def resolve_typedefs(typedef_events, context): array_types = (X_ARRAY, VariableLengthArrayType, FixedArrayType) stack = [] selective_stack = [] events = static_to_mutable(typedef_events) events = deque(events) while len(events) > 0: ev, item = events.popleft() if isinstance(item['type'], SelectiveType): if ev is STARTEVENT: parent_struct = stack[-1] struct_value = parent_struct['value'] selector_reference = item['type'].selector_reference select_key = selector_reference(context, struct_value) logger.debug('select_key: %s', select_key) item['select_key'] = select_key selective_stack.append(item) elif ev is ENDEVENT: selective_stack.pop() else: assert False elif 'select_when' in item: assert ev in (None, STARTEVENT) select_key = selective_stack[-1]['select_key'] select_when = item.pop('select_when') if select_when != select_key: # just consume and skip this tree logger.debug('skip following: (select key %r != %r)', select_key, select_when) logger.debug(' %s', (ev, item)) if ev is STARTEVENT: for x in pop_subevents(events): logger.debug(' %s', x) pass continue logger.debug('selected for: %r', select_when) events.appendleft((ev, item)) elif 'condition' in item: assert ev in (STARTEVENT, None) condition = item.pop('condition') parent_struct = stack[-1] if not condition(context, parent_struct['value']): # just consume and skip this tree logger.debug('skip following: (not matched condition: %s)', condition) logger.debug(' %s', (ev, item)) if ev is STARTEVENT: for x in pop_subevents(events): logger.debug(' %s', x) pass continue events.appendleft((ev, item)) elif isinstance(item['type'], array_types) and 'count' not in item: assert ev is STARTEVENT if isinstance(item['type'], X_ARRAY): parent_struct = stack[-1] struct_value = parent_struct['value'] count_reference = item['type'].count_reference count = count_reference(context, struct_value) elif isinstance(item['type'], VariableLengthArrayType): count = dict(type=item['type'].counttype, dontcollect=True) yield None, count count = count['value'] elif isinstance(item['type'], FixedArrayType): count = item['type'].size item['count'] = count subevents = list(pop_subevents(events)) endevent = subevents[-1] subevents = subevents[:-1] def clone(events): stack = [] for ev, item in events: if ev in (STARTEVENT, None): item = dict(item) if ev is STARTEVENT: stack.append(item) else: item = stack.pop() yield ev, item events.appendleft(endevent) for _ in range(0, count): cloned = list(clone(subevents)) events.extendleft(reversed(cloned)) events.appendleft((ev, item)) else: if ev is STARTEVENT: stack.append(item) elif ev is ENDEVENT: stack.pop() yield ev, item def evaluate_bin_values(events): for ev, item in events: if 'flags_type' in item: flags_type = item['flags_type'] assert isinstance(flags_type, FlagsType) item['value'] = flags_type(item['value']) yield ev, item def construct_composite_values(events): stack = [] for ev, item in events: if ev is STARTEVENT: if isinstance(item['type'], StructType): item['value'] = dict() elif isinstance(item['type'], (X_ARRAY, VariableLengthArrayType, FixedArrayType)): item['value'] = list() else: assert False stack.append(item) elif ev in (None, ENDEVENT): if ev is ENDEVENT: item = stack.pop() if isinstance(item['type'], FixedArrayType): item['value'] = tuple(item['value']) if len(stack) > 0: if not item.get('dontcollect', False): if isinstance(stack[-1]['type'], StructType): # reduce a struct member into struct value stack[-1]['value'][item['name']] = item['value'] elif isinstance(stack[-1]['type'], (X_ARRAY, VariableLengthArrayType, FixedArrayType)): stack[-1]['value'].append(item['value']) yield ev, item def log_events(events, log_fn): for ev, item in events: if ev in (STARTEVENT, ENDEVENT): fmt = ['%s:'] val = [ev.__name__] else: fmt = [' %04x:'] val = [item['bin_offset']] fmt.append('%s') val.append(item['type'].__name__) if 'name' in item: fmt.append('%r') val.append(str(item['name'])) if 'value' in item and ev is None: fmt.append('%r') val.append(item['value']) if 'exception' in item: fmt.append('-- Exception: %r') val.append(item['exception']) log_fn(' '.join(fmt), *val) yield ev, item def eval_typedef_events(typedef_events, context, resolve_values): events = static_to_mutable(typedef_events) events = resolve_typedefs(events, context) events = resolve_values(events) events = evaluate_bin_values(events) events = construct_composite_values(events) events = log_events(events, logger.debug) return events def resolve_values_from_stream(stream): def resolve_values(events): for ev, item in events: if ev is None: item['bin_offset'] = stream.tell() try: item['value'] = resolve_value_from_stream(item, stream) except Exception as e: item['exception'] = e ev = ERROREVENT yield ev, item return resolve_values def resolve_value_from_stream(item, stream): from hwp5.binmodel import ParaTextChunks from hwp5.binmodel import CHID if 'bin_type' in item: item_type = item['bin_type'] else: item_type = item['type'] if hasattr(item_type, 'binfmt'): binfmt = item_type.binfmt binsize = struct.calcsize(binfmt) bytes = readn(stream, binsize) unpacked = struct.unpack(binfmt, bytes) return unpacked[0] elif item_type is CHID: bytes = readn(stream, 4) return CHID.decode(bytes) elif item_type is BSTR: return BSTR.read(stream) elif item_type is ParaTextChunks: return ParaTextChunks.read(stream) elif hasattr(item_type, 'fixed_size'): bytes = readn(stream, item_type.fixed_size) if hasattr(item_type, 'decode'): return item_type.decode(bytes) return bytes else: assert hasattr(item_type, 'read') logger.warning('%s: item type relies on its read() to resolve a value', item_type.__name__) return item_type.read(stream) def resolve_type_events(type, context, resolve_values): # get typedef events: if current version is specified in the context, # get version specific typedef if 'version' in context: version = context['version'] events = get_compiled_typedef_with_version(type, version) else: events = get_compiled_typedef(type) # evaluate with context/stream return eval_typedef_events(events, context, resolve_values) def read_type_events(type, context, stream): resolve_values = resolve_values_from_stream(stream) events = resolve_type_events(type, context, resolve_values) for ev, item in events: yield ev, item if ev is ERROREVENT: e = item['exception'] msg = 'can\'t parse %s' % type pe = ParseError(msg) pe.cause = e pe.path = context.get('path') pe.treegroup = context.get('treegroup') pe.record = context.get('record') pe.offset = item.get('bin_offset') raise pe def read_type_item(type, context, stream, binevents=None): if binevents is None: binevents = [] try: binevents.extend(read_type_events(type, context, stream)) except ParseError as e: e.binevents = binevents raise return binevents[-1][1] def read_type(type, context, stream, binevents=None): item = read_type_item(type, context, stream, binevents) return item['value'] def dump_events(events): def prefix_level(event_prefixed_items): level = 0 for ev, item in event_prefixed_items: if ev is STARTEVENT: yield level, item level += 1 elif ev is ENDEVENT: level -= 1 else: yield level, item def item_to_dict(events): for ev, item in events: yield ev, dict(item) def type_to_string(events): for ev, item in events: item['type'] = item['type'].__name__ yield ev, item def condition_to_string(events): for ev, item in events: if 'condition' in item: item['condition'] = item['condition'].__name__ yield ev, item events = item_to_dict(events) events = type_to_string(events) events = condition_to_string(events) for level, item in prefix_level(events): indents = '' if level > 0: if level > 1: indents = ' ' * (level - 2) + ' ' indents += '- ' print('{}{}'.format(indents, item)) def main(): logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler()) import hwp5.binmodel name = sys.argv[1] type = getattr(hwp5.binmodel, name) typedef_events = compile_type_definition(dict(type=type)) pprint(typedef_events) context = {} def resolve_values(events): for ev, item in events: if ev is None: print('') for k, v in sorted(item.items()): print('- {} : {}'.format(k, v)) value = raw_input('>> ') value = eval(value) if isinstance(item['type'], FlagsType): value = item['type'](value) item['value'] = value yield ev, item events = eval_typedef_events(typedef_events, context, resolve_values) for ev, item in events: print('{} {}'.format(ev, item))