| """
|
| Cython -- Things that don't belong anywhere else in particular
|
| """
|
|
|
| from __future__ import absolute_import
|
|
|
| import cython
|
|
|
| cython.declare(
|
| basestring=object,
|
| os=object, sys=object, re=object, io=object, codecs=object, glob=object, shutil=object, tempfile=object,
|
| cython_version=object,
|
| _function_caches=list, _parse_file_version=object, _match_file_encoding=object,
|
| )
|
|
|
| try:
|
| from __builtin__ import basestring
|
| except ImportError:
|
| basestring = str
|
|
|
| try:
|
| FileNotFoundError
|
| except NameError:
|
| FileNotFoundError = OSError
|
|
|
| import os
|
| import sys
|
| import re
|
| import io
|
| import codecs
|
| import glob
|
| import shutil
|
| import tempfile
|
| from functools import wraps
|
|
|
| from . import __version__ as cython_version
|
|
|
| PACKAGE_FILES = ("__init__.py", "__init__.pyc", "__init__.pyx", "__init__.pxd")
|
|
|
| _build_cache_name = "__{0}_cache".format
|
| _CACHE_NAME_PATTERN = re.compile(r"^__(.+)_cache$")
|
|
|
| modification_time = os.path.getmtime
|
|
|
| GENERATED_BY_MARKER = "/* Generated by Cython %s */" % cython_version
|
| GENERATED_BY_MARKER_BYTES = GENERATED_BY_MARKER.encode('us-ascii')
|
|
|
|
|
| class _TryFinallyGeneratorContextManager(object):
|
| """
|
| Fast, bare minimum @contextmanager, only for try-finally, not for exception handling.
|
| """
|
| def __init__(self, gen):
|
| self._gen = gen
|
|
|
| def __enter__(self):
|
| return next(self._gen)
|
|
|
| def __exit__(self, exc_type, exc_val, exc_tb):
|
| try:
|
| next(self._gen)
|
| except (StopIteration, GeneratorExit):
|
| pass
|
|
|
|
|
| def try_finally_contextmanager(gen_func):
|
| @wraps(gen_func)
|
| def make_gen(*args, **kwargs):
|
| return _TryFinallyGeneratorContextManager(gen_func(*args, **kwargs))
|
| return make_gen
|
|
|
|
|
| _function_caches = []
|
|
|
|
|
| def clear_function_caches():
|
| for cache in _function_caches:
|
| cache.clear()
|
|
|
|
|
| def cached_function(f):
|
| cache = {}
|
| _function_caches.append(cache)
|
| uncomputed = object()
|
|
|
| @wraps(f)
|
| def wrapper(*args):
|
| res = cache.get(args, uncomputed)
|
| if res is uncomputed:
|
| res = cache[args] = f(*args)
|
| return res
|
|
|
| wrapper.uncached = f
|
| return wrapper
|
|
|
|
|
| def _find_cache_attributes(obj):
|
| """The function iterates over the attributes of the object and,
|
| if it finds the name of the cache, it returns it and the corresponding method name.
|
| The method may not be present in the object.
|
| """
|
| for attr_name in dir(obj):
|
| match = _CACHE_NAME_PATTERN.match(attr_name)
|
| if match is not None:
|
| yield attr_name, match.group(1)
|
|
|
|
|
| def clear_method_caches(obj):
|
| """Removes every cache found in the object,
|
| if a corresponding method exists for that cache.
|
| """
|
| for cache_name, method_name in _find_cache_attributes(obj):
|
| if hasattr(obj, method_name):
|
| delattr(obj, cache_name)
|
|
|
|
|
|
|
|
|
| def cached_method(f):
|
| cache_name = _build_cache_name(f.__name__)
|
|
|
| def wrapper(self, *args):
|
| cache = getattr(self, cache_name, None)
|
| if cache is None:
|
| cache = {}
|
| setattr(self, cache_name, cache)
|
| if args in cache:
|
| return cache[args]
|
| res = cache[args] = f(self, *args)
|
| return res
|
|
|
| return wrapper
|
|
|
|
|
| def replace_suffix(path, newsuf):
|
| base, _ = os.path.splitext(path)
|
| return base + newsuf
|
|
|
|
|
| def open_new_file(path):
|
| if os.path.exists(path):
|
|
|
|
|
| os.unlink(path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| return codecs.open(path, "w", encoding="ISO-8859-1")
|
|
|
|
|
| def castrate_file(path, st):
|
|
|
|
|
|
|
|
|
| if not is_cython_generated_file(path, allow_failed=True, if_not_found=False):
|
| return
|
|
|
| try:
|
| f = open_new_file(path)
|
| except EnvironmentError:
|
| pass
|
| else:
|
| f.write(
|
| "#error Do not use this file, it is the result of a failed Cython compilation.\n")
|
| f.close()
|
| if st:
|
| os.utime(path, (st.st_atime, st.st_mtime-1))
|
|
|
|
|
| def is_cython_generated_file(path, allow_failed=False, if_not_found=True):
|
| failure_marker = b"#error Do not use this file, it is the result of a failed Cython compilation."
|
| file_content = None
|
| if os.path.exists(path):
|
| try:
|
| with open(path, "rb") as f:
|
| file_content = f.read(len(failure_marker))
|
| except (OSError, IOError):
|
| pass
|
|
|
| if file_content is None:
|
|
|
| return if_not_found
|
|
|
| return (
|
|
|
| file_content.startswith(b"/* Generated by Cython ") or
|
|
|
| (allow_failed and file_content == failure_marker) or
|
|
|
| not file_content
|
| )
|
|
|
|
|
| def file_generated_by_this_cython(path):
|
| file_content = b''
|
| if os.path.exists(path):
|
| try:
|
| with open(path, "rb") as f:
|
| file_content = f.read(len(GENERATED_BY_MARKER_BYTES))
|
| except (OSError, IOError):
|
| pass
|
| return file_content and file_content.startswith(GENERATED_BY_MARKER_BYTES)
|
|
|
|
|
| def file_newer_than(path, time):
|
| ftime = modification_time(path)
|
| return ftime > time
|
|
|
|
|
| def safe_makedirs(path):
|
| try:
|
| os.makedirs(path)
|
| except OSError:
|
| if not os.path.isdir(path):
|
| raise
|
|
|
|
|
| def copy_file_to_dir_if_newer(sourcefile, destdir):
|
| """
|
| Copy file sourcefile to directory destdir (creating it if needed),
|
| preserving metadata. If the destination file exists and is not
|
| older than the source file, the copying is skipped.
|
| """
|
| destfile = os.path.join(destdir, os.path.basename(sourcefile))
|
| try:
|
| desttime = modification_time(destfile)
|
| except OSError:
|
|
|
| safe_makedirs(destdir)
|
| else:
|
|
|
| if not file_newer_than(sourcefile, desttime):
|
| return
|
| shutil.copy2(sourcefile, destfile)
|
|
|
|
|
| @cached_function
|
| def find_root_package_dir(file_path):
|
| dir = os.path.dirname(file_path)
|
| if file_path == dir:
|
| return dir
|
| elif is_package_dir(dir):
|
| return find_root_package_dir(dir)
|
| else:
|
| return dir
|
|
|
|
|
| @cached_function
|
| def check_package_dir(dir_path, package_names):
|
| namespace = True
|
| for dirname in package_names:
|
| dir_path = os.path.join(dir_path, dirname)
|
| has_init = contains_init(dir_path)
|
| if has_init:
|
| namespace = False
|
| return dir_path, namespace
|
|
|
|
|
| @cached_function
|
| def contains_init(dir_path):
|
| for filename in PACKAGE_FILES:
|
| path = os.path.join(dir_path, filename)
|
| if path_exists(path):
|
| return 1
|
|
|
|
|
| def is_package_dir(dir_path):
|
| if contains_init(dir_path):
|
| return 1
|
|
|
|
|
| @cached_function
|
| def path_exists(path):
|
|
|
| if os.path.exists(path):
|
| return True
|
|
|
| try:
|
| loader = __loader__
|
|
|
|
|
| archive_path = getattr(loader, 'archive', None)
|
| if archive_path:
|
| normpath = os.path.normpath(path)
|
| if normpath.startswith(archive_path):
|
| arcname = normpath[len(archive_path)+1:]
|
| try:
|
| loader.get_data(arcname)
|
| return True
|
| except IOError:
|
| return False
|
| except NameError:
|
| pass
|
| return False
|
|
|
|
|
| _parse_file_version = re.compile(r".*[.]cython-([0-9]+)[.][^./\\]+$").findall
|
|
|
|
|
| @cached_function
|
| def find_versioned_file(directory, filename, suffix,
|
| _current_version=int(re.sub(r"^([0-9]+)[.]([0-9]+).*", r"\1\2", cython_version))):
|
| """
|
| Search a directory for versioned pxd files, e.g. "lib.cython-30.pxd" for a Cython 3.0+ version.
|
|
|
| @param directory: the directory to search
|
| @param filename: the filename without suffix
|
| @param suffix: the filename extension including the dot, e.g. ".pxd"
|
| @return: the file path if found, or None
|
| """
|
| assert not suffix or suffix[:1] == '.'
|
| path_prefix = os.path.join(directory, filename)
|
|
|
| matching_files = glob.glob(
|
| (glob.escape(path_prefix) if sys.version_info >= (3, 4) else
|
| ''.join([ '['+c+']' if c in '[*?' else c for c in path_prefix]))
|
| + ".cython-*" + suffix)
|
| path = path_prefix + suffix
|
| if not os.path.exists(path):
|
| path = None
|
| best_match = (-1, path)
|
|
|
| for path in matching_files:
|
| versions = _parse_file_version(path)
|
| if versions:
|
| int_version = int(versions[0])
|
|
|
| if best_match[0] < int_version <= _current_version:
|
| best_match = (int_version, path)
|
| return best_match[1]
|
|
|
|
|
|
|
|
|
| def decode_filename(filename):
|
| if isinstance(filename, bytes):
|
| try:
|
| filename_encoding = sys.getfilesystemencoding()
|
| if filename_encoding is None:
|
| filename_encoding = sys.getdefaultencoding()
|
| filename = filename.decode(filename_encoding)
|
| except UnicodeDecodeError:
|
| pass
|
| return filename
|
|
|
|
|
|
|
|
|
| _match_file_encoding = re.compile(br"(\w*coding)[:=]\s*([-\w.]+)").search
|
|
|
|
|
| def detect_opened_file_encoding(f, default='UTF-8'):
|
|
|
|
|
|
|
| lines = ()
|
| start = b''
|
| while len(lines) < 3:
|
| data = f.read(500)
|
| start += data
|
| lines = start.split(b"\n")
|
| if not data:
|
| break
|
|
|
| m = _match_file_encoding(lines[0])
|
| if m and m.group(1) != b'c_string_encoding':
|
| return m.group(2).decode('iso8859-1')
|
| elif len(lines) > 1:
|
| m = _match_file_encoding(lines[1])
|
| if m:
|
| return m.group(2).decode('iso8859-1')
|
| return default
|
|
|
|
|
| def skip_bom(f):
|
| """
|
| Read past a BOM at the beginning of a source file.
|
| This could be added to the scanner, but it's *substantially* easier
|
| to keep it at this level.
|
| """
|
| if f.read(1) != u'\uFEFF':
|
| f.seek(0)
|
|
|
|
|
| def open_source_file(source_filename, encoding=None, error_handling=None):
|
| stream = None
|
| try:
|
| if encoding is None:
|
|
|
| f = io.open(source_filename, 'rb')
|
| encoding = detect_opened_file_encoding(f)
|
| f.seek(0)
|
| stream = io.TextIOWrapper(f, encoding=encoding, errors=error_handling)
|
| else:
|
| stream = io.open(source_filename, encoding=encoding, errors=error_handling)
|
|
|
| except OSError:
|
| if os.path.exists(source_filename):
|
| raise
|
|
|
| try:
|
| loader = __loader__
|
| if source_filename.startswith(loader.archive):
|
| stream = open_source_from_loader(
|
| loader, source_filename,
|
| encoding, error_handling)
|
| except (NameError, AttributeError):
|
| pass
|
|
|
| if stream is None:
|
| raise FileNotFoundError(source_filename)
|
| skip_bom(stream)
|
| return stream
|
|
|
|
|
| def open_source_from_loader(loader,
|
| source_filename,
|
| encoding=None, error_handling=None):
|
| nrmpath = os.path.normpath(source_filename)
|
| arcname = nrmpath[len(loader.archive)+1:]
|
| data = loader.get_data(arcname)
|
| return io.TextIOWrapper(io.BytesIO(data),
|
| encoding=encoding,
|
| errors=error_handling)
|
|
|
|
|
| def str_to_number(value):
|
|
|
|
|
| is_neg = False
|
| if value[:1] == '-':
|
| is_neg = True
|
| value = value[1:]
|
| if len(value) < 2:
|
| value = int(value, 0)
|
| elif value[0] == '0':
|
| literal_type = value[1]
|
| if literal_type in 'xX':
|
|
|
| value = strip_py2_long_suffix(value)
|
| value = int(value[2:], 16)
|
| elif literal_type in 'oO':
|
|
|
| value = int(value[2:], 8)
|
| elif literal_type in 'bB':
|
|
|
| value = int(value[2:], 2)
|
| else:
|
|
|
| value = int(value, 8)
|
| else:
|
| value = int(value, 0)
|
| return -value if is_neg else value
|
|
|
|
|
| def strip_py2_long_suffix(value_str):
|
| """
|
| Python 2 likes to append 'L' to stringified numbers
|
| which in then can't process when converting them to numbers.
|
| """
|
| if value_str[-1] in 'lL':
|
| return value_str[:-1]
|
| return value_str
|
|
|
|
|
| def long_literal(value):
|
| if isinstance(value, basestring):
|
| value = str_to_number(value)
|
| return not -2**31 <= value < 2**31
|
|
|
|
|
| @cached_function
|
| def get_cython_cache_dir():
|
| r"""
|
| Return the base directory containing Cython's caches.
|
|
|
| Priority:
|
|
|
| 1. CYTHON_CACHE_DIR
|
| 2. (OS X): ~/Library/Caches/Cython
|
| (posix not OS X): XDG_CACHE_HOME/cython if XDG_CACHE_HOME defined
|
| 3. ~/.cython
|
|
|
| """
|
| if 'CYTHON_CACHE_DIR' in os.environ:
|
| return os.environ['CYTHON_CACHE_DIR']
|
|
|
| parent = None
|
| if os.name == 'posix':
|
| if sys.platform == 'darwin':
|
| parent = os.path.expanduser('~/Library/Caches')
|
| else:
|
|
|
| parent = os.environ.get('XDG_CACHE_HOME')
|
|
|
| if parent and os.path.isdir(parent):
|
| return os.path.join(parent, 'cython')
|
|
|
|
|
| return os.path.expanduser(os.path.join('~', '.cython'))
|
|
|
|
|
| @try_finally_contextmanager
|
| def captured_fd(stream=2, encoding=None):
|
| orig_stream = os.dup(stream)
|
| try:
|
| with tempfile.TemporaryFile(mode="a+b") as temp_file:
|
| def read_output(_output=[b'']):
|
| if not temp_file.closed:
|
| temp_file.seek(0)
|
| _output[0] = temp_file.read()
|
| return _output[0]
|
|
|
| os.dup2(temp_file.fileno(), stream)
|
| def get_output():
|
| result = read_output()
|
| return result.decode(encoding) if encoding else result
|
|
|
| yield get_output
|
|
|
| os.dup2(orig_stream, stream)
|
| read_output()
|
| finally:
|
| os.close(orig_stream)
|
|
|
|
|
| def get_encoding_candidates():
|
| candidates = [sys.getdefaultencoding()]
|
| for stream in (sys.stdout, sys.stdin, sys.__stdout__, sys.__stdin__):
|
| encoding = getattr(stream, 'encoding', None)
|
|
|
| if encoding is not None and encoding not in candidates:
|
| candidates.append(encoding)
|
| return candidates
|
|
|
|
|
| def prepare_captured(captured):
|
| captured_bytes = captured.strip()
|
| if not captured_bytes:
|
| return None
|
| for encoding in get_encoding_candidates():
|
| try:
|
| return captured_bytes.decode(encoding)
|
| except UnicodeDecodeError:
|
| pass
|
|
|
| return captured_bytes.decode('latin-1')
|
|
|
|
|
| def print_captured(captured, output, header_line=None):
|
| captured = prepare_captured(captured)
|
| if captured:
|
| if header_line:
|
| output.write(header_line)
|
| output.write(captured)
|
|
|
|
|
| def print_bytes(s, header_text=None, end=b'\n', file=sys.stdout, flush=True):
|
| if header_text:
|
| file.write(header_text)
|
| file.flush()
|
| try:
|
| out = file.buffer
|
| except AttributeError:
|
| out = file
|
| out.write(s)
|
| if end:
|
| out.write(end)
|
| if flush:
|
| out.flush()
|
|
|
|
|
| class OrderedSet(object):
|
| def __init__(self, elements=()):
|
| self._list = []
|
| self._set = set()
|
| self.update(elements)
|
|
|
| def __iter__(self):
|
| return iter(self._list)
|
|
|
| def update(self, elements):
|
| for e in elements:
|
| self.add(e)
|
|
|
| def add(self, e):
|
| if e not in self._set:
|
| self._list.append(e)
|
| self._set.add(e)
|
|
|
| def __bool__(self):
|
| return bool(self._set)
|
|
|
| __nonzero__ = __bool__
|
|
|
|
|
|
|
|
|
| def add_metaclass(metaclass):
|
| """Class decorator for creating a class with a metaclass."""
|
| def wrapper(cls):
|
| orig_vars = cls.__dict__.copy()
|
| slots = orig_vars.get('__slots__')
|
| if slots is not None:
|
| if isinstance(slots, str):
|
| slots = [slots]
|
| for slots_var in slots:
|
| orig_vars.pop(slots_var)
|
| orig_vars.pop('__dict__', None)
|
| orig_vars.pop('__weakref__', None)
|
| return metaclass(cls.__name__, cls.__bases__, orig_vars)
|
| return wrapper
|
|
|
|
|
| def raise_error_if_module_name_forbidden(full_module_name):
|
|
|
| if full_module_name == 'cython' or full_module_name.startswith('cython.'):
|
| raise ValueError('cython is a special module, cannot be used as a module name')
|
|
|
|
|
| def build_hex_version(version_string):
|
| """
|
| Parse and translate public version identifier like '4.3a1' into the readable hex representation '0x040300A1' (like PY_VERSION_HEX).
|
|
|
| SEE: https://peps.python.org/pep-0440/#public-version-identifiers
|
| """
|
|
|
|
|
| digits = []
|
| release_status = 0xF0
|
| for segment in re.split(r'(\D+)', version_string):
|
| if segment in ('a', 'b', 'rc'):
|
| release_status = {'a': 0xA0, 'b': 0xB0, 'rc': 0xC0}[segment]
|
| digits = (digits + [0, 0])[:3]
|
| elif segment in ('.dev', '.pre', '.post'):
|
| break
|
| elif segment != '.':
|
| digits.append(int(segment))
|
|
|
| digits = (digits + [0] * 3)[:4]
|
| digits[3] += release_status
|
|
|
|
|
| hexversion = 0
|
| for digit in digits:
|
| hexversion = (hexversion << 8) + digit
|
|
|
| return '0x%08X' % hexversion
|
|
|
|
|
| def write_depfile(target, source, dependencies):
|
| src_base_dir = os.path.dirname(source)
|
| cwd = os.getcwd()
|
| if not src_base_dir.endswith(os.sep):
|
| src_base_dir += os.sep
|
|
|
| paths = []
|
| for fname in dependencies:
|
| if fname.startswith(src_base_dir):
|
| try:
|
| newpath = os.path.relpath(fname, cwd)
|
| except ValueError:
|
|
|
| newpath = os.path.abspath(fname)
|
| else:
|
| newpath = os.path.abspath(fname)
|
| paths.append(newpath)
|
|
|
| depline = os.path.relpath(target, cwd) + ": \\\n "
|
| depline += " \\\n ".join(paths) + "\n"
|
|
|
| with open(target+'.dep', 'w') as outfile:
|
| outfile.write(depline)
|
|
|
|
|
| def print_version():
|
| print("Cython version %s" % cython_version)
|
|
|
|
|
| if sys.stderr.isatty() or sys.stdout == sys.stderr:
|
| return
|
| if os.fstat(1) == os.fstat(2):
|
|
|
|
|
| return
|
| sys.stderr.write("Cython version %s\n" % cython_version)
|
|
|
|
|
| def normalise_float_repr(float_str):
|
| """
|
| Generate a 'normalised', simple digits string representation of a float value
|
| to allow string comparisons. Examples: '.123', '123.456', '123.'
|
| """
|
| str_value = float_str.lower().lstrip('0')
|
|
|
| exp = 0
|
| if 'E' in str_value or 'e' in str_value:
|
| str_value, exp = str_value.split('E' if 'E' in str_value else 'e', 1)
|
| exp = int(exp)
|
|
|
| if '.' in str_value:
|
| num_int_digits = str_value.index('.')
|
| str_value = str_value[:num_int_digits] + str_value[num_int_digits + 1:]
|
| else:
|
| num_int_digits = len(str_value)
|
| exp += num_int_digits
|
|
|
| result = (
|
| str_value[:exp]
|
| + '0' * (exp - len(str_value))
|
| + '.'
|
| + '0' * -exp
|
| + str_value[exp:]
|
| ).rstrip('0')
|
|
|
| return result if result != '.' else '.0'
|
|
|