| | from collections import namedtuple |
| | from hashlib import sha256 |
| | import os |
| | import shutil |
| | import sys |
| | import fnmatch |
| |
|
| | from sympy.testing.pytest import XFAIL |
| |
|
| |
|
def may_xfail(func):
    """Conditionally mark ``func`` as an expected failure.

    On macOS (``sys.platform`` equal to ``'darwin'``) and on Windows
    (``os.name`` equal to ``'nt'``) the function is wrapped with ``XFAIL``.
    On every other platform it is returned unchanged.
    """
    on_macos = sys.platform.lower() == 'darwin'
    if on_macos or os.name == 'nt':
        return XFAIL(func)
    return func
| |
|
| |
|
class CompilerNotFoundError(FileNotFoundError):
    """Raised when no binary can be located for any candidate command."""
| |
|
| |
|
class CompileError(Exception):
    """Raised when compiling one or more C/C++ source files fails."""
| |
|
| |
|
def get_abspath(path, cwd='.'):
    """ Resolve ``path`` to an absolute path.

    Parameters
    ==========

    path : str
        Path to resolve. An absolute path is returned as given.
    cwd : str
        Directory that a relative ``path`` is interpreted against. A
        relative ``cwd`` is itself made absolute first.

    Returns
    =======

    str
        ``path`` itself when it is already absolute, otherwise the
        normalised absolute path of ``path`` joined onto ``cwd``.
    """
    if os.path.isabs(path):
        return path
    base = cwd if os.path.isabs(cwd) else os.path.abspath(cwd)
    return os.path.abspath(os.path.join(base, path))
| |
|
| |
|
def make_dirs(path):
    """ Create directories (equivalent of ``mkdir -p``).

    Parameters
    ==========

    path : str
        Directory to create. Missing parent directories are created as
        well, and a trailing ``/`` is accepted.

    Raises
    ======

    ``FileExistsError`` if ``path`` already exists and is not a directory.
    """
    # os.makedirs creates the parents and tolerates an existing directory
    # in one call, so there is no exists()/mkdir() race and no reliance
    # on ``assert`` (which is stripped under ``python -O``).
    os.makedirs(path, mode=0o777, exist_ok=True)
| |
|
def missing_or_other_newer(path, other_path, cwd=None):
    """
    Tell whether ``path`` is absent or not newer than a reference path.

    Parameters
    ==========
    path: string
        Path that may be missing or out of date.
    other_path: string
        Reference path whose modification time is compared against.
    cwd: string
        Working directory (root of relative paths). A falsy value means
        ``'.'``.

    Returns
    =======
    True if ``path`` does not exist, or if the modification time of
    ``other_path`` is at least that of ``path`` (within a tolerance of
    1e-6 seconds); False otherwise.
    """
    root = cwd or '.'
    target = get_abspath(path, cwd=root)
    reference = get_abspath(other_path, cwd=root)
    if os.path.exists(target):
        # The 1e-6 offset is subtracted from the reference time before the
        # comparison, exactly as a tolerance on the two timestamps.
        return os.path.getmtime(reference) - 1e-6 >= os.path.getmtime(target)
    return True
| |
|
def copy(src, dst, only_update=False, copystat=True, cwd=None,
         dest_is_dir=False, create_dest_dirs=False):
    """ Variation of ``shutil.copy`` with extra options.

    Parameters
    ==========

    src : str
        Path to source file.
    dst : str
        Path to destination.
    only_update : bool
        Only copy if the destination is missing or not newer than the
        source. When the copy is skipped ``None`` is returned.
        default: ``False``.
    copystat : bool
        See ``shutil.copystat``. default: ``True``.
    cwd : str
        Path to working directory (root of relative paths).
    dest_is_dir : bool
        Ensures that dst is treated as a directory. default: ``False``
    create_dest_dirs : bool
        Creates directories if needed.

    Returns
    =======

    Path to the copied file, or ``None`` when ``only_update`` is set and
    the copy was skipped. If the destination is a symbolic link, the
    resolved path of its target is returned.

    Raises
    ======

    ``FileNotFoundError`` if ``src`` does not exist, or if the destination
    directory does not exist and ``create_dest_dirs`` is false.
    """
    if cwd:
        if not os.path.isabs(src):
            src = os.path.join(cwd, src)
        if not os.path.isabs(dst):
            dst = os.path.join(cwd, dst)

    if not os.path.exists(src):
        raise FileNotFoundError("Source: `{}` does not exist".format(src))

    if dest_is_dir:
        if not dst[-1] == '/':
            dst = dst + '/'
    elif os.path.isdir(dst):
        # An existing directory is always treated as a directory target.
        dest_is_dir = True

    if dest_is_dir:
        dest_dir = dst
        dst = os.path.join(dest_dir, os.path.basename(src))
    else:
        # A bare file name has an empty dirname; that means the current
        # directory, not a directory that still has to be created.
        dest_dir = os.path.dirname(dst) or '.'

    if not os.path.exists(dest_dir):
        if create_dest_dirs:
            make_dirs(dest_dir)
        else:
            raise FileNotFoundError("You must create directory first.")

    if only_update and not missing_or_other_newer(dst, src):
        return None

    if os.path.islink(dst):
        # realpath already yields an absolute, symlink-free path
        # (os.path.abspath accepts no ``cwd`` argument).
        dst = os.path.realpath(dst)

    shutil.copy(src, dst)
    if copystat:
        shutil.copystat(src, dst)

    return dst
| |
|
# Single-field records wrapping a glob pattern: ``Glob`` stores it as
# ``pathname`` and ``ArbitraryDepthGlob`` stores it as ``filename``.
Glob = namedtuple('Glob', ['pathname'])
ArbitraryDepthGlob = namedtuple('ArbitraryDepthGlob', ['filename'])
| |
|
def glob_at_depth(filename_glob, cwd=None):
    """ Find files at any depth whose name matches a glob pattern.

    Parameters
    ==========

    filename_glob : str
        ``fnmatch``-style pattern matched against each bare file name
        (not against the directory part).
    cwd : str
        Directory to walk recursively. default: ``None`` (meaning ``'.'``).

    Returns
    =======

    list of str
        Paths of the matching files, each joined onto the directory it
        was found in, in ``os.walk`` order.
    """
    # Only fall back to the current directory when no root was given.
    if cwd is None:
        cwd = '.'
    return [
        os.path.join(root, fn)
        for root, _dirs, filenames in os.walk(cwd)
        for fn in filenames
        if fnmatch.fnmatch(fn, filename_glob)
    ]
| |
|
def sha256_of_file(path, nblocks=128):
    """ Hash the contents of a file with SHA256.

    Parameters
    ==========

    path : string
        Path of the file to hash. It is opened in binary mode.
    nblocks : int
        How many hash blocks (``block_size`` bytes each) to read per
        iteration.

    Returns
    =======

    hashlib sha256 hash object. Call ``.digest()`` or ``.hexdigest()``
    on it to obtain the binary or hex encoded value.
    """
    hasher = sha256()
    with open(path, 'rb') as stream:
        while True:
            piece = stream.read(nblocks*hasher.block_size)
            if piece == b'':
                break
            hasher.update(piece)
    return hasher
| |
|
| |
|
def sha256_of_string(string):
    """ Computes the SHA256 hash of a string.

    Parameters
    ==========

    string : bytes or str
        Data to hash. Bytes-like input is hashed as is; a ``str`` is
        encoded as UTF-8 first.

    Returns
    =======

    hashlib sha256 hash object. Use ``.digest()`` or ``.hexdigest()``
    on returned object to get binary or hex encoded string.
    """
    # hashlib only accepts bytes-like data, so text must be encoded first.
    if isinstance(string, str):
        string = string.encode('utf-8')
    sh = sha256()
    sh.update(string)
    return sh
| |
|
| |
|
def pyx_is_cplus(path):
    """
    Inspect a Cython source file (.pyx) and look for comment line like:

    # distutils: language = c++

    Parameters
    ==========

    path : str
        Path to the file to inspect.

    Returns
    =======

    True if such a line is present in the file, else False.
    """
    with open(path) as fh:
        for line in fh:
            if not line.startswith('#') or '=' not in line:
                continue
            parts = line.split('=')
            if len(parts) != 2:
                continue
            lhs_words = parts[0].split()
            rhs_words = parts[1].split()
            # A directive with nothing after ``=`` cannot select C++.
            if not rhs_words:
                continue
            if (lhs_words[-1].lower() == 'language' and
                    rhs_words[0].lower() == 'c++'):
                return True
    return False
| |
|
def import_module_from_file(filename, only_if_newer_than=None):
    """ Imports Python extension (from shared object file)

    Provide a list of paths in `only_if_newer_than` to check
    timestamps of dependencies. An ``ImportError`` is raised
    if any of them is newer than the file being imported.

    Word of warning: The OS may cache shared objects which makes
    reimporting same path of an shared object file very problematic.

    It will not detect the new time stamp, nor new checksum, but will
    instead silently use old module. Use unique names for this reason.

    Parameters
    ==========

    filename : str
        Path to shared object.
    only_if_newer_than : iterable of strings
        Paths to dependencies of the shared object.

    Returns
    =======

    The imported module object. The module name is the part of the file
    name before its first ``.``.

    Raises
    ======

    ``ImportError`` if any of the files specified in ``only_if_newer_than`` are newer
    than the file given by filename, or if no import spec can be created
    for ``filename``.
    """
    import importlib.util

    name = os.path.basename(filename)
    name = os.path.splitext(name)[0].split('.')[0]

    # Build the spec first so an unrecognised file name is still reported
    # as ImportError before any timestamps are read.
    spec = importlib.util.spec_from_file_location(name, filename)
    if spec is None:
        raise ImportError("Failed to import: '%s'" % filename)

    # A missing file is left for the loader to report, as before; only an
    # existing file can be compared against its dependencies.
    if only_if_newer_than and os.path.exists(filename):
        built = os.path.getmtime(filename)
        for dep in only_if_newer_than:
            if built < os.path.getmtime(dep):
                raise ImportError("{} is newer than {}".format(dep, filename))

    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod
| |
|
| |
|
def find_binary_of_command(candidates):
    """ Finds binary first matching name among candidates.

    Calls ``which`` from ``shutil`` for provided candidates and returns
    first hit. Falsy candidates (``None``, ``''``) are skipped.

    Parameters
    ==========

    candidates : iterable of str
        Names of candidate commands

    Returns
    =======

    tuple
        ``(candidate, binary_path)`` for the first candidate that
        ``which`` can locate.

    Raises
    ======

    CompilerNotFoundError if no candidates match.
    """
    for c in candidates:
        # Skip empty entries before the lookup: ``which`` raises on None.
        if not c:
            continue
        binary_path = shutil.which(c)
        if binary_path:
            return c, binary_path

    raise CompilerNotFoundError('No binary located for candidates: {}'.format(candidates))
| |
|
| |
|
def unique_list(l):
    """ Return the items of ``l`` without duplicates, in first-seen order.

    Items are compared by equality against those already kept, so they
    do not need to be hashable.
    """
    kept = []
    for item in l:
        if item in kept:
            continue
        kept.append(item)
    return kept
| |
|