| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| from __future__ import absolute_import |
| from __future__ import print_function |
| from __future__ import unicode_literals |
| from binascii import b2a_hex |
| from functools import partial |
| from subprocess import CalledProcessError |
| from subprocess import Popen |
| import logging |
| import os |
| import subprocess |
| import tempfile |
|
|
| from . import _lxml |
| from . import _uno |
| from . import gir_gsf |
| from . import javax_transform |
| from . import jython_poifs |
| from . import olefileio |
| from . import xmllint |
| from . import xsltproc |
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
def get_xslt():
    """Return the ``xslt`` callable of the first enabled backend.

    Backends are probed in this fixed order: ``javax_transform``, ``_lxml``,
    ``xsltproc``, ``_uno``.  Returns ``None`` when no backend reports itself
    as enabled.
    """
    candidates = (javax_transform, _lxml, xsltproc, _uno)
    for backend in candidates:
        if backend.is_enabled():
            return backend.xslt
    return None
|
|
|
|
def get_xslt_compile():
    """Return a factory that binds a stylesheet path to a transform callable.

    Backends are probed in this fixed order: ``javax_transform``, ``_lxml``,
    ``xsltproc``, ``_uno``.  For the first enabled backend:

    * if it exposes a truthy ``xslt_compile`` attribute, that is returned
      as-is;
    * otherwise, if it exposes a truthy ``xslt`` attribute, a wrapper is
      returned which, given ``xsl_path``, yields ``partial(xslt, xsl_path)``.

    An enabled backend offering neither attribute is skipped.  Returns
    ``None`` when no backend qualifies.
    """
    modules = [
        javax_transform,
        _lxml,
        xsltproc,
        _uno,
    ]
    for module in modules:
        if not module.is_enabled():
            continue

        xslt_compile = getattr(module, 'xslt_compile', None)
        if xslt_compile:
            return xslt_compile

        xslt = getattr(module, 'xslt', None)
        if xslt:
            def xslt_compile(xsl_path):
                return partial(xslt, xsl_path)
            # The fallback wrapper must be handed back to the caller;
            # without this return it was built and then discarded.
            return xslt_compile
    return None
|
|
|
|
def get_relaxng():
    """Return the ``relaxng`` callable of the first enabled backend.

    ``_lxml`` is preferred over ``xmllint``.  Returns ``None`` when neither
    backend reports itself as enabled.
    """
    for backend in (_lxml, xmllint):
        if backend.is_enabled():
            return backend.relaxng
    return None
|
|
|
|
def get_relaxng_compile():
    """Return a factory that binds a RelaxNG schema path to a validator.

    Backends are probed in this fixed order: ``_lxml``, ``xmllint``.  For
    the first enabled backend:

    * if it exposes a truthy ``relaxng_compile`` attribute, that is returned
      as-is;
    * otherwise, if it exposes a truthy ``relaxng`` attribute, a wrapper is
      returned which, given ``rng_path``, yields
      ``partial(relaxng, rng_path)``.

    An enabled backend offering neither attribute is skipped.  Returns
    ``None`` when no backend qualifies.
    """
    modules = [
        _lxml,
        xmllint,
    ]
    for module in modules:
        if not module.is_enabled():
            continue

        relaxng_compile = getattr(module, 'relaxng_compile', None)
        if relaxng_compile:
            return relaxng_compile

        relaxng = getattr(module, 'relaxng', None)
        if relaxng:
            def relaxng_compile(rng_path):
                return partial(relaxng, rng_path)
            # The fallback wrapper must be handed back to the caller;
            # without this return it was built and then discarded.
            return relaxng_compile
    return None
|
|
|
|
def get_olestorage_class():
    """Return the ``OleStorage`` attribute of the first enabled backend.

    Backends are probed in this fixed order: ``jython_poifs``, ``olefileio``,
    ``_uno``, ``gir_gsf``.  Returns ``None`` when no backend reports itself
    as enabled.
    """
    candidates = (jython_poifs, olefileio, _uno, gir_gsf)
    for backend in candidates:
        if backend.is_enabled():
            return backend.OleStorage
    return None
|
|
|
|
def get_aes128ecb_decrypt():
    """Return a ``decrypt(key, ciphertext)`` callable for AES-128-ECB.

    Backend factories are tried in this fixed order:
    ``get_aes128ecb_decrypt_cryptography``, ``get_aes128ecb_decrypt_javax``,
    ``get_aes128ecb_decrypt_openssl``.  The result of the first factory that
    does not raise is returned.

    Raises:
        NotImplementedError: every backend factory raised.
    """
    factories = (
        get_aes128ecb_decrypt_cryptography,
        get_aes128ecb_decrypt_javax,
        get_aes128ecb_decrypt_openssl,
    )
    for factory in factories:
        try:
            return factory()
        except Exception:
            # Falling through to the next backend is deliberate, but record
            # why this one was skipped so a broken install can be diagnosed.
            logger.debug('%s is not available', factory.__name__,
                         exc_info=True)
    raise NotImplementedError('aes128ecb_decrypt')
|
|
|
|
def get_aes128ecb_decrypt_cryptography():
    """Build an AES-ECB ``decrypt(key, ciphertext)`` on top of ``cryptography``.

    The ``cryptography`` imports happen when this factory is called, so a
    missing package surfaces here as an import error rather than at module
    import time.

    The returned callable builds a fresh cipher per call and returns the
    decryptor's ``update`` output concatenated with its ``finalize`` output;
    this function itself removes no padding.
    """
    from cryptography.hazmat.primitives.ciphers import Cipher
    from cryptography.hazmat.primitives.ciphers import algorithms
    from cryptography.hazmat.primitives.ciphers import modes
    from cryptography.hazmat.backends import default_backend

    def decrypt(key, ciphertext):
        aes_ecb = Cipher(
            algorithms.AES(key),
            modes.ECB(),
            backend=default_backend(),
        )
        context = aes_ecb.decryptor()
        body = context.update(ciphertext)
        return body + context.finalize()

    return decrypt
|
|
|
|
def get_aes128ecb_decrypt_javax():
    """Build an AES-ECB ``decrypt(key, ciphertext)`` on top of ``javax.crypto``.

    The ``javax`` imports happen when this factory is called, so calling it
    where those packages are not importable raises an import error
    (presumably any non-JVM interpreter -- confirm against deployment).

    The returned callable uses the ``AES/ECB/NoPadding`` transformation and
    returns ``doFinal(ciphertext).tostring()``.
    """
    from javax.crypto import Cipher
    from javax.crypto.spec import SecretKeySpec

    def decrypt(key, ciphertext):
        key_spec = SecretKeySpec(key, 'AES')
        aes_ecb = Cipher.getInstance('AES/ECB/NoPadding')
        aes_ecb.init(Cipher.DECRYPT_MODE, key_spec)
        return aes_ecb.doFinal(ciphertext).tostring()

    return decrypt
|
|
|
|
def get_aes128ecb_decrypt_openssl():
    """Build an AES-128-ECB ``decrypt(key, ciphertext)`` using the openssl CLI.

    Raises:
        NotImplementedError: ``openssl_reachable()`` reports that the
            ``openssl`` executable cannot be run.

    The returned callable writes ``ciphertext`` to a temporary file, runs
    ``openssl enc -d -in <file> -aes-128-ecb -K <hex key> -nopad`` and
    returns the bytes openssl wrote to its standard output.  The temporary
    file is always removed.  It raises ``subprocess.CalledProcessError``
    when openssl exits with a non-zero status, instead of silently
    returning whatever partial output was produced.
    """
    if not openssl_reachable():
        raise NotImplementedError()

    def decrypt(key, ciphertext):
        # NOTE(review): the key is passed on the command line, so it may be
        # visible to other local users through the process list.
        # Decode to text so every argv element has the same type.
        hexkey = b2a_hex(key).decode('ascii')

        fd, name = tempfile.mkstemp()
        try:
            # Everything after mkstemp() sits inside this try block so the
            # file is removed even when writing the ciphertext fails.
            with os.fdopen(fd, 'wb') as fp:
                fp.write(ciphertext)

            args = [
                'openssl',
                'enc',
                '-d',
                '-in',
                name,
                '-aes-128-ecb',
                '-K',
                hexkey,
                '-nopad',
            ]
            p = Popen(args, stdout=subprocess.PIPE)
            # communicate() drains stdout, closes it and waits for exit.
            plaintext, _ = p.communicate()
            if p.returncode != 0:
                # Keep the key out of the exception message / tracebacks.
                reported = [
                    '<key>' if arg == hexkey else arg
                    for arg in args
                ]
                raise CalledProcessError(p.returncode, reported)
            return plaintext
        finally:
            os.unlink(name)

    return decrypt
|
|
|
|
def openssl_reachable():
    """Report whether ``openssl version`` can be run successfully.

    Returns ``False`` when launching the command raises ``OSError`` or the
    command exits with a non-zero status (``CalledProcessError``).  Any
    other exception is logged with its traceback and also yields ``False``.
    Returns ``True`` otherwise.
    """
    probe = ['openssl', 'version']
    try:
        subprocess.check_output(probe)
    except (OSError, CalledProcessError):
        return False
    except Exception as e:
        logger.exception(e)
        return False
    return True
|
|