| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | from __future__ import absolute_import |
| | from __future__ import print_function |
| | from __future__ import unicode_literals |
| | from collections import deque |
| | from pprint import pprint |
| | import logging |
| | import struct |
| | import sys |
| |
|
| | from .dataio import BSTR |
| | from .dataio import FixedArrayType |
| | from .dataio import FlagsType |
| | from .dataio import ParseError |
| | from .dataio import SelectiveType |
| | from .dataio import StructType |
| | from .dataio import VariableLengthArrayType |
| | from .dataio import X_ARRAY |
| | from .dataio import readn |
| | from .treeop import STARTEVENT, ENDEVENT |
| | from .treeop import iter_subevents |
| |
|
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | def bintype_map_events(bin_item): |
| | bin_type = bin_item['type'] |
| | if isinstance(bin_type, StructType): |
| | yield STARTEVENT, bin_item |
| | if hasattr(bin_type, 'members'): |
| | for member in bin_type.members: |
| | for x in bintype_map_events(member): |
| | yield x |
| | yield ENDEVENT, bin_item |
| | elif isinstance(bin_type, FixedArrayType): |
| | yield STARTEVENT, bin_item |
| | item = dict(type=bin_type.itemtype) |
| | for x in bintype_map_events(item): |
| | yield x |
| | yield ENDEVENT, bin_item |
| | elif isinstance(bin_type, VariableLengthArrayType): |
| | yield STARTEVENT, bin_item |
| | item = dict(type=bin_type.itemtype) |
| | for x in bintype_map_events(item): |
| | yield x |
| | yield ENDEVENT, bin_item |
| | elif isinstance(bin_type, X_ARRAY): |
| | yield STARTEVENT, bin_item |
| | item = dict(type=bin_type.itemtype) |
| | for x in bintype_map_events(item): |
| | yield x |
| | yield ENDEVENT, bin_item |
| | elif isinstance(bin_type, SelectiveType): |
| | yield STARTEVENT, bin_item |
| | for k, v in bin_type.selections.items(): |
| | item = dict(bin_item, select_when=k, type=v) |
| | for x in bintype_map_events(item): |
| | yield x |
| | yield ENDEVENT, bin_item |
| | elif isinstance(bin_type, FlagsType): |
| | |
| | |
| | |
| | bin_item['bin_type'] = bin_type.basetype |
| | bin_item['flags_type'] = bin_type |
| | yield None, bin_item |
| | else: |
| | yield None, bin_item |
| |
|
| |
|
| | def filter_with_version(events, version): |
| | for ev, item in events: |
| | required_version = item.get('version') |
| | if required_version is not None and version < required_version: |
| | |
| | logger.debug('skip following: (required version: %s)', |
| | required_version) |
| | logger.debug(' %s', (ev, item)) |
| | if ev is STARTEVENT: |
| | for x in iter_subevents(events): |
| | pass |
| | continue |
| | yield ev, item |
| |
|
| |
|
| | def make_items_immutable(events): |
| | stack = [] |
| | for ev, item in events: |
| | if ev is None: |
| | item = tuple(sorted(item.items())) |
| | elif ev is STARTEVENT: |
| | item = tuple(sorted(item.items())) |
| | stack.append(item) |
| | elif ev is ENDEVENT: |
| | item = stack.pop() |
| | yield ev, item |
| |
|
| |
|
| | def compile_type_definition(bin_item): |
| | events = bintype_map_events(bin_item) |
| | events = make_items_immutable(events) |
| | return tuple(events) |
| |
|
| |
|
| | master_typedefs = dict() |
| |
|
| |
|
| | def get_compiled_typedef(type): |
| | if type not in master_typedefs: |
| | logger.info('compile typedef of %s', type) |
| | typedef_events = compile_type_definition(dict(type=type)) |
| | master_typedefs[type] = typedef_events |
| | return master_typedefs[type] |
| |
|
| |
|
| | versioned_typedefs = dict() |
| |
|
| |
|
| | def get_compiled_typedef_with_version(type, version): |
| | if version not in versioned_typedefs: |
| | versioned_typedefs[version] = typedefs = dict() |
| | typedefs = versioned_typedefs[version] |
| |
|
| | if type not in typedefs: |
| | logger.info('filter compiled typedef of %s with version %s', |
| | type, version) |
| | typedef_events = get_compiled_typedef(type) |
| | events = static_to_mutable(typedef_events) |
| | events = filter_with_version(events, version) |
| | events = make_items_immutable(events) |
| | events = tuple(events) |
| | typedefs[type] = events |
| |
|
| | return typedefs[type] |
| |
|
| |
|
| | class ERROREVENT(object): |
| | pass |
| |
|
| |
|
| | def static_to_mutable(events): |
| | stack = [] |
| | for ev, item in events: |
| | if ev is None: |
| | item = dict(item) |
| | elif ev is STARTEVENT: |
| | item = dict(item) |
| | stack.append(item) |
| | elif ev is ENDEVENT: |
| | item = stack.pop() |
| | yield ev, item |
| |
|
| |
|
| | def pop_subevents(events_deque): |
| | level = 0 |
| | while len(events_deque) > 0: |
| | event, item = events_deque.popleft() |
| | yield event, item |
| | if event is STARTEVENT: |
| | level += 1 |
| | elif event is ENDEVENT: |
| | if level > 0: |
| | level -= 1 |
| | else: |
| | return |
| |
|
| |
|
| | def resolve_typedefs(typedef_events, context): |
| |
|
| | array_types = (X_ARRAY, VariableLengthArrayType, FixedArrayType) |
| |
|
| | stack = [] |
| | selective_stack = [] |
| |
|
| | events = static_to_mutable(typedef_events) |
| | events = deque(events) |
| | while len(events) > 0: |
| | ev, item = events.popleft() |
| | if isinstance(item['type'], SelectiveType): |
| | if ev is STARTEVENT: |
| | parent_struct = stack[-1] |
| | struct_value = parent_struct['value'] |
| | selector_reference = item['type'].selector_reference |
| | select_key = selector_reference(context, struct_value) |
| | logger.debug('select_key: %s', select_key) |
| | item['select_key'] = select_key |
| | selective_stack.append(item) |
| | elif ev is ENDEVENT: |
| | selective_stack.pop() |
| | else: |
| | assert False |
| | elif 'select_when' in item: |
| | assert ev in (None, STARTEVENT) |
| | select_key = selective_stack[-1]['select_key'] |
| | select_when = item.pop('select_when') |
| | if select_when != select_key: |
| | |
| | logger.debug('skip following: (select key %r != %r)', |
| | select_key, select_when) |
| | logger.debug(' %s', (ev, item)) |
| | if ev is STARTEVENT: |
| | for x in pop_subevents(events): |
| | logger.debug(' %s', x) |
| | pass |
| | continue |
| | logger.debug('selected for: %r', select_when) |
| | events.appendleft((ev, item)) |
| | elif 'condition' in item: |
| | assert ev in (STARTEVENT, None) |
| | condition = item.pop('condition') |
| | parent_struct = stack[-1] |
| | if not condition(context, parent_struct['value']): |
| | |
| | logger.debug('skip following: (not matched condition: %s)', |
| | condition) |
| | logger.debug(' %s', (ev, item)) |
| | if ev is STARTEVENT: |
| | for x in pop_subevents(events): |
| | logger.debug(' %s', x) |
| | pass |
| | continue |
| | events.appendleft((ev, item)) |
| | elif isinstance(item['type'], array_types) and 'count' not in item: |
| | assert ev is STARTEVENT |
| |
|
| | if isinstance(item['type'], X_ARRAY): |
| | parent_struct = stack[-1] |
| | struct_value = parent_struct['value'] |
| |
|
| | count_reference = item['type'].count_reference |
| | count = count_reference(context, struct_value) |
| | elif isinstance(item['type'], VariableLengthArrayType): |
| | count = dict(type=item['type'].counttype, dontcollect=True) |
| | yield None, count |
| | count = count['value'] |
| | elif isinstance(item['type'], FixedArrayType): |
| | count = item['type'].size |
| | item['count'] = count |
| |
|
| | subevents = list(pop_subevents(events)) |
| | endevent = subevents[-1] |
| | subevents = subevents[:-1] |
| |
|
| | def clone(events): |
| | stack = [] |
| | for ev, item in events: |
| | if ev in (STARTEVENT, None): |
| | item = dict(item) |
| | if ev is STARTEVENT: |
| | stack.append(item) |
| | else: |
| | item = stack.pop() |
| | yield ev, item |
| |
|
| | events.appendleft(endevent) |
| | for _ in range(0, count): |
| | cloned = list(clone(subevents)) |
| | events.extendleft(reversed(cloned)) |
| | events.appendleft((ev, item)) |
| | else: |
| | if ev is STARTEVENT: |
| | stack.append(item) |
| | elif ev is ENDEVENT: |
| | stack.pop() |
| | yield ev, item |
| |
|
| |
|
| | def evaluate_bin_values(events): |
| | for ev, item in events: |
| | if 'flags_type' in item: |
| | flags_type = item['flags_type'] |
| | assert isinstance(flags_type, FlagsType) |
| | item['value'] = flags_type(item['value']) |
| | yield ev, item |
| |
|
| |
|
| | def construct_composite_values(events): |
| |
|
| | stack = [] |
| |
|
| | for ev, item in events: |
| | if ev is STARTEVENT: |
| | if isinstance(item['type'], StructType): |
| | item['value'] = dict() |
| | elif isinstance(item['type'], (X_ARRAY, VariableLengthArrayType, |
| | FixedArrayType)): |
| | item['value'] = list() |
| | else: |
| | assert False |
| | stack.append(item) |
| | elif ev in (None, ENDEVENT): |
| | if ev is ENDEVENT: |
| | item = stack.pop() |
| | if isinstance(item['type'], FixedArrayType): |
| | item['value'] = tuple(item['value']) |
| |
|
| | if len(stack) > 0: |
| | if not item.get('dontcollect', False): |
| | if isinstance(stack[-1]['type'], StructType): |
| | |
| | stack[-1]['value'][item['name']] = item['value'] |
| | elif isinstance(stack[-1]['type'], |
| | (X_ARRAY, |
| | VariableLengthArrayType, |
| | FixedArrayType)): |
| | stack[-1]['value'].append(item['value']) |
| | yield ev, item |
| |
|
| |
|
| | def log_events(events, log_fn): |
| | for ev, item in events: |
| | if ev in (STARTEVENT, ENDEVENT): |
| | fmt = ['%s:'] |
| | val = [ev.__name__] |
| | else: |
| | fmt = [' %04x:'] |
| | val = [item['bin_offset']] |
| |
|
| | fmt.append('%s') |
| | val.append(item['type'].__name__) |
| |
|
| | if 'name' in item: |
| | fmt.append('%r') |
| | val.append(str(item['name'])) |
| |
|
| | if 'value' in item and ev is None: |
| | fmt.append('%r') |
| | val.append(item['value']) |
| |
|
| | if 'exception' in item: |
| | fmt.append('-- Exception: %r') |
| | val.append(item['exception']) |
| |
|
| | log_fn(' '.join(fmt), *val) |
| | yield ev, item |
| |
|
| |
|
| | def eval_typedef_events(typedef_events, context, resolve_values): |
| | events = static_to_mutable(typedef_events) |
| | events = resolve_typedefs(events, context) |
| | events = resolve_values(events) |
| | events = evaluate_bin_values(events) |
| | events = construct_composite_values(events) |
| | events = log_events(events, logger.debug) |
| | return events |
| |
|
| |
|
| | def resolve_values_from_stream(stream): |
| | def resolve_values(events): |
| | for ev, item in events: |
| | if ev is None: |
| | item['bin_offset'] = stream.tell() |
| | try: |
| | item['value'] = resolve_value_from_stream(item, stream) |
| | except Exception as e: |
| | item['exception'] = e |
| | ev = ERROREVENT |
| | yield ev, item |
| | return resolve_values |
| |
|
| |
|
| | def resolve_value_from_stream(item, stream): |
| | from hwp5.binmodel import ParaTextChunks |
| | from hwp5.binmodel import CHID |
| | if 'bin_type' in item: |
| | item_type = item['bin_type'] |
| | else: |
| | item_type = item['type'] |
| | if hasattr(item_type, 'binfmt'): |
| | binfmt = item_type.binfmt |
| | binsize = struct.calcsize(binfmt) |
| | bytes = readn(stream, binsize) |
| | unpacked = struct.unpack(binfmt, bytes) |
| | return unpacked[0] |
| | elif item_type is CHID: |
| | bytes = readn(stream, 4) |
| | return CHID.decode(bytes) |
| | elif item_type is BSTR: |
| | return BSTR.read(stream) |
| | elif item_type is ParaTextChunks: |
| | return ParaTextChunks.read(stream) |
| | elif hasattr(item_type, 'fixed_size'): |
| | bytes = readn(stream, item_type.fixed_size) |
| | if hasattr(item_type, 'decode'): |
| | return item_type.decode(bytes) |
| | return bytes |
| | else: |
| | assert hasattr(item_type, 'read') |
| | logger.warning('%s: item type relies on its read() to resolve a value', |
| | item_type.__name__) |
| | return item_type.read(stream) |
| |
|
| |
|
| | def resolve_type_events(type, context, resolve_values): |
| | |
| | |
| | if 'version' in context: |
| | version = context['version'] |
| | events = get_compiled_typedef_with_version(type, version) |
| | else: |
| | events = get_compiled_typedef(type) |
| |
|
| | |
| | return eval_typedef_events(events, context, resolve_values) |
| |
|
| |
|
| | def read_type_events(type, context, stream): |
| | resolve_values = resolve_values_from_stream(stream) |
| | events = resolve_type_events(type, context, resolve_values) |
| | for ev, item in events: |
| | yield ev, item |
| | if ev is ERROREVENT: |
| | e = item['exception'] |
| | msg = 'can\'t parse %s' % type |
| | pe = ParseError(msg) |
| | pe.cause = e |
| | pe.path = context.get('path') |
| | pe.treegroup = context.get('treegroup') |
| | pe.record = context.get('record') |
| | pe.offset = item.get('bin_offset') |
| | raise pe |
| |
|
| |
|
| | def read_type_item(type, context, stream, binevents=None): |
| | if binevents is None: |
| | binevents = [] |
| | try: |
| | binevents.extend(read_type_events(type, context, stream)) |
| | except ParseError as e: |
| | e.binevents = binevents |
| | raise |
| | return binevents[-1][1] |
| |
|
| |
|
| | def read_type(type, context, stream, binevents=None): |
| | item = read_type_item(type, context, stream, binevents) |
| | return item['value'] |
| |
|
| |
|
| | def dump_events(events): |
| | def prefix_level(event_prefixed_items): |
| | level = 0 |
| | for ev, item in event_prefixed_items: |
| | if ev is STARTEVENT: |
| | yield level, item |
| | level += 1 |
| | elif ev is ENDEVENT: |
| | level -= 1 |
| | else: |
| | yield level, item |
| |
|
| | def item_to_dict(events): |
| | for ev, item in events: |
| | yield ev, dict(item) |
| |
|
| | def type_to_string(events): |
| | for ev, item in events: |
| | item['type'] = item['type'].__name__ |
| | yield ev, item |
| |
|
| | def condition_to_string(events): |
| | for ev, item in events: |
| | if 'condition' in item: |
| | item['condition'] = item['condition'].__name__ |
| | yield ev, item |
| |
|
| | events = item_to_dict(events) |
| | events = type_to_string(events) |
| | events = condition_to_string(events) |
| | for level, item in prefix_level(events): |
| | indents = '' |
| | if level > 0: |
| | if level > 1: |
| | indents = ' ' * (level - 2) + ' ' |
| | indents += '- ' |
| | print('{}{}'.format(indents, item)) |
| |
|
| |
|
| | def main(): |
| | logger.setLevel(logging.DEBUG) |
| | logger.addHandler(logging.StreamHandler()) |
| |
|
| | import hwp5.binmodel |
| | name = sys.argv[1] |
| | type = getattr(hwp5.binmodel, name) |
| | typedef_events = compile_type_definition(dict(type=type)) |
| | pprint(typedef_events) |
| |
|
| | context = {} |
| |
|
| | def resolve_values(events): |
| | for ev, item in events: |
| | if ev is None: |
| | print('') |
| | for k, v in sorted(item.items()): |
| | print('- {} : {}'.format(k, v)) |
| | value = raw_input('>> ') |
| | value = eval(value) |
| | if isinstance(item['type'], FlagsType): |
| | value = item['type'](value) |
| | item['value'] = value |
| | yield ev, item |
| | events = eval_typedef_events(typedef_events, context, resolve_values) |
| | for ev, item in events: |
| | print('{} {}'.format(ev, item)) |
| |
|