Spaces:
Runtime error
Runtime error
| # util.py | |
| import inspect | |
| import warnings | |
| import types | |
| import collections | |
| import itertools | |
| from functools import lru_cache, wraps | |
| from typing import Callable, List, Union, Iterable, TypeVar, cast | |
# The backslash character (code point 92), built from its code point rather
# than written as an escaped string literal.
_bslash = chr(0x5C)
# Type variable bound to Callable; used to annotate helpers whose result has
# the same callable type as one of their arguments.
C = TypeVar("C", bound=Callable)
class __config_flags:
    """Internal class for defining compatibility and debugging flags.

    Subclasses list the flag names they support in ``_all_names`` and the
    names that may not be changed in ``_fixed_names``; each flag is stored
    as a class attribute of the same name.  ``_type_desc`` is the noun used
    in warning and error messages.
    """

    _all_names: List[str] = []
    _fixed_names: List[str] = []
    _type_desc = "configuration"

    @classmethod
    def _set(cls, dname, value):
        """Assign ``value`` to the flag ``dname``.

        A flag listed in ``_fixed_names`` is left untouched and a warning is
        issued instead.  Raises ``ValueError`` if ``dname`` is not listed in
        ``_all_names``.
        """
        # _set must be a classmethod: enable()/disable() invoke it as
        # cls._set(name, value), which only binds ``cls`` when it is one.
        if dname in cls._fixed_names:
            warnings.warn(
                f"{cls.__name__}.{dname} {cls._type_desc} is {str(getattr(cls, dname)).upper()}"
                f" and cannot be overridden",
                # skip _set and enable/disable so the warning points at the caller
                stacklevel=3,
            )
            return
        if dname in cls._all_names:
            setattr(cls, dname, value)
        else:
            raise ValueError(f"no such {cls._type_desc} {dname!r}")

    # Public switches: set the named flag to True / False.
    enable = classmethod(lambda cls, name: cls._set(name, True))
    disable = classmethod(lambda cls, name: cls._set(name, False))
def col(loc: int, strg: str) -> int:
    """
    Return the column of position ``loc`` within ``strg``, treating newlines
    as line separators.  Columns are numbered starting at 1.

    Note: the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See
    :class:`ParserElement.parse_string` for more
    information on parsing strings containing ``<TAB>`` s, and suggested
    methods to maintain a consistent view of the parsed string, the parse
    location, and line and column positions within the parsed string.
    """
    # Shortcut: a position directly after a newline is the first column.
    if 0 < loc < len(strg) and strg[loc - 1] == "\n":
        return 1
    # rfind yields -1 when no newline precedes loc, which gives loc + 1.
    preceding_newline = strg.rfind("\n", 0, loc)
    return loc - preceding_newline
def lineno(loc: int, strg: str) -> int:
    """Return the line number of position ``loc`` within ``strg``, treating
    newlines as line separators.  Lines are numbered starting at 1.

    Note - the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See :class:`ParserElement.parse_string`
    for more information on parsing strings containing ``<TAB>`` s, and
    suggested methods to maintain a consistent view of the parsed string, the
    parse location, and line and column positions within the parsed string.
    """
    # Every newline found before loc starts one more line.
    newlines_before = strg.count("\n", 0, loc)
    return newlines_before + 1
def line(loc: int, strg: str) -> str:
    """
    Return the text of the line that contains position ``loc`` in ``strg``,
    treating newlines as line separators.  The newline itself is not included.
    """
    # The line begins just past the closest newline before loc (or at 0).
    start = strg.rfind("\n", 0, loc) + 1
    # ... and runs up to the next newline at or after loc, if there is one.
    end = strg.find("\n", loc)
    if end < 0:
        return strg[start:]
    return strg[start:end]
class _UnboundedCache:
    """Key/value cache with no size limit.

    ``get`` returns the sentinel ``not_in_cache`` for a missing key.  The
    ``get``, ``set`` and ``clear`` operations are closures over a private
    dict, bound to the instance as methods; ``size`` is ``None``.
    """

    def __init__(self):
        store = {}
        lookup = store.get
        missing = object()

        def get(_, key):
            return lookup(key, missing)

        def set_(_, key, value):
            store[key] = value

        def clear(_):
            store.clear()

        self.not_in_cache = missing
        self.size = None
        self.get = types.MethodType(get, self)
        self.set = types.MethodType(set_, self)
        self.clear = types.MethodType(clear, self)
class _FifoCache:
    """Key/value cache holding at most ``size`` entries, evicted first-in
    first-out.

    ``get`` returns the sentinel ``not_in_cache`` for a missing key.  The
    ``get``, ``set`` and ``clear`` operations are closures over private
    state, bound to the instance as methods.
    """

    def __init__(self, size):
        self.not_in_cache = not_in_cache = object()
        cache = {}
        # Ring of the keys occupying each slot; fresh object() placeholders
        # can never equal a real key, so evicting them is a harmless no-op.
        keyring = [object()] * size
        cache_get = cache.get
        cache_pop = cache.pop
        # Endless 0, 1, ..., size-1, 0, ... sequence choosing the next slot.
        keyiter = itertools.cycle(range(size))

        def get(_, key):
            return cache_get(key, not_in_cache)

        def set_(_, key, value):
            i = next(keyiter)
            # Evict the slot's previous occupant BEFORE storing the new
            # value: if that occupant is this very key, evicting afterwards
            # would discard the value that was just stored.
            cache_pop(keyring[i], None)
            keyring[i] = key
            cache[key] = value

        def clear(_):
            cache.clear()
            keyring[:] = [object()] * size

        self.size = size
        self.get = types.MethodType(get, self)
        self.set = types.MethodType(set_, self)
        self.clear = types.MethodType(clear, self)
class LRUMemo:
    """
    A memoizing mapping that retains `capacity` deleted items

    The memo tracks retained items by their access order; once `capacity` items
    are retained, the least recently used item is discarded.  With a
    `capacity` of zero or less, deleted items are not retained at all.
    """

    def __init__(self, capacity):
        self._capacity = capacity
        # Items that have been set and not yet deleted.
        self._active = {}
        # Deleted items kept for later lookup, oldest access first.
        self._memory = collections.OrderedDict()

    def __getitem__(self, key):
        """Return the active or retained value for ``key``.

        Reading a retained item marks it as most recently used.  Raises
        ``KeyError`` if the key is neither active nor retained.
        """
        try:
            return self._active[key]
        except KeyError:
            # move_to_end raises KeyError itself when the key is unknown
            self._memory.move_to_end(key)
            return self._memory[key]

    def __setitem__(self, key, value):
        """Store ``value`` as an active item, dropping any retained copy."""
        self._memory.pop(key, None)
        self._active[key] = value

    def __delitem__(self, key):
        """Move an active item into the retained set.

        Deleting a key that is not active is silently ignored.
        """
        try:
            value = self._active.pop(key)
        except KeyError:
            pass
        else:
            if self._capacity <= 0:
                # Nothing may be retained; without this guard the loop below
                # would call popitem() on an empty OrderedDict and raise.
                return
            # Make room by discarding the least recently used items.
            while len(self._memory) >= self._capacity:
                self._memory.popitem(last=False)
            self._memory[key] = value

    def clear(self):
        """Forget every active and retained item."""
        self._active.clear()
        self._memory.clear()
class UnboundedMemo(dict):
    """
    A memoizing mapping that retains all deleted items
    """

    def __delitem__(self, key):
        # Intentionally a no-op: "deleting" an entry leaves it in the dict,
        # so it stays available to later lookups.
        return None
def _escape_regex_range_chars(s: str) -> str:
    """Return ``s`` with the characters ``\\ ^ - [ ]`` each preceded by a
    backslash, and with newline and tab characters replaced by the
    two-character sequences ``\\n`` and ``\\t``.
    """
    # One translation pass; each character is mapped independently, so a
    # backslash inserted as an escape is never escaped a second time.
    replacements = {ord(ch): "\\" + ch for ch in "\\^-[]"}
    replacements[ord("\n")] = r"\n"
    replacements[ord("\t")] = r"\t"
    return s.translate(replacements)
def _collapse_string_to_ranges(
    s: Union[str, Iterable[str]], re_escape: bool = True
) -> str:
    """Return the distinct characters of ``s`` in sorted order, with runs of
    consecutive code points collapsed to ``first-last`` form.

    A run of exactly two characters is written without the dash.  Inputs
    with three or fewer distinct characters are returned uncollapsed.  When
    ``re_escape`` is true, each emitted ``\\ ^ - ] [`` character is preceded
    by a backslash.
    """

    def _escaped(ch):
        return "\\" + ch if ch in r"\^-][" else ch

    def _verbatim(ch):
        return ch

    render = _escaped if re_escape else _verbatim

    chars = "".join(sorted(set(s)))
    if len(chars) <= 3:
        # Too few characters for collapsing to pay off; emit them one by one.
        return "".join([render(ch) for ch in chars])

    def _emit(first, last):
        # Text for one run of consecutive characters.
        if first == last:
            return render(first)
        joiner = "" if ord(last) == ord(first) + 1 else "-"
        return f"{render(first)}{joiner}{render(last)}"

    pieces = []
    run_first = run_last = chars[0]
    for ch in chars[1:]:
        # A gap of more than one code point closes the current run.
        if ord(ch) - ord(run_last) > 1:
            pieces.append(_emit(run_first, run_last))
            run_first = ch
        run_last = ch
    pieces.append(_emit(run_first, run_last))
    return "".join(pieces)
def _flatten(ll: list) -> list:
    """Return a new flat list of the items of ``ll``, with every nested
    ``list`` (at any depth) replaced by its own flattened items, in order.
    Items that are not ``list`` instances are kept as they are.
    """
    flat = []
    for item in ll:
        flat += _flatten(item) if isinstance(item, list) else [item]
    return flat
def _make_synonym_function(compat_name: str, fn: C) -> C:
    """Build a compatibility synonym named ``compat_name`` that forwards all
    of its arguments to ``fn`` and returns the result.

    The synonym takes its annotations, keyword-only defaults and qualified
    name from ``fn``, and gets a docstring pointing at ``fn``'s name.
    """
    # In a future version, uncomment the code in the internal _inner() functions
    # to begin emitting DeprecationWarnings.

    # Unwrap staticmethod/classmethod
    fn = getattr(fn, "__func__", fn)

    # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take
    # some extra steps to add it if present in decorated function.)
    # next(..., None) rather than indexing [0], so that a callable taking no
    # parameters at all does not raise IndexError.
    first_param = next(iter(inspect.signature(fn).parameters), None)
    if first_param == "self":

        def _inner(self, *args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=3
            # )
            return fn(self, *args, **kwargs)

    else:

        def _inner(*args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=3
            # )
            return fn(*args, **kwargs)

    _inner.__doc__ = f"""Deprecated - use :class:`{fn.__name__}`"""
    _inner.__name__ = compat_name
    _inner.__annotations__ = fn.__annotations__
    # Keyword-only defaults come from the function itself or, for a class,
    # from its __init__.
    if isinstance(fn, types.FunctionType):
        _inner.__kwdefaults__ = fn.__kwdefaults__
    elif isinstance(fn, type) and hasattr(fn, "__init__"):
        _inner.__kwdefaults__ = fn.__init__.__kwdefaults__
    else:
        _inner.__kwdefaults__ = None
    _inner.__qualname__ = fn.__qualname__
    return cast(C, _inner)
def replaced_by_pep8(fn: C) -> Callable[[Callable], C]:
    """
    Decorator for pre-PEP8 compatibility synonyms, to link them to the new function.

    The returned decorator discards the body of the function it decorates and
    keeps only its name, producing a synonym of ``fn`` under that name.
    """

    def _link_to_replacement(other):
        return _make_synonym_function(other.__name__, fn)

    return _link_to_replacement