Spaces:
Configuration error
Configuration error
| # -*- coding: utf-8 -*- | |
| # | |
| # pyhwp : hwp file format parser in python | |
| # Copyright (C) 2010-2023 mete0r <https://github.com/mete0r> | |
| # | |
| # This program is free software: you can redistribute it and/or modify | |
| # it under the terms of the GNU Affero General Public License as published by | |
| # the Free Software Foundation, either version 3 of the License, or | |
| # (at your option) any later version. | |
| # | |
| # This program is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # GNU Affero General Public License for more details. | |
| # | |
| # You should have received a copy of the GNU Affero General Public License | |
| # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
| # | |
| from __future__ import absolute_import | |
| from __future__ import print_function | |
| from __future__ import unicode_literals | |
| from contextlib import contextmanager | |
| from functools import partial | |
| import codecs | |
| import logging | |
| import os | |
| import shlex | |
| import shutil | |
| import subprocess | |
| import sys | |
| import tempfile | |
| from .importhelper import pkg_resources_filename | |
| PY3 = sys.version_info.major == 3 | |
| logger = logging.getLogger(__name__) | |
| class NIL: | |
| pass | |
| class cached_property(object): | |
| def __init__(self, func): | |
| self.func = func | |
| self.__name__ = func.__name__ | |
| self.__doc__ = func.__doc__ | |
| def __get__(self, obj, type=None): | |
| if obj is None: | |
| return self | |
| value = obj.__dict__.get(self.__name__, NIL) | |
| if value is NIL: | |
| value = self.func(obj) | |
| obj.__dict__[self.__name__] = value | |
| return value | |
| def __set__(self, obj, value): | |
| obj.__dict__[self.__name__] = value | |
| def generate_json_array(tokens): | |
| ''' generate json array with given tokens ''' | |
| first = True | |
| for token in tokens: | |
| if first: | |
| yield '[\n' | |
| first = False | |
| else: | |
| yield ',\n' | |
| yield token | |
| yield '\n]' | |
| class JsonObjects(object): | |
| def __init__(self, objects, object_to_json): | |
| self.objects = objects | |
| self.object_to_json = object_to_json | |
| def generate(self, **kwargs): | |
| kwargs.setdefault('sort_keys', True) | |
| kwargs.setdefault('indent', 2) | |
| tokens = (self.object_to_json(obj, **kwargs) | |
| for obj in self.objects) | |
| return generate_json_array(tokens) | |
| def open(self, **kwargs): | |
| chunks = self.generate(**kwargs) | |
| chunks = (chunk.encode('utf-8') for chunk in chunks) | |
| return GeneratorReader(chunks) | |
| def dump(self, outfile, **kwargs): | |
| for s in self.generate(**kwargs): | |
| outfile.write(s) | |
| def unicode_escape(s): | |
| ''' | |
| Escape a string. | |
| :param s: | |
| a string to escape | |
| :type s: | |
| unicode | |
| :returns: | |
| escaped string | |
| :rtype: | |
| unicode | |
| ''' | |
| return s.encode('unicode_escape').decode('utf-8') | |
| def unicode_unescape(s): | |
| ''' | |
| Unescape a string. | |
| :param s: | |
| a string to unescape | |
| :type s: | |
| unicode | |
| :returns: | |
| unescaped string | |
| :rtype: | |
| unicode | |
| ''' | |
| return s.encode('utf-8').decode('unicode_escape') | |
| def transcode(backend_stream, backend_encoding, frontend_encoding, | |
| errors='strict'): | |
| enc = codecs.getencoder(frontend_encoding) | |
| dec = codecs.getdecoder(frontend_encoding) | |
| rd = codecs.getreader(backend_encoding) | |
| wr = codecs.getwriter(backend_encoding) | |
| return codecs.StreamRecoder(backend_stream, enc, dec, rd, wr, errors) | |
| def transcoder(backend_encoding, frontend_encoding, errors='strict'): | |
| return partial(transcode, | |
| backend_encoding=backend_encoding, | |
| frontend_encoding=frontend_encoding, | |
| errors=errors) | |
| class GeneratorReader(object): | |
| ''' convert a string generator into file-like reader | |
| def gen(): | |
| yield b'hello' | |
| yield b'world' | |
| f = GeneratorReader(gen()) | |
| assert 'hell' == f.read(4) | |
| assert 'oworld' == f.read() | |
| ''' | |
| def __init__(self, gen): | |
| self.gen = gen | |
| self.buffer = b'' | |
| def read(self, size=None): | |
| if size is None: | |
| d, self.buffer = self.buffer, b'' | |
| return d + b''.join(self.gen) | |
| for data in self.gen: | |
| self.buffer += data | |
| bufsize = len(self.buffer) | |
| if bufsize >= size: | |
| size = min(bufsize, size) | |
| d, self.buffer = self.buffer[:size], self.buffer[size:] | |
| return d | |
| d, self.buffer = self.buffer, b'' | |
| return d | |
| def close(self): | |
| self.gen = self.buffer = None | |
| class GeneratorTextReader(object): | |
| ''' convert a string generator into file-like reader | |
| def gen(): | |
| yield 'hello' | |
| yield 'world' | |
| f = GeneratorTextReader(gen()) | |
| assert 'hell' == f.read(4) | |
| assert 'oworld' == f.read() | |
| ''' | |
| def __init__(self, gen): | |
| self.gen = gen | |
| self.buffer = '' | |
| def read(self, size=None): | |
| if size is None: | |
| d = self.buffer | |
| self.buffer = '' | |
| return d + ''.join(self.gen) | |
| for data in self.gen: | |
| self.buffer += data | |
| bufsize = len(self.buffer) | |
| if bufsize >= size: | |
| size = min(bufsize, size) | |
| d, self.buffer = self.buffer[:size], self.buffer[size:] | |
| return d | |
| d = self.buffer | |
| self.buffer = '' | |
| return d | |
| def close(self): | |
| self.gen = self.buffer = None | |
| def hwp5_resources_path(res_path): | |
| try: | |
| path = pkg_resources_filename('hwp5', res_path) | |
| except Exception: | |
| logger.info('%s: pkg_resources_filename failed; using resource_stream', | |
| res_path) | |
| with mkstemp_open() as (path, g): | |
| import pkg_resources | |
| f = pkg_resources.resource_stream('hwp5', res_path) | |
| try: | |
| shutil.copyfileobj(f, g) | |
| g.close() | |
| yield path | |
| finally: | |
| f.close() | |
| else: | |
| yield path | |
| def make_open_dest_file(path): | |
| if path: | |
| def open_dest_path(): | |
| with open(path, 'wb') as f: | |
| yield f | |
| return open_dest_path | |
| else: | |
| if PY3: | |
| def open_stdout(): | |
| yield sys.stdout.buffer | |
| return open_stdout | |
| else: | |
| def open_stdout(): | |
| yield sys.stdout | |
| return open_stdout | |
| def wrap_open_dest_for_tty(open_dest, wrappers): | |
| def open_dest_wrapped(): | |
| with open_dest() as output: | |
| if output.isatty(): | |
| with cascade_contextmanager_filters(output, | |
| wrappers) as output: | |
| yield output | |
| else: | |
| yield output | |
| return open_dest_wrapped | |
| def wrap_open_dest(open_dest, wrappers): | |
| def open_dest_wrapped(): | |
| with open_dest() as output: | |
| with cascade_contextmanager_filters(output, wrappers) as output: | |
| yield output | |
| return open_dest_wrapped | |
| def cascade_contextmanager_filters(arg, filters): | |
| if len(filters) == 0: | |
| yield arg | |
| else: | |
| flt, filters = filters[0], filters[1:] | |
| with flt(arg) as ret: | |
| with cascade_contextmanager_filters(ret, filters) as ret: | |
| yield ret | |
| def null_contextmanager_filter(output): | |
| yield output | |
| def output_thru_subprocess(cmd): | |
| def filter(output): | |
| logger.debug('%r', cmd) | |
| try: | |
| p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=output) | |
| except Exception as e: | |
| logger.error('%r: %s', ' '.join(cmd), e) | |
| yield output | |
| else: | |
| try: | |
| yield p.stdin | |
| except IOError as e: | |
| import errno | |
| if e.errno != errno.EPIPE: | |
| raise | |
| finally: | |
| p.stdin.close() | |
| p.wait() | |
| retcode = p.returncode | |
| logger.debug('%r exit %d', cmd, retcode) | |
| return filter | |
| def xmllint(c14n=False, encode=None, format=False, nonet=True): | |
| cmd = ['xmllint'] | |
| if c14n: | |
| cmd.append('--c14n') | |
| if encode: | |
| cmd += ['--encode', encode] | |
| if format: | |
| cmd.append('--format') | |
| if nonet: | |
| cmd.append('--nonet') | |
| cmd.append('-') | |
| return output_thru_subprocess(cmd) | |
| def syntaxhighlight(mimetype): | |
| try: | |
| return syntaxhighlight_pygments(mimetype) | |
| except Exception as e: | |
| logger.info(e) | |
| return null_contextmanager_filter | |
| def syntaxhighlight_pygments(mimetype): | |
| from pygments import highlight | |
| from pygments.lexers import get_lexer_for_mimetype | |
| from pygments.formatters import TerminalFormatter | |
| lexer = get_lexer_for_mimetype(mimetype, encoding='utf-8') | |
| formatter = TerminalFormatter(encoding='utf-8') | |
| def filter(output): | |
| with make_temp_file() as f: | |
| yield f | |
| f.seek(0) | |
| code = f.read() | |
| highlight(code, lexer, formatter, output) | |
| return filter | |
| def make_temp_file(): | |
| fd, name = tempfile.mkstemp() | |
| with unlink_path(name): | |
| with os.fdopen(fd, 'w+') as f: | |
| yield f | |
| def unlink_path(path): | |
| import os | |
| try: | |
| yield | |
| finally: | |
| os.unlink(path) | |
| def pager(): | |
| pager_cmd = os.environ.get('PAGER') | |
| if pager_cmd: | |
| pager_cmd = shlex.split(pager_cmd) | |
| return output_thru_subprocess(pager_cmd) | |
| return pager_less | |
| pager_less = output_thru_subprocess(['less', '-R']) | |
| def mkstemp_open(*args, **kwargs): | |
| if (kwargs.get('text', False) or (len(args) >= 4 and args[3])): | |
| text = True | |
| else: | |
| text = False | |
| mode = 'w+' if text else 'wb+' | |
| fd, path = tempfile.mkstemp(*args, **kwargs) | |
| try: | |
| f = os.fdopen(fd, mode) | |
| try: | |
| yield path, f | |
| finally: | |
| try: | |
| f.close() | |
| except Exception: | |
| pass | |
| finally: | |
| unlink_or_warning(path) | |
| def unlink_or_warning(path): | |
| try: | |
| os.unlink(path) | |
| except Exception as e: | |
| logger.exception(e) | |
| logger.warning('%s cannot be deleted', path) | |