|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import absolute_import |
|
|
from __future__ import print_function |
|
|
from __future__ import unicode_literals |
|
|
from contextlib import contextmanager |
|
|
import io |
|
|
import logging |
|
|
import os.path |
|
|
import shutil |
|
|
import sys |
|
|
import tempfile |
|
|
|
|
|
from ..errors import ValidationFailed |
|
|
|
|
|
|
|
|
# True when the running interpreter's major version is 3.
PY3 = sys.version_info.major == 3

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
def is_enabled():
    ''' Tell whether the lxml backend can be used.

    :returns: ``True`` when ``lxml.etree`` imports successfully,
        ``False`` when the import raises :class:`ImportError`
    '''
    try:
        # Imported purely as an availability probe; the name is not used.
        from lxml import etree  # noqa: F401
    except ImportError:
        available = False
    else:
        available = True
    return available
|
|
|
|
|
|
|
|
def xslt(xsl_path, inp_path, out_path):
    ''' Apply an XSL stylesheet to an XML file and write the result to a file.

    :param xsl_path: stylesheet path
    :param inp_path: input path
    :param out_path: output path; opened in binary write mode before the
        transform is run
    :returns: whatever the compiled transform callable returns
    '''
    apply_stylesheet = xslt_compile(xsl_path)
    with io.open(out_path, 'wb') as sink:
        outcome = apply_stylesheet(inp_path, sink)
    return outcome
|
|
|
|
|
|
|
|
def xslt_compile(xsl_path, **params):
    ''' Compile a stylesheet into a reusable transform callable.

    :param xsl_path: stylesheet path
    :param params: keyword arguments forwarded to :class:`XSLT`
    :returns: the bound ``transform_into_stream`` method of the new
        :class:`XSLT` instance, called as ``fn(input_path, output_stream)``
    '''
    return XSLT(xsl_path, **params).transform_into_stream
|
|
|
|
|
|
|
|
class XSLT:
    ''' An XSL stylesheet compiled with lxml, together with its parameters. '''

    def __init__(self, xsl_path, **params):
        ''' Parse and compile the stylesheet found at ``xsl_path``.

        :param xsl_path: stylesheet path
        :param params: stylesheet parameters; every value is wrapped with
            ``etree.XSLT.strparam`` and passed along on each transform
        '''
        from lxml import etree

        with io.open(xsl_path, 'rb') as stylesheet:
            parsed = etree.parse(stylesheet)

        self.xsl_path = xsl_path
        self.etree_xslt = etree.XSLT(parsed)
        self.params = {
            key: etree.XSLT.strparam(raw) for key, raw in params.items()
        }

    def transform(self, input, output):
        ''' Transform the file at path ``input`` into the file at path ``output``.

        Example::

            T.transform('input.xml', 'output.xml')

        :param input: path of the XML document to read
        :param output: path of the file to write (binary mode)
        :returns: the value returned by :meth:`_transform`
        '''
        # The input is opened first, so a missing input file fails before
        # the output file is created or truncated.
        with io.open(input, 'rb') as src, io.open(output, 'wb') as dst:
            return self._transform(src, dst)

    def transform_into_stream(self, input, output):
        ''' Transform the file at path ``input`` and write to a stream.

        Example::

            T.transform_into_stream('input.xml', sys.stdout)

        :param input: path of the XML document to read
        :param output: object whose ``write`` method receives the result
        :returns: the value returned by :meth:`_transform`
        '''
        with io.open(input, 'rb') as src:
            return self._transform(src, output)

    def _transform(self, input, output):
        ''' Parse ``input``, run the stylesheet and write the bytes to ``output``.

        :param input: source accepted by ``etree.parse``
        :param output: object whose ``write`` method receives the result
        :returns: an empty dict
        '''
        from lxml import etree

        document = etree.parse(input)
        stylesheet_name = os.path.basename(self.xsl_path)

        logger.info('_lxml.xslt(%s) start', stylesheet_name)
        transformed = self.etree_xslt(document, **self.params)
        logger.info('_lxml.xslt(%s) end', stylesheet_name)

        # Convert the transform result to bytes before handing it to write().
        output.write(bytes(transformed))
        return {}
|
|
|
|
|
|
|
|
def relaxng(rng_path, inp_path):
    ''' Validate an XML file against a RelaxNG schema file.

    :param rng_path: RelaxNG path
    :param inp_path: path of the XML document to validate
    :returns: the result of :meth:`RelaxNG.validate` for ``inp_path``
    '''
    return RelaxNG(rng_path).validate(inp_path)
|
|
|
|
|
|
|
|
def relaxng_compile(rng_path):
    ''' Compile RelaxNG file

    :param rng_path: RelaxNG path
    :returns: a :class:`RelaxNG` instance built from ``rng_path``
    '''
    compiled = RelaxNG(rng_path)
    return compiled
|
|
|
|
|
|
|
|
class RelaxNG:
    ''' A RelaxNG schema compiled with lxml. '''

    def __init__(self, rng_path):
        ''' Parse and compile the schema found at ``rng_path``.

        :param rng_path: RelaxNG path
        '''
        from lxml import etree

        with io.open(rng_path, 'rb') as schema_file:
            schema_doc = etree.parse(schema_file)

        self.rng_path = rng_path
        self.etree_relaxng = etree.RelaxNG(schema_doc)

    @staticmethod
    def _remove_quietly(path):
        ''' Unlink ``path``; log a warning instead of raising on failure. '''
        try:
            os.unlink(path)
        except Exception as exc:
            logger.warning("%s: can't unlink %s", exc, path)

    @contextmanager
    def validating_output(self, output):
        ''' Context manager that validates what was written before copying it.

        Yields a temporary file opened in ``'wb+'`` mode. Once the ``with``
        body finishes, the temporary file is rewound and checked with
        :meth:`validate_stream`; on success it is rewound again and copied
        into ``output``. The temporary file is always unlinked afterwards.

        :param output: file-like object receiving the validated content
        :raises ValidationFailed: when :meth:`validate_stream` returns a
            false value
        '''
        handle, tmp_path = tempfile.mkstemp()
        try:
            with os.fdopen(handle, 'wb+') as scratch:
                yield scratch
                scratch.seek(0)
                is_valid = self.validate_stream(scratch)
                if not is_valid:
                    raise ValidationFailed('RelaxNG')
                scratch.seek(0)
                shutil.copyfileobj(scratch, output)
        finally:
            self._remove_quietly(tmp_path)

    def validate(self, input):
        ''' Validate the XML file at path ``input``.

        :param input: path of the document to validate
        :returns: the value returned by :meth:`_validate`
        '''
        from lxml import etree

        with io.open(input, 'rb') as source:
            tree = etree.parse(source)
        return self._validate(tree)

    def validate_stream(self, input):
        ''' Validate an XML document read from ``input``.

        :param input: source accepted by ``etree.parse``
        :returns: the value returned by :meth:`_validate`
        '''
        from lxml import etree

        return self._validate(etree.parse(input))

    def _validate(self, doc):
        ''' Run the compiled schema against a parsed document.

        Start and end are logged at info level; the end message is emitted
        even when validation raises. Each entry of the schema's error log
        is logged as an error when the document is not valid.

        :param doc: parsed document handed to ``etree_relaxng.validate``
        :returns: the result of ``etree_relaxng.validate(doc)``
        '''
        schema_name = os.path.basename(self.rng_path)
        logger.info('_lxml.relaxng(%s) start', schema_name)
        try:
            try:
                is_valid = self.etree_relaxng.validate(doc)
            except Exception as exc:
                # Record the traceback, then let the caller see the error.
                logger.exception(exc)
                raise
            if not is_valid:
                for entry in self.etree_relaxng.error_log:
                    logger.error('%s', entry)
            return is_valid
        finally:
            logger.info('_lxml.relaxng(%s) end', schema_name)
|
|
|
|
|
|
|
|
def errlog_to_dict(error):
    ''' Copy the fields of an error-log entry into a plain dict.

    :param error: object exposing ``message``, ``filename``, ``line``,
        ``column``, ``domain_name``, ``type_name`` and ``level_name``
    :returns: dict with keys ``message``, ``filename``, ``line``,
        ``column``, ``domain``, ``type`` and ``level``
    '''
    message = error.message
    filename, line, column = error.filename, error.line, error.column
    domain, kind, level = (
        error.domain_name, error.type_name, error.level_name
    )
    return dict(message=message,
                filename=filename,
                line=line,
                column=column,
                domain=domain,
                type=kind,
                level=level)
|
|
|