| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| from __future__ import absolute_import |
| from __future__ import print_function |
| from __future__ import unicode_literals |
| from contextlib import contextmanager |
| from subprocess import CalledProcessError |
| import logging |
| import subprocess |
|
|
|
|
logger = logging.getLogger(__name__)

# Name (or path) of the xmllint binary; every command line built in this
# module starts with this value.
executable = 'xmllint'

# Tri-state availability flag: None means "not probed yet", True/False is
# either the cached probe result or an explicit override.
enabled = None
|
|
|
|
def xmllint_reachable():
    """Report whether the configured ``executable`` can actually be run.

    Runs ``<executable> --version`` and treats a clean exit as success.

    Returns:
        bool: ``True`` if the command started and exited with status 0,
        ``False`` if it could not be started, exited non-zero, or any
        other error occurred (unexpected errors are logged).
    """
    args = [executable, '--version']
    try:
        # xmllint prints its version banner on stderr; merge it into the
        # captured output so the probe does not leak text to our stderr.
        subprocess.check_output(args, stderr=subprocess.STDOUT)
    except (OSError, CalledProcessError):
        # Binary missing / not executable, or it exited with an error status.
        return False
    except Exception as e:
        # Anything else is unexpected: record it, but still report
        # "not reachable" rather than propagating.
        logger.exception(e)
        return False
    return True
|
|
|
|
def is_enabled():
    """Return whether xmllint-based validation is enabled.

    The first call with ``enabled`` still ``None`` probes for the binary via
    ``xmllint_reachable()`` and caches the answer in the module-level
    ``enabled`` flag; later calls return the cached (or overridden) value.
    """
    global enabled
    if enabled is not None:
        # Already probed, or explicitly set by enable()/disable().
        return enabled
    enabled = xmllint_reachable()
    return enabled
|
|
|
|
def enable():
    """Force the module-level ``enabled`` flag to ``True``.

    After this call ``is_enabled()`` returns ``True`` without probing for
    the binary.
    """
    global enabled
    enabled = True
|
|
|
|
def disable():
    """Force the module-level ``enabled`` flag to ``False``.

    After this call ``is_enabled()`` returns ``False`` without probing for
    the binary.
    """
    global enabled
    enabled = False
|
|
|
|
def relaxng(rng_path, inp_path):
    """Validate the file ``inp_path`` against the RelaxNG schema ``rng_path``.

    Runs ``<executable> --noout --relaxng rng_path inp_path`` and waits for
    it to finish. The child inherits this process's standard streams.

    Returns:
        bool: ``True`` if the command exited with status 0, else ``False``.
    """
    command = [executable, '--noout', '--relaxng', rng_path, inp_path]
    # Popen.wait() returns the exit status once the child has terminated.
    exit_status = subprocess.Popen(command).wait()
    return exit_status == 0
|
|
|
|
def relaxng_compile(rng_path):
    """Build a ``RelaxNG`` object for the schema at ``rng_path``.

    Returns:
        RelaxNG: a new instance constructed with ``rng_path``.
    """
    schema = RelaxNG(rng_path)
    return schema
|
|
|
|
class RelaxNG(object):
    """A RelaxNG schema, identified by path, used to validate via xmllint."""

    def __init__(self, rng_path):
        """Store the schema path; no process is started here."""
        self.rng_path = rng_path

    @contextmanager
    def validating_output(self, output):
        """Context manager yielding a pipe whose content gets validated.

        Starts ``<executable> --relaxng <rng_path> -`` with its stdin
        connected to a pipe and its stdout set to ``output``. The caller
        writes the document to the yielded stdin pipe. On exit the pipe is
        closed and the process is waited for.

        Args:
            output: passed to ``subprocess.Popen`` as ``stdout``.
                NOTE(review): ``--noout`` is not passed, so xmllint
                presumably echoes the document to ``output`` -- confirm.

        Yields:
            The child's stdin pipe (``p.stdin``).

        Raises:
            Exception: with message ``'RelaxNG validation failed'`` if the
                ``with`` body completed normally but the process exited
                with a non-zero status. An exception raised inside the
                ``with`` body propagates unchanged after cleanup.
        """
        args = [executable, '--relaxng', self.rng_path, '-']
        p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=output)
        try:
            yield p.stdin
        finally:
            # Always reap the child, even when closing the pipe itself
            # fails (e.g. the child already exited and the flush hits a
            # broken pipe); otherwise the process would be left unwaited.
            try:
                p.stdin.close()
            finally:
                p.wait()
        # Reached only when the body did not raise.
        if p.returncode != 0:
            raise Exception('RelaxNG validation failed')
|
|