| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | from __future__ import absolute_import |
| | from __future__ import print_function |
| | from __future__ import unicode_literals |
| | from io import BytesIO |
| | import logging |
| | import sys |
| |
|
| | from .bintype import read_type |
| | from .compressed import decompress |
| | from .dataio import UINT32, Flags, Struct |
| | from .errors import InvalidOleStorageError |
| | from .errors import InvalidHwp5FileError |
| | from .storage import ItemWrapper |
| | from .storage import StorageWrapper |
| | from .storage import ItemConversionStorage |
| | from .storage import is_stream |
| | from .storage.ole import OleStorage |
| | from .summaryinfo import CLSID_HWP_SUMMARY_INFORMATION |
| | from .utils import GeneratorTextReader |
| | from .utils import cached_property |
| | from .utils import transcoder |
| |
|
# Python 2/3 compatibility: Python 3 has no ``basestring`` builtin, so alias
# it to ``str`` for the isinstance() checks performed in this module.
PY3 = sys.version_info[0] == 3
if PY3:
    basestring = str


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Expected content of the 32-byte ``signature`` field of the FileHeader
# stream: the ASCII text right-padded with NUL bytes.
HWP5_SIGNATURE = b'HWP Document File'.ljust(32, b'\x00')
| |
|
| |
|
class BYTES(type):
    ''' Metaclass producing fixed-size raw byte field types.

    ``BYTES(size)`` creates a new class named ``'BYTES(<size>)'`` that
    derives from ``str`` and carries two class attributes:

    - ``fixed_size``: the `size` given at creation.
    - ``decode``: a static method returning its first argument unchanged
      (any further positional/keyword arguments are ignored).
    '''

    def __new__(mcs, size):
        # NOTE: the first parameter keeps the name ``bytes`` (shadowing the
        # builtin) so keyword callers of ``decode`` keep working.
        def passthrough(bytes, *args, **kwargs):
            return bytes

        # str() keeps the class name a native string under unicode_literals.
        type_name = str('BYTES(%d)') % size
        namespace = dict(fixed_size=size,
                         decode=staticmethod(passthrough))
        return type.__new__(mcs, type_name, (str,), namespace)
| |
|
| |
|
class VERSION(object):
    ''' Four-byte version field, decoded into a tuple of four integers. '''

    fixed_size = 4

    @classmethod
    def decode(cls, bytes):
        ''' Decode the raw version field.

        :param bytes: at least 4 raw bytes (``bytes``/``str`` on Python 2,
            ``bytes``/``bytearray`` on Python 3).
        :returns: ``(b3, b2, b1, b0)`` - the first four bytes as integers,
            in reversed order.
        :raises IndexError: fewer than 4 bytes were given.
        '''
        # bytearray() yields integers when indexed on both Python 2 and 3,
        # so no per-version branch (ord() vs. plain indexing) is needed.
        octets = bytearray(bytes)
        return (octets[3], octets[2], octets[1], octets[0])
| |
|
| |
|
class FileHeader(Struct):
    ''' Binary layout of the ``FileHeader`` stream.

    Fields, in order: ``signature`` (32 raw bytes), ``version``
    (:class:`VERSION`), ``flags`` (:attr:`Flags`) and ``reserved``
    (216 raw bytes).
    '''

    # Named flags stored in a UINT32; the integers are presumably the bit
    # positions of each flag (see dataio.Flags to confirm).
    Flags = Flags(
        UINT32,
        0, 'compressed',
        1, 'password',
        2, 'distributable',
        3, 'script',
        4, 'drm',
        5, 'xmltemplate_storage',
        6, 'history',
        7, 'cert_signed',
        8, 'cert_encrypted',
        9, 'cert_signature_extra',
        10, 'cert_drm',
        11, 'ccl',
    )

    @classmethod
    def attributes(cls):
        ''' Yield ``(type, name)`` pairs describing the struct members. '''
        yield BYTES(32), 'signature'
        yield VERSION, 'version'
        yield cls.Flags, 'flags'
        yield BYTES(216), 'reserved'
| |
|
| |
|
def is_hwp5file(filename):
    ''' Tell whether `filename` is an HWP format v5 file.

    The file is opened as an OLE storage. If that fails with
    InvalidOleStorageError the answer is False; otherwise the decision is
    delegated to :func:`storage_is_hwp5file`.
    '''
    try:
        storage = OleStorage(filename)
    except InvalidOleStorageError:
        # Not an OLE2 compound file at all.
        return False
    else:
        return storage_is_hwp5file(storage)
| |
|
| |
|
def storage_is_hwp5file(stg):
    ''' Tell whether the storage `stg` holds an HWP format v5 document.

    :param stg: a storage supporting ``stg['FileHeader']`` lookup.
    :returns: True when a ``FileHeader`` item exists and its signature
        equals HWP5_SIGNATURE, False otherwise (the reason is logged at
        INFO level).
    '''
    try:
        item = stg['FileHeader']
    except KeyError:
        logger.info('stg has no FileHeader')
        return False

    signature = HwpFileHeader(item).signature
    if signature != HWP5_SIGNATURE:
        logger.info('fileheader.signature = %r', signature)
        return False
    return True
| |
|
| |
|
class CompressedStream(ItemWrapper):
    ''' Stream item whose content is passed through decompress(). '''

    def open(self):
        ''' Open the wrapped item and return its decompressed stream. '''
        raw = self.wrapped.open()
        return decompress(raw)
| |
|
| |
|
class CompressedStorage(StorageWrapper):
    ''' Storage whose stream children are decompressed on open. '''

    def __getitem__(self, name):
        ''' Return child `name`; streams are wrapped in CompressedStream,
        any other item is returned untouched. '''
        child = self.wrapped[name]
        return CompressedStream(child) if is_stream(child) else child
| |
|
| |
|
class PasswordProtectedStream(ItemWrapper):
    ''' Stream item of a password-protected document.

    No decryption is performed: opening logs a warning and hands back the
    wrapped item's stream as-is.
    '''

    def open(self):
        ''' Warn that decryption is unsupported, then open the raw stream. '''
        message = ('Password-encrypted stream: currently decryption is '
                   'not supported')
        logger.warning(message)
        return self.wrapped.open()
| |
|
| |
|
class PasswordProtectedStorage(StorageWrapper):
    ''' Storage whose stream children become PasswordProtectedStream. '''

    def __getitem__(self, name):
        ''' Return child `name`; streams are wrapped in
        PasswordProtectedStream, any other item is returned untouched. '''
        child = self.wrapped[name]
        return PasswordProtectedStream(child) if is_stream(child) else child
| |
|
| |
|
class Hwp5PasswordProtectedDoc(ItemConversionStorage):
    ''' Conversion layer applied to password-protected documents. '''

    def resolve_conversion_for(self, name):
        ''' Pick the wrapper for the top-level item `name`.

        ``DocInfo`` maps to PasswordProtectedStream; ``BinData``,
        ``BodyText``, ``Scripts`` and ``ViewText`` map to
        PasswordProtectedStorage; anything else yields None.
        '''
        if name == 'DocInfo':
            return PasswordProtectedStream
        if name in ('BinData', 'BodyText', 'Scripts', 'ViewText'):
            return PasswordProtectedStorage
        return None
| |
|
| |
|
class VersionSensitiveItem(ItemWrapper):
    ''' Item wrapper that additionally remembers a format version. '''

    def __init__(self, item, version):
        '''
        :param item: the item to wrap.
        :param version: version value stored as ``self.version``.
        '''
        ItemWrapper.__init__(self, item)
        self.version = version

    def open(self):
        ''' Open the wrapped item. '''
        stream = self.wrapped.open()
        return stream

    def other_formats(self):
        ''' Return a new, empty dict: no alternative formats are offered. '''
        return {}
| |
|
| |
|
class Hwp5FileBase(ItemConversionStorage):
    ''' Foundation layer of an Hwp5File.

    Verifies that the given storage is an HWP format v5 document and
    exposes its parsed header through the `fileheader` property (also
    available under the alias `header`).

    :param stg: an OLE2 structured storage.
    :type stg: an instance of storage, OleFileIO or filename
    :raises InvalidHwp5FileError: `stg` is not a valid HWP format v5 document.
    '''

    def __init__(self, stg):
        # A string argument is taken as a filename and opened as OLE storage.
        if isinstance(stg, basestring):
            try:
                stg = OleStorage(stg)
            except InvalidOleStorageError:
                raise InvalidHwp5FileError('Not an OLE2 Compound Binary File.')

        if not storage_is_hwp5file(stg):
            raise InvalidHwp5FileError(
                'Not an HWP Document format v5 storage.')

        ItemConversionStorage.__init__(self, stg)

    def resolve_conversion_for(self, name):
        ''' Convert only the ``FileHeader`` item (to HwpFileHeader). '''
        if name == 'FileHeader':
            return HwpFileHeader
        return None

    def get_fileheader(self):
        ''' Return the ``FileHeader`` item of this storage. '''
        return self['FileHeader']

    fileheader = cached_property(get_fileheader)

    # Alias of `fileheader`.
    header = fileheader
| |
|
| |
|
class Hwp5DistDocStream(VersionSensitiveItem):
    ''' Stream of a distributable document.

    The raw stream starts with one "head" record, read with
    ``read_record(stream, 0)``, followed by the remaining "tail" bytes.
    The methods below expose the decoded stream as well as the individual
    head/tail parts.
    '''

    def open(self):
        ''' Open the wrapped stream and return it decoded by
        ``hwp5.distdoc.decode``. '''
        from hwp5.distdoc import decode
        encodedstream = self.wrapped.open()
        return decode(encodedstream)

    def head_record(self):
        ''' Read and return the first record of the raw stream.

        The raw stream is closed before returning.
        '''
        from .recordstream import read_record
        item = self.wrapped.open()
        try:
            return read_record(item, 0)
        finally:
            item.close()

    def head_record_stream(self):
        ''' Return a text reader over the JSON form of the head record. '''
        from .recordstream import record_to_json
        record = self.head_record()
        json = record_to_json(record)
        return GeneratorTextReader(iter([json]))

    def head(self):
        ''' Return the payload of the head record. '''
        record = self.head_record()
        return record['payload']

    def head_stream(self):
        ''' Return the head payload wrapped in a BytesIO. '''
        return BytesIO(self.head())

    def head_sha1(self):
        ''' Return ``decode_head_to_sha1()`` applied to the head payload. '''
        from hwp5.distdoc import decode_head_to_sha1
        payload = self.head()
        return decode_head_to_sha1(payload)

    def head_key(self):
        ''' Return ``decode_head_to_key()`` applied to the head payload. '''
        from hwp5.distdoc import decode_head_to_key
        payload = self.head()
        return decode_head_to_key(payload)

    def tail(self):
        ''' Return the raw bytes that follow the head record.

        The raw stream is closed before returning.

        :raises InvalidHwp5FileError: the stream position after the head
            record is not ``4 + 256``.
        '''
        from .recordstream import read_record
        item = self.wrapped.open()
        try:
            # Skip the head record; only its effect on the position matters.
            read_record(item, 0)
            expected = 4 + 256
            position = item.tell()
            # Explicit check instead of ``assert`` so that malformed input
            # is still rejected when Python runs with -O.
            if position != expected:
                raise InvalidHwp5FileError(
                    'Unexpected end of distdoc head record: '
                    'stream position %d, expected %d' % (position, expected))
            return item.read()
        finally:
            item.close()

    def tail_decrypted(self):
        ''' Return the tail decrypted with the key derived from the head. '''
        from hwp5.distdoc import decrypt_tail
        key = self.head_key()
        tail = self.tail()
        return decrypt_tail(key, tail)

    def tail_stream(self):
        ''' Return the (still encrypted) tail wrapped in a BytesIO. '''
        return BytesIO(self.tail())
| |
|
| |
|
class Hwp5DistDocStorage(ItemConversionStorage):
    ''' Storage whose children are all exposed as Hwp5DistDocStream. '''

    def resolve_conversion_for(self, name):
        ''' Return a converter producing an Hwp5DistDocStream for `name`.

        The converter ignores the item it is handed; it looks `name` up in
        the wrapped storage itself and passes None as the version.
        '''
        def conversion(item):
            source = self.wrapped[name]
            return Hwp5DistDocStream(source, None)

        return conversion
| |
|
| |
|
class Hwp5DistDoc(ItemConversionStorage):
    ''' Conversion layer applied to distributable documents. '''

    def resolve_conversion_for(self, name):
        ''' Map ``Scripts`` and ``ViewText`` to Hwp5DistDocStorage;
        any other name yields None. '''
        if name == 'Scripts' or name == 'ViewText':
            return Hwp5DistDocStorage
        return None
| |
|
| |
|
class Hwp5Compression(ItemConversionStorage):
    ''' Conversion layer handling compressed streams in HWPv5 files. '''

    def resolve_conversion_for(self, name):
        ''' Pick the decompressing wrapper for the top-level item `name`.

        ``DocInfo`` maps to CompressedStream; ``BinData``, ``BodyText``,
        ``ViewText`` and ``Scripts`` map to CompressedStorage; anything
        else yields None.
        '''
        if name == 'DocInfo':
            return CompressedStream
        if name in ('BinData', 'BodyText', 'ViewText', 'Scripts'):
            return CompressedStorage
        return None
| |
|
| |
|
class PreviewText(object):
    ''' Preview text item; its stream content is decoded as UTF-16LE. '''

    def __init__(self, item):
        # Re-use the item's own open() as this object's open().
        self.open = item.open

    def other_formats(self):
        ''' Offer a ``.utf8`` variant backed by :meth:`open_utf8`. '''
        return {'.utf8': self.open_utf8}

    def open_utf8(self):
        ''' Open the stream transcoded from UTF-16LE to UTF-8. '''
        to_utf8 = transcoder('utf-16le', 'utf-8')
        return to_utf8(self.open())

    def get_utf8(self):
        ''' Read the whole UTF-8 transcoded stream, closing it afterwards. '''
        stream = self.open_utf8()
        try:
            return stream.read()
        finally:
            stream.close()

    utf8 = cached_property(get_utf8)

    def get_text(self):
        ''' Read the whole raw stream, close it, and decode the data as
        UTF-16LE text. '''
        stream = self.open()
        try:
            raw = stream.read()
        finally:
            stream.close()
        return raw.decode('utf-16le')

    text = cached_property(get_text)

    def __str__(self):
        # Native str: decoded text on Python 3, UTF-8 data on Python 2.
        return self.text if PY3 else self.utf8

    def __unicode__(self):
        return self.text
| |
|
| |
|
class Sections(ItemConversionStorage):
    ''' Storage of ``Section<N>`` items, each wrapped in `section_class`. '''

    # Wrapper class applied to every child item; called as
    # ``section_class(item, version)``.
    section_class = VersionSensitiveItem

    def __init__(self, stg, version):
        '''
        :param stg: the storage to wrap.
        :param version: version value handed to every wrapped section.
        '''
        ItemConversionStorage.__init__(self, stg)
        self.version = version

    def resolve_conversion_for(self, name):
        ''' Return a converter wrapping child `name` in `section_class`.

        The converter ignores the item it is handed and looks `name` up in
        the wrapped storage itself.
        '''
        def conversion(item):
            return self.section_class(self.wrapped[name], self.version)
        return conversion

    def other_formats(self):
        ''' Return a new, empty dict: no alternative formats are offered. '''
        return dict()

    def section(self, idx):
        ''' Return the item named ``'Section%d' % idx``. '''
        return self['Section%d' % idx]

    def section_indexes(self):
        ''' Return the sorted list of integer section indexes.

        Names that do not start with ``Section``, or whose remainder is
        not accepted by ``int()``, are skipped.
        '''
        prefix = 'Section'
        indexes = []
        for name in self:
            if not name.startswith(prefix):
                continue
            try:
                idx = int(name[len(prefix):])
            except ValueError:
                # Not a numbered section (e.g. 'SectionFoo'). Only the
                # conversion failure is ignored, so KeyboardInterrupt and
                # the like still propagate.
                continue
            indexes.append(idx)
        indexes.sort()
        return indexes

    @property
    def sections(self):
        ''' List of all section items, ordered by index. '''
        return [self.section(idx) for idx in self.section_indexes()]
| |
|
| |
|
class HwpFileHeader(object):
    ''' Parsed view of the ``FileHeader`` stream.

    The stream is read with ``read_type(FileHeader, ...)`` on first access
    of `value`; `version`, `signature` and `flags` are derived from it.
    '''

    def __init__(self, item):
        # Re-use the item's own open() as this object's open().
        self.open = item.open

    def to_dict(self):
        ''' Read the stream as a FileHeader struct, closing it afterwards. '''
        stream = self.open()
        try:
            return read_type(FileHeader, dict(), stream)
        finally:
            stream.close()

    value = cached_property(to_dict)

    def get_version(self):
        ''' Return the ``version`` member of the parsed header. '''
        return self.value['version']

    version = cached_property(get_version)

    def get_signature(self):
        ''' Return the ``signature`` member of the parsed header. '''
        return self.value['signature']

    signature = cached_property(get_signature)

    def get_flags(self):
        ''' Return the ``flags`` member wrapped in FileHeader.Flags. '''
        return FileHeader.Flags(self.value['flags'])

    flags = cached_property(get_flags)

    def open_text(self):
        ''' Return a BytesIO holding a UTF-8 ``key: value`` listing.

        The listing contains every flag from ``Flags.dictvalue()`` plus
        ``signature`` (cut to the length of 'HWP Document File') and
        ``version`` (dotted form), one per line, sorted by key.
        '''
        header = self.value

        fields = FileHeader.Flags.dictvalue(header['flags'])
        text = header['signature'].decode('latin1')
        fields['signature'] = text[:len('HWP Document File')]
        fields['version'] = '%d.%d.%d.%d' % header['version']

        # Keys are unique, so ordering by key equals ordering the items.
        lines = ['{}: {}\n'.format(key, fields[key])
                 for key in sorted(fields)]
        return BytesIO(''.join(lines).encode('utf-8'))

    def other_formats(self):
        ''' Offer a ``.txt`` variant backed by :meth:`open_text`. '''
        return {'.txt': self.open_text}
| |
|
| |
|
class HwpSummaryInfo(VersionSensitiveItem):
    ''' Summary information item, read as an OLE property set stream.

    The individual properties below index `propertySet` with the matching
    property identifier constant.
    '''

    def other_formats(self):
        ''' Offer a ``.txt`` variant backed by :meth:`open_text`. '''
        return {'.txt': self.open_text}

    def getPropertySetStream(self):
        ''' Read this item with a PropertySetStreamReader configured for
        the HWP summary information format; the stream is closed
        afterwards. '''
        from .msoleprops import PropertySetFormat
        from .msoleprops import PropertySetStreamReader
        from .summaryinfo import FMTID_HWP_SUMMARY_INFORMATION
        from .summaryinfo import HWP_PROPERTIES

        summary_format = PropertySetFormat(FMTID_HWP_SUMMARY_INFORMATION,
                                           HWP_PROPERTIES)
        reader = PropertySetStreamReader([summary_format])
        stream = self.open()
        try:
            return reader.read(stream)
        finally:
            stream.close()

    propertySetStream = cached_property(getPropertySetStream)

    def getHwpSummaryInfoPropertySet(self):
        ''' Return the first property set of the stream when its CLSID is
        CLSID_HWP_SUMMARY_INFORMATION, otherwise None. '''
        stream = self.propertySetStream
        if stream.clsid == CLSID_HWP_SUMMARY_INFORMATION:
            return stream.propertysets[0]
        return None

    propertySet = cached_property(getHwpSummaryInfoPropertySet)

    @property
    def title(self):
        ''' Property PIDSI_TITLE. '''
        from .msoleprops import PIDSI_TITLE
        return self.propertySet[PIDSI_TITLE]

    @property
    def subject(self):
        ''' Property PIDSI_SUBJECT. '''
        from .msoleprops import PIDSI_SUBJECT
        return self.propertySet[PIDSI_SUBJECT]

    @property
    def author(self):
        ''' Property PIDSI_AUTHOR. '''
        from .msoleprops import PIDSI_AUTHOR
        return self.propertySet[PIDSI_AUTHOR]

    @property
    def keywords(self):
        ''' Property PIDSI_KEYWORDS. '''
        from .msoleprops import PIDSI_KEYWORDS
        return self.propertySet[PIDSI_KEYWORDS]

    @property
    def comments(self):
        ''' Property PIDSI_COMMENTS. '''
        from .msoleprops import PIDSI_COMMENTS
        return self.propertySet[PIDSI_COMMENTS]

    @property
    def lastSavedBy(self):
        ''' Property PIDSI_LASTAUTHOR. '''
        from .msoleprops import PIDSI_LASTAUTHOR
        return self.propertySet[PIDSI_LASTAUTHOR]

    @property
    def revisionNumber(self):
        ''' Property PIDSI_REVNUMBER. '''
        from .msoleprops import PIDSI_REVNUMBER
        return self.propertySet[PIDSI_REVNUMBER]

    @property
    def lastPrintedTime(self):
        ''' Property PIDSI_LASTPRINTED. '''
        from .msoleprops import PIDSI_LASTPRINTED
        return self.propertySet[PIDSI_LASTPRINTED]

    @property
    def createdTime(self):
        ''' Property PIDSI_CREATE_DTM. '''
        from .msoleprops import PIDSI_CREATE_DTM
        return self.propertySet[PIDSI_CREATE_DTM]

    @property
    def lastSavedTime(self):
        ''' Property PIDSI_LASTSAVE_DTM. '''
        from .msoleprops import PIDSI_LASTSAVE_DTM
        return self.propertySet[PIDSI_LASTSAVE_DTM]

    @property
    def numberOfPages(self):
        ''' Property PIDSI_PAGECOUNT. '''
        from .msoleprops import PIDSI_PAGECOUNT
        return self.propertySet[PIDSI_PAGECOUNT]

    @property
    def dateString(self):
        ''' Property HWPPIDSI_DATE_STR. '''
        from .summaryinfo import HWPPIDSI_DATE_STR
        return self.propertySet[HWPPIDSI_DATE_STR]

    @property
    def numberOfParagraphs(self):
        ''' Property HWPPIDSI_PARACOUNT. '''
        from .summaryinfo import HWPPIDSI_PARACOUNT
        return self.propertySet[HWPPIDSI_PARACOUNT]

    @property
    def plaintext_lines(self):
        ''' Text lines produced by PropertySetStreamTextFormatter for a
        freshly read property set stream (the cached one is not used). '''
        from .msoleprops import PropertySetStreamTextFormatter
        stream = self.getPropertySetStream()
        return PropertySetStreamTextFormatter().formatTextLines(stream)

    def open_text(self):
        ''' Return a BytesIO with `plaintext_lines`, UTF-8 encoded and each
        terminated by a newline, positioned at the start. '''
        out = BytesIO()
        out.writelines(line.encode('utf-8') + b'\n'
                       for line in self.plaintext_lines)
        out.seek(0)
        return out
| |
|
| |
|
class Hwp5File(ItemConversionStorage):
    ''' Represents an HWPv5 file.

    Hwp5File(stg)

    stg: an instance of Storage (anything Hwp5FileBase accepts)

    The storage is validated by Hwp5FileBase and then wrapped in further
    conversion layers according to the flags of its file header.
    '''

    def __init__(self, stg):
        stg = Hwp5FileBase(stg)
        # All layer decisions are driven by the flags of the file header.
        flags = stg.header.flags

        if flags.password:
            # Password-protected: only this layer is applied; the
            # distributable/compression layers below are bypassed.
            ItemConversionStorage.__init__(
                self, Hwp5PasswordProtectedDoc(stg))
            return

        if flags.distributable:
            stg = Hwp5DistDoc(stg)

        if flags.compressed:
            stg = Hwp5Compression(stg)

        ItemConversionStorage.__init__(self, stg)

    def resolve_conversion_for(self, name):
        ''' Pick the converter for the top-level item `name`.

        ``DocInfo``, ``BodyText``, ``ViewText`` and
        ``\\005HwpSummaryInformation`` use the matching ``*_class``
        attribute bound to the header version; ``PrvText`` uses
        PreviewText; anything else yields None.
        '''
        if name == 'PrvText':
            return PreviewText
        if name == 'DocInfo':
            return self.with_version(self.docinfo_class)
        if name in ('BodyText', 'ViewText'):
            return self.with_version(self.bodytext_class)
        if name == '\005HwpSummaryInformation':
            return self.with_version(self.summaryinfo_class)
        return None

    def with_version(self, f):
        ''' Return a one-argument callable invoking
        ``f(item, self.header.version)``; the version is looked up when
        the callable runs, not when it is created. '''
        def wrapped(item):
            version = self.header.version
            return f(item, version)
        return wrapped

    # Wrapper classes used by resolve_conversion_for().
    summaryinfo_class = HwpSummaryInfo
    docinfo_class = VersionSensitiveItem
    bodytext_class = Sections

    @cached_property
    def summaryinfo(self):
        ''' The ``\\005HwpSummaryInformation`` item. '''
        return self['\005HwpSummaryInformation']

    @cached_property
    def docinfo(self):
        ''' The ``DocInfo`` item. '''
        return self['DocInfo']

    @cached_property
    def preview_text(self):
        ''' The ``PrvText`` item. '''
        return self['PrvText']

    @cached_property
    def bodytext(self):
        ''' The ``BodyText`` item. '''
        return self['BodyText']

    @cached_property
    def viewtext(self):
        ''' The ``ViewText`` item. '''
        return self['ViewText']

    @property
    def text(self):
        ''' `viewtext` for distributable documents, `bodytext` otherwise. '''
        distributable = self.header.flags.distributable
        return self.viewtext if distributable else self.bodytext
| |
|