| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | from __future__ import absolute_import |
| | from __future__ import print_function |
| | from __future__ import unicode_literals |
| | from binascii import b2a_hex |
| | from functools import partial |
| | from subprocess import CalledProcessError |
| | from subprocess import Popen |
| | import logging |
| | import os |
| | import subprocess |
| | import tempfile |
| |
|
| | from . import _lxml |
| | from . import _uno |
| | from . import gir_gsf |
| | from . import javax_transform |
| | from . import jython_poifs |
| | from . import olefileio |
| | from . import xmllint |
| | from . import xsltproc |
| |
|
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | def get_xslt(): |
| | if javax_transform.is_enabled(): |
| | return javax_transform.xslt |
| | if _lxml.is_enabled(): |
| | return _lxml.xslt |
| | if xsltproc.is_enabled(): |
| | return xsltproc.xslt |
| | if _uno.is_enabled(): |
| | return _uno.xslt |
| |
|
| |
|
| | def get_xslt_compile(): |
| | modules = [ |
| | javax_transform, |
| | _lxml, |
| | xsltproc, |
| | _uno |
| | ] |
| | for module in modules: |
| | if module.is_enabled(): |
| | xslt_compile = getattr(module, 'xslt_compile', None) |
| | if xslt_compile: |
| | return xslt_compile |
| | xslt = getattr(module, 'xslt', None) |
| | if xslt: |
| | def xslt_compile(xsl_path): |
| | return partial(xslt, xsl_path) |
| |
|
| |
|
| | def get_relaxng(): |
| | if _lxml.is_enabled(): |
| | return _lxml.relaxng |
| | if xmllint.is_enabled(): |
| | return xmllint.relaxng |
| |
|
| |
|
| | def get_relaxng_compile(): |
| | modules = [ |
| | _lxml, |
| | xmllint, |
| | ] |
| | for module in modules: |
| | if module.is_enabled(): |
| | relaxng_compile = getattr(module, 'relaxng_compile', None) |
| | if relaxng_compile: |
| | return relaxng_compile |
| | relaxng = getattr(module, 'relaxng', None) |
| | if relaxng: |
| | def relaxng_compile(rng_path): |
| | return partial(relaxng, rng_path) |
| |
|
| |
|
| | def get_olestorage_class(): |
| | if jython_poifs.is_enabled(): |
| | return jython_poifs.OleStorage |
| | if olefileio.is_enabled(): |
| | return olefileio.OleStorage |
| | if _uno.is_enabled(): |
| | return _uno.OleStorage |
| | if gir_gsf.is_enabled(): |
| | return gir_gsf.OleStorage |
| |
|
| |
|
| | def get_aes128ecb_decrypt(): |
| | try: |
| | return get_aes128ecb_decrypt_cryptography() |
| | except Exception: |
| | pass |
| |
|
| | try: |
| | return get_aes128ecb_decrypt_javax() |
| | except Exception: |
| | pass |
| |
|
| | try: |
| | return get_aes128ecb_decrypt_openssl() |
| | except Exception: |
| | pass |
| |
|
| | raise NotImplementedError('aes128ecb_decrypt') |
| |
|
| |
|
| | def get_aes128ecb_decrypt_cryptography(): |
| | from cryptography.hazmat.primitives.ciphers import Cipher |
| | from cryptography.hazmat.primitives.ciphers import algorithms |
| | from cryptography.hazmat.primitives.ciphers import modes |
| | from cryptography.hazmat.backends import default_backend |
| |
|
| | def decrypt(key, ciphertext): |
| | backend = default_backend() |
| | cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=backend) |
| | decryptor = cipher.decryptor() |
| | return decryptor.update(ciphertext) + decryptor.finalize() |
| |
|
| | return decrypt |
| |
|
| |
|
| | def get_aes128ecb_decrypt_javax(): |
| | from javax.crypto import Cipher |
| | from javax.crypto.spec import SecretKeySpec |
| |
|
| | def decrypt(key, ciphertext): |
| | secretkey = SecretKeySpec(key, 'AES') |
| | cipher = Cipher.getInstance('AES/ECB/NoPadding') |
| | cipher.init(Cipher.DECRYPT_MODE, secretkey) |
| | decrypted = cipher.doFinal(ciphertext) |
| | return decrypted.tostring() |
| |
|
| | return decrypt |
| |
|
| |
|
| | def get_aes128ecb_decrypt_openssl(): |
| | if not openssl_reachable(): |
| | raise NotImplementedError() |
| |
|
| | def decrypt(key, ciphertext): |
| | fd, name = tempfile.mkstemp() |
| | fp = os.fdopen(fd, 'wb') |
| | try: |
| | fp.write(ciphertext) |
| | finally: |
| | fp.close() |
| |
|
| | args = [ |
| | 'openssl', |
| | 'enc', |
| | '-d', |
| | '-in', |
| | name, |
| | '-aes-128-ecb', |
| | '-K', |
| | b2a_hex(key), |
| | '-nopad', |
| | ] |
| | try: |
| | p = Popen(args, stdout=subprocess.PIPE) |
| | try: |
| | return p.stdout.read() |
| | finally: |
| | p.wait() |
| | p.stdout.close() |
| | finally: |
| | os.unlink(name) |
| |
|
| | return decrypt |
| |
|
| |
|
| | def openssl_reachable(): |
| | args = ['openssl', 'version'] |
| | try: |
| | subprocess.check_output(args) |
| | except OSError: |
| | return False |
| | except CalledProcessError: |
| | return False |
| | except Exception as e: |
| | logger.exception(e) |
| | return False |
| | else: |
| | return True |
| |
|