| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | import logging |
| | import os |
| |
|
| |
|
| | |
# Package-wide logger shared by every component defined in this module.
logger = logging.getLogger('hwp5')


# PYHWP_LOGLEVEL: optional level name (matched case-insensitively).
# A name that is not one of the five standard levels falls back to WARNING.
loglevel = os.environ.get('PYHWP_LOGLEVEL')
if loglevel:
    level_by_name = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL,
    }
    logger.setLevel(level_by_name.get(loglevel.upper(), logging.WARNING))
    del level_by_name
# Drop the temporary so it does not linger in the module namespace.
del loglevel


# PYHWP_LOGFILE: optional path; when set, log records are also written there.
filename = os.environ.get('PYHWP_LOGFILE')
if filename:
    file_handler = logging.FileHandler(filename)
    logger.addHandler(file_handler)
    del file_handler
del filename


# Record which interpreter is hosting the components and its import path.
import sys
logger.info('sys.executable = %s', sys.executable)
logger.info('sys.version = %s', sys.version)
logger.info('sys.path:')
for path in sys.path:
    logger.info('- %s', path)
| |
|
| |
|
| | try: |
| | import uno |
| | import unohelper |
| | import unokit |
| | from unokit.util import propseq_to_dict |
| | from unokit.util import dict_to_propseq |
| | from unokit.util import xenumeration_list |
| | from unokit.adapters import InputStreamFromFileLike |
| |
|
| | from com.sun.star.lang import XInitialization |
| | from com.sun.star.document import XFilter, XImporter, XExtendedFilterDetection |
| | from com.sun.star.task import XJobExecutor |
| |
|
def log_exception(f):
    """Decorate *f* so that any exception it raises is logged, then re-raised.

    The exception is reported through the module-level ``logger`` (with
    traceback, via ``logger.exception``) and then propagated unchanged to the
    caller.  The wrapper keeps the metadata of *f* (``__name__``,
    ``__doc__``, ...) so that decorated methods remain identifiable in
    tracebacks and introspection.

    :param f: the callable to wrap
    :returns: a wrapper with the same call signature and return value as *f*
    """
    # Imported locally so this block does not depend on the module's import
    # section.
    import functools

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception as e:
            logger.exception(e)
            raise
    return wrapper
| |
|
# Registry that collects every component class declared in this module.
g_ImplementationHelper = unohelper.ImplementationHelper()


def implementation(component_name, *services):
    """Return a class decorator that registers the class as a component.

    The decorated class is added to ``g_ImplementationHelper`` under
    *component_name* together with the tuple of *services*, and is then
    returned unchanged.

    :param component_name: implementation name to register the class under
    :param services: service names passed along with the registration
    """
    def register(cls):
        g_ImplementationHelper.addImplementation(cls, component_name,
                                                 services)
        return cls
    return register
| |
|
| |
|
@implementation('hwp5.Detector', 'com.sun.star.document.ExtendedTypeDetection')
class Detector(unokit.Base, XExtendedFilterDetection):
    """Type-detection component registered as ``hwp5.Detector``."""

    @log_exception
    @unokit.component_context
    def detect(self, mediadesc):
        """Run ``hwp5_uno.typedetect`` on the descriptor's input stream.

        :param mediadesc: media descriptor (property sequence); its
            ``InputStream`` entry is the stream that gets inspected
        :returns: ``(typename, mediadesc)`` where *typename* is whatever
            ``typedetect`` returned and *mediadesc* is passed back untouched
        :raises KeyError: if the descriptor has no ``InputStream`` entry
        """
        from hwp5_uno import typedetect

        logger.info('hwp5.Detector detect()')

        properties = propseq_to_dict(mediadesc)
        for name, value in properties.items():
            logger.debug('\t%s: %s', name, value)

        detected = typedetect(properties['InputStream'])

        logger.info('hwp5.Detector: %s detected.', detected)
        return detected, mediadesc
| |
|
| |
|
@implementation('hwp5.Importer', 'com.sun.star.document.ImportFilter')
class Importer(unokit.Base, XInitialization, XFilter, XImporter):
    """Import-filter component registered as ``hwp5.Importer``."""

    @log_exception
    @unokit.component_context
    def initialize(self, args):
        """Log the initialization arguments; nothing else is done with them."""
        logger.debug('Importer initialize: %s', args)

    @log_exception
    @unokit.component_context
    def setTargetDocument(self, target):
        """Remember *target* as the document that ``filter`` will fill."""
        logger.debug('Importer setTargetDocument: %s', target)
        self.target = target

    @log_exception
    @unokit.component_context
    def filter(self, mediadesc):
        """Load the HWP file described by *mediadesc* into the target document.

        :param mediadesc: media descriptor (property sequence); ``InputStream``
            is required, ``StatusIndicator`` is optional
        :returns: ``True`` when loading finished without an exception,
            ``False`` when loading raised (the error is logged, not re-raised)
        """
        from hwp5.dataio import ParseError
        from hwp5_uno import HwpFileFromInputStream
        from hwp5_uno import load_hwp5file_into_doc

        logger.debug('Importer filter')
        properties = propseq_to_dict(mediadesc)

        logger.debug('mediadesc: %s', str(properties.keys()))
        for name, value in properties.items():
            logger.debug('%s: %s', name, str(value))

        progress = properties.get('StatusIndicator')
        hwpfile = HwpFileFromInputStream(properties['InputStream'])

        try:
            load_hwp5file_into_doc(hwpfile, self.target, progress)
        except ParseError as parse_error:
            # Parse errors know how to report themselves to a logger.
            parse_error.print_to_logger(logger)
            return False
        except Exception as error:
            logger.exception(error)
            return False
        return True

    @unokit.component_context
    def cancel(self):
        """Log the cancel request; no cancellation is actually performed."""
        logger.debug('Importer cancel')
| |
|
| |
|
@implementation('hwp5.TestJob')
class TestJob(unokit.Base, XJobExecutor):
    """Job component registered as ``hwp5.TestJob`` that runs the test suites."""

    @unokit.component_context
    def trigger(self, args):
        """Run every suite from ``tests()`` with *args* as working directory.

        The previous working directory is restored afterwards, even when the
        run raises.

        :param args: directory to change into for the duration of the run
        """
        logger.debug('testjob %s', args)

        import os
        from unittest import TestSuite
        from unittest import TextTestRunner

        previous_dir = os.getcwd()
        try:
            os.chdir(args)
            runner = TextTestRunner()
            suite = TestSuite(self.tests())
            runner.run(suite)
        finally:
            os.chdir(previous_dir)

    def tests(self):
        """Yield the test suites to run, importing test modules lazily."""
        from unittest import defaultTestLoader as loader

        # Test cases defined in this module.
        for case in (DetectorTest, ImporterTest):
            yield loader.loadTestsFromTestCase(case)

        from hwp5_uno.tests import test_hwp5_uno
        yield loader.loadTestsFromModule(test_hwp5_uno)

        from hwp5.tests import test_suite
        yield test_suite()
| |
|
| |
|
from unittest import TestCase


class DetectorTest(TestCase):
    """Integration test for the ``hwp5.Detector`` component."""

    def test_detect(self):
        """The sample fixture must be detected as type ``hwp5``."""
        from hwp5.tests.fixtures import open_fixture

        context = uno.getComponentContext()

        f = open_fixture('sample-5017.hwp', 'rb')
        try:
            stream = InputStreamFromFileLike(f)
            mediadesc = dict_to_propseq(dict(InputStream=stream))

            svm = context.ServiceManager
            detector = svm.createInstanceWithContext('hwp5.Detector', context)
            # detect() also returns the media descriptor; only the type name
            # is checked here.
            typename, _ = detector.detect(mediadesc)
            self.assertEqual('hwp5', typename)
        finally:
            # Release the fixture file whether or not the assertion passed.
            f.close()
| |
|
class ImporterTest(TestCase):
    """Integration test for the ``hwp5.Importer`` component."""

    def test_filter(self):
        """Import the sample fixture into a new Writer document and check it."""
        from hwp5.tests.fixtures import open_fixture

        context = uno.getComponentContext()

        f = open_fixture('sample-5017.hwp', 'rb')
        try:
            stream = InputStreamFromFileLike(f)
            mediadesc = dict_to_propseq(dict(InputStream=stream))

            svm = context.ServiceManager
            importer = svm.createInstanceWithContext('hwp5.Importer', context)
            desktop = svm.createInstanceWithContext(
                'com.sun.star.frame.Desktop', context)
            doc = desktop.loadComponentFromURL('private:factory/swriter',
                                               '_blank', 0, ())

            importer.setTargetDocument(doc)
            # Fail right here if the import itself reports failure, instead
            # of surfacing later as an unrelated error on the document.
            self.assertTrue(importer.filter(mediadesc))

            text = doc.getText()
            paragraphs = xenumeration_list(text.createEnumeration())

            # Dump the imported structure to the log to ease diagnosis.
            for paragraph_ix, paragraph in enumerate(paragraphs):
                logger.info('Paragraph %s', paragraph_ix)
                logger.debug('%s', paragraph)

                services = paragraph.SupportedServiceNames
                if 'com.sun.star.text.Paragraph' in services:
                    self._log_portions(paragraph)

            # The first paragraph starts with a known text portion.
            paragraph_portions = paragraphs[0].createEnumeration()
            paragraph_portions = xenumeration_list(paragraph_portions)
            self.assertEqual(u'한글 ', paragraph_portions[0].getString())

            # The second portion of paragraph 16 anchors a graphic.
            paragraph_portions = paragraphs[16].createEnumeration()
            paragraph_portions = xenumeration_list(paragraph_portions)
            contents = paragraph_portions[1].createContentEnumeration(
                'com.sun.star.text.TextContent')
            contents = xenumeration_list(contents)
            self.assertEqual('image/x-vclgraphic',
                             contents[0].Bitmap.MimeType)

            graphics = doc.getGraphicObjects()
            graphics = xenumeration_list(graphics.createEnumeration())
            logger.debug('graphic: %s', graphics)

            frames = doc.getTextFrames()
            frames = xenumeration_list(frames.createEnumeration())
            logger.debug('frames: %s', frames)
        finally:
            # Release the fixture file whether or not the test passed.
            f.close()

    def _log_portions(self, paragraph):
        """Log every text portion of *paragraph*, including frame contents."""
        portions = xenumeration_list(paragraph.createEnumeration())
        for portion_ix, portion in enumerate(portions):
            logger.info('Portion %s: %s', portion_ix,
                        portion.TextPortionType)
            if portion.TextPortionType == 'Text':
                logger.info('- %s', portion.getString())
            elif portion.TextPortionType == 'Frame':
                logger.debug('%s', portion)
                self._log_frame_contents(portion)

    def _log_frame_contents(self, portion):
        """Log the text contents of a frame *portion*, with graphic URLs."""
        textcontent_name = 'com.sun.star.text.TextContent'
        en = portion.createContentEnumeration(textcontent_name)
        for content in xenumeration_list(en):
            logger.debug('content: %s', content)
            content_services = content.SupportedServiceNames
            if ('com.sun.star.drawing.GraphicObjectShape' in
                    content_services):
                logger.info('graphic url: %s', content.GraphicURL)
                logger.info('graphic stream url: %s',
                            content.GraphicStreamURL)
| | except Exception as e: |
| | logger.exception(e) |
| | raise |
| |
|