| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | from __future__ import absolute_import |
| | from __future__ import print_function |
| | from __future__ import unicode_literals |
| | from contextlib import contextmanager |
| | import io |
| | import logging |
| | import os.path |
| | import shutil |
| | import sys |
| | import tempfile |
| |
|
| | from ..errors import ValidationFailed |
| |
|
| |
|
| | PY3 = sys.version_info.major == 3 |
| |
|
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | def is_enabled(): |
| | try: |
| | from lxml import etree |
| | except ImportError: |
| | return False |
| | else: |
| | return True |
| |
|
| |
|
| | def xslt(xsl_path, inp_path, out_path): |
| | ''' Transform XML with XSL |
| | |
| | :param xsl_path: stylesheet path |
| | :param inp_path: input path |
| | :param out_path: output path |
| | ''' |
| | transform = xslt_compile(xsl_path) |
| | with io.open(out_path, 'wb') as f: |
| | return transform(inp_path, f) |
| |
|
| |
|
| | def xslt_compile(xsl_path, **params): |
| | xslt = XSLT(xsl_path, **params) |
| | return xslt.transform_into_stream |
| |
|
| |
|
| | class XSLT: |
| |
|
| | def __init__(self, xsl_path, **params): |
| | ''' Compile XSL Transform function. |
| | :param xsl_path: stylesheet path |
| | :returns: a transform function |
| | ''' |
| | from lxml import etree |
| |
|
| | with io.open(xsl_path, 'rb') as xsl_file: |
| | xsl_doc = etree.parse(xsl_file) |
| |
|
| | self.xsl_path = xsl_path |
| | self.etree_xslt = etree.XSLT(xsl_doc) |
| | self.params = dict((name, etree.XSLT.strparam(value)) |
| | for name, value in params.items()) |
| |
|
| | def transform(self, input, output): |
| | ''' |
| | >>> T.transform('input.xml', 'output.xml') |
| | ''' |
| | with io.open(input, 'rb') as inp_file: |
| | with io.open(output, 'wb') as out_file: |
| | return self._transform(inp_file, out_file) |
| |
|
| | def transform_into_stream(self, input, output): |
| | ''' |
| | >>> T.transform_into_stream('input.xml', sys.stdout) |
| | ''' |
| | with io.open(input, 'rb') as inp_file: |
| | return self._transform(inp_file, output) |
| |
|
| | def _transform(self, input, output): |
| | |
| |
|
| | from lxml import etree |
| | source = etree.parse(input) |
| | logger.info('_lxml.xslt(%s) start', |
| | os.path.basename(self.xsl_path)) |
| | result = self.etree_xslt(source, **self.params) |
| | logger.info('_lxml.xslt(%s) end', |
| | os.path.basename(self.xsl_path)) |
| | |
| | result = bytes(result) |
| | output.write(result) |
| | return dict() |
| |
|
| |
|
| | def relaxng(rng_path, inp_path): |
| | relaxng = RelaxNG(rng_path) |
| | return relaxng.validate(inp_path) |
| |
|
| |
|
| | def relaxng_compile(rng_path): |
| | ''' Compile RelaxNG file |
| | |
| | :param rng_path: RelaxNG path |
| | :returns: a validation function |
| | ''' |
| | return RelaxNG(rng_path) |
| |
|
| |
|
| | class RelaxNG: |
| |
|
| | def __init__(self, rng_path): |
| | from lxml import etree |
| |
|
| | with io.open(rng_path, 'rb') as rng_file: |
| | rng = etree.parse(rng_file) |
| |
|
| | self.rng_path = rng_path |
| | self.etree_relaxng = etree.RelaxNG(rng) |
| |
|
| | @contextmanager |
| | def validating_output(self, output): |
| | fd, name = tempfile.mkstemp() |
| | try: |
| | with os.fdopen(fd, 'wb+') as f: |
| | yield f |
| | f.seek(0) |
| | if not self.validate_stream(f): |
| | raise ValidationFailed('RelaxNG') |
| | f.seek(0) |
| | shutil.copyfileobj(f, output) |
| | finally: |
| | try: |
| | os.unlink(name) |
| | except Exception as e: |
| | logger.warning('%s: can\'t unlink %s', e, name) |
| |
|
| | def validate(self, input): |
| | from lxml import etree |
| | with io.open(input, 'rb') as f: |
| | doc = etree.parse(f) |
| | return self._validate(doc) |
| |
|
| | def validate_stream(self, input): |
| | from lxml import etree |
| | doc = etree.parse(input) |
| | return self._validate(doc) |
| |
|
| | def _validate(self, doc): |
| | logger.info('_lxml.relaxng(%s) start', os.path.basename(self.rng_path)) |
| | try: |
| | valid = self.etree_relaxng.validate(doc) |
| | except Exception as e: |
| | logger.exception(e) |
| | raise |
| | else: |
| | if not valid: |
| | for error in self.etree_relaxng.error_log: |
| | logger.error('%s', error) |
| | return valid |
| | finally: |
| | logger.info( |
| | '_lxml.relaxng(%s) end', |
| | os.path.basename(self.rng_path) |
| | ) |
| |
|
| |
|
| | def errlog_to_dict(error): |
| | return dict(message=error.message, |
| | filename=error.filename, |
| | line=error.line, |
| | column=error.column, |
| | domain=error.domain_name, |
| | type=error.type_name, |
| | level=error.level_name) |
| |
|