Spaces:
Running
Running
| # -*- coding: utf-8 -*- | |
| # | |
| # pyhwp : hwp file format parser in python | |
| # Copyright (C) 2010-2023 mete0r <https://github.com/mete0r> | |
| # | |
| # This program is free software: you can redistribute it and/or modify | |
| # it under the terms of the GNU Affero General Public License as published by | |
| # the Free Software Foundation, either version 3 of the License, or | |
| # (at your option) any later version. | |
| # | |
| # This program is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # GNU Affero General Public License for more details. | |
| # | |
| # You should have received a copy of the GNU Affero General Public License | |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
| # | |
| from __future__ import absolute_import | |
| from __future__ import print_function | |
| from __future__ import unicode_literals | |
| import io | |
| import logging | |
| import os.path | |
| import sys | |
| from ...errors import InvalidOleStorageError | |
| PY3 = sys.version_info.major == 3 | |
| if PY3: | |
| basestring = str | |
| logger = logging.getLogger(__name__) | |
| enabled = False | |
| def is_enabled(): | |
| if 'PYHWP_PLAT_UNO' in os.environ: | |
| PYHWP_PLAT_UNO = os.environ['PYHWP_PLAT_UNO'].strip() | |
| try: | |
| forced = bool(int(PYHWP_PLAT_UNO)) | |
| logger.debug('%s: forced to be %s by PYHWP_PLAT_UNO', __name__, | |
| 'enabled' if forced else 'disabled') | |
| return forced | |
| except: | |
| logger.warning('PYHWP_PLAT_UNO=%s (invalid)', PYHWP_PLAT_UNO) | |
| logger.debug('%s: is %s', __name__, 'enabled' if enabled else 'disabled') | |
| return enabled | |
| def enable(): | |
| import uno | |
| import unohelper | |
| g = globals() | |
| g['uno'] = uno | |
| g['unohelper'] = unohelper | |
| g['enabled'] = True | |
| logger.info('%s: enabled.', __name__) | |
| def disable(): | |
| global enabled | |
| enabled = False | |
| logger.info('%s: disabled.', __name__) | |
| def XSLTTransformer(context, stylesheet_url, source_url, source_url_base): | |
| from com.sun.star.beans import NamedValue | |
| from hwp5.plat._uno.services import css | |
| args = (NamedValue('StylesheetURL', stylesheet_url), | |
| NamedValue('SourceURL', source_url), | |
| NamedValue('SourceBaseURL', source_url_base)) | |
| select = os.environ.get('PYHWP_PLAT_UNO_XSLT', 'libxslt') | |
| logger.debug('PYHWP_PLAT_UNO_XSLT = %s', select) | |
| if select == 'jaxthelper': | |
| logger.debug('%s.xslt: using css.comp.JAXTHelper', __name__) | |
| return css.comp.JAXTHelper(context, *args) | |
| else: | |
| logger.debug('%s.xslt: using %s', __name__, | |
| 'css.comp.documentconversion.LibXSLTTransformer') | |
| return css.comp.documentconversion.LibXSLTTransformer(context, *args) | |
| class OneshotEvent(object): | |
| def __init__(self): | |
| pin, pout = os.pipe() | |
| self.pin = os.fdopen(pin, 'r') | |
| self.pout = os.fdopen(pout, 'w') | |
| def wait(self): | |
| self.pin.read() | |
| self.pin.close() | |
| def signal(self): | |
| self.pout.close() | |
| class XSLT(object): | |
| def __init__(self, context): | |
| self.context = context | |
| def __call__(self, xsl_path, inp_path, out_path): | |
| import uno | |
| import unohelper | |
| from hwp5.plat._uno import ucb | |
| from hwp5.plat._uno.adapters import OutputStreamToFileLike | |
| xsl_path = os.path.abspath(xsl_path) | |
| xsl_url = uno.systemPathToFileUrl(xsl_path) | |
| inp_path = os.path.abspath(inp_path) | |
| inp_url = uno.systemPathToFileUrl(inp_path) | |
| inp_stream = ucb.open_url(self.context, inp_url) | |
| out_path = os.path.abspath(out_path) | |
| with io.open(out_path, 'wb') as out_file: | |
| out_stream = OutputStreamToFileLike(out_file, dontclose=True) | |
| from com.sun.star.io import XStreamListener | |
| class XSLTListener(unohelper.Base, XStreamListener): | |
| def __init__(self): | |
| self.event = OneshotEvent() | |
| def started(self): | |
| logger.info('XSLT started') | |
| def closed(self): | |
| logger.info('XSLT closed') | |
| self.event.signal() | |
| def terminated(self): | |
| logger.info('XSLT terminated') | |
| self.event.signal() | |
| def error(self, exception): | |
| logger.error('XSLT error: %s', exception) | |
| self.event.signal() | |
| def disposing(self, source): | |
| logger.info('XSLT disposing: %s', source) | |
| self.event.signal() | |
| listener = XSLTListener() | |
| transformer = XSLTTransformer(self.context, xsl_url, '', '') | |
| transformer.InputStream = inp_stream | |
| transformer.OutputStream = out_stream | |
| transformer.addListener(listener) | |
| transformer.start() | |
| xsl_name = os.path.basename(xsl_path) | |
| logger.info('xslt.soffice(%s) start', xsl_name) | |
| try: | |
| listener.event.wait() | |
| finally: | |
| logger.info('xslt.soffice(%s) end', xsl_name) | |
| transformer.removeListener(listener) | |
| return dict() | |
| def xslt(xsl_path, inp_path, out_path): | |
| import uno | |
| context = uno.getComponentContext() | |
| xslt = XSLT(context) | |
| return xslt(xsl_path, inp_path, out_path) | |
| def oless_from_filename(filename): | |
| inputstream = inputstream_from_filename(filename) | |
| return oless_from_inputstream(inputstream) | |
| def inputstream_from_filename(filename): | |
| f = io.open(filename, 'rb') | |
| from hwp5.plat._uno.adapters import InputStreamFromFileLike | |
| return InputStreamFromFileLike(f) | |
| def oless_from_inputstream(inputstream): | |
| import uno | |
| context = uno.getComponentContext() | |
| sm = context.ServiceManager | |
| name = 'com.sun.star.embed.OLESimpleStorage' | |
| args = (inputstream, ) | |
| return sm.createInstanceWithArgumentsAndContext(name, args, context) | |
| class OleStorage(object): | |
| def __init__(self, stg): | |
| ''' an OLESimpleStorage to hwp5 storage adapter. | |
| :param stg: a filename or an instance of OLESimpleStorage | |
| ''' | |
| if isinstance(stg, basestring): | |
| self.oless = oless_from_filename(stg) | |
| try: | |
| self.oless.getElementNames() | |
| except: | |
| errormsg = 'Not a valid OLE2 Compound Binary File.' | |
| raise InvalidOleStorageError(errormsg) | |
| else: | |
| # TODO assert stg is an instance of OLESimpleStorage | |
| self.oless = stg | |
| def __iter__(self): | |
| return iter(self.oless.getElementNames()) | |
| def __getitem__(self, name): | |
| from com.sun.star.container import NoSuchElementException | |
| try: | |
| elem = self.oless.getByName(name) | |
| except NoSuchElementException: | |
| raise KeyError(name) | |
| services = elem.SupportedServiceNames | |
| if 'com.sun.star.embed.OLESimpleStorage' in services: | |
| return OleStorage(elem) | |
| else: | |
| elem.closeInput() | |
| return OleStorageStream(self.oless, name) | |
| def close(self): | |
| return | |
| # TODO | |
| # if this is root, close underlying olefile | |
| if self.path == '': | |
| # old version of OleFileIO has no close() | |
| if hasattr(self.olefile, 'close'): | |
| self.olefile.close() | |
| class OleStorageStream(object): | |
| def __init__(self, oless, name): | |
| self.oless = oless | |
| self.name = name | |
| def open(self): | |
| from hwp5.plat._uno.adapters import FileFromStream | |
| stream = self.oless.getByName(self.name) | |
| return FileFromStream(stream) | |