| | |
| | import warnings |
| | import types |
| | import collections |
| | import itertools |
| | from functools import lru_cache |
| | from typing import List, Union, Iterable |
| |
|
# A single backslash character, kept under a name so that code building
# regex escapes does not have to spell out nested escape sequences.
_bslash = "\\"
| |
|
| |
|
class __config_flags:
    """
    Internal base class for groups of boolean compatibility/debugging flags.

    Subclasses list the settable flag names in ``_all_names`` and the names
    that must never be changed in ``_fixed_names``; ``_type_desc`` is the
    noun used in warning and error messages.
    """

    _all_names: List[str] = []
    _fixed_names: List[str] = []
    _type_desc = "configuration"

    @classmethod
    def _set(cls, dname, value):
        """
        Assign ``value`` to the flag called ``dname``.

        A fixed flag is left untouched and a warning is issued instead.
        Raises ``ValueError`` when ``dname`` is not a known flag name.
        """
        if dname in cls._fixed_names:
            current = str(getattr(cls, dname)).upper()
            message = "{}.{} {} is {} and cannot be overridden".format(
                cls.__name__, dname, cls._type_desc, current
            )
            warnings.warn(message)
            return

        if dname not in cls._all_names:
            raise ValueError("no such {} {!r}".format(cls._type_desc, dname))

        setattr(cls, dname, value)

    @classmethod
    def enable(cls, name):
        """Set the flag ``name`` to ``True``."""
        return cls._set(name, True)

    @classmethod
    def disable(cls, name):
        """Set the flag ``name`` to ``False``."""
        return cls._set(name, False)
| |
|
| |
|
@lru_cache(maxsize=128)
def col(loc: int, strg: str) -> int:
    """
    Return the column of position ``loc`` within ``strg``.

    Newlines are treated as line separators and the first column of every
    line is number 1. Results are memoized by the ``lru_cache`` decorator.

    Note: the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See
    :class:`ParserElement.parseString` for more
    information on parsing strings containing ``<TAB>`` s, and suggested
    methods to maintain a consistent view of the parsed string, the parse
    location, and line and column positions within the parsed string.
    """
    text = strg

    # Shortcut: a position directly after a newline is always column 1.
    follows_newline = 0 < loc < len(text) and text[loc - 1] == "\n"
    if follows_newline:
        return 1

    # rfind yields -1 when no newline precedes loc, which makes the
    # subtraction produce a 1-based column on the first line as well.
    preceding_newline = text.rfind("\n", 0, loc)
    return loc - preceding_newline
| |
|
| |
|
@lru_cache(maxsize=128)
def lineno(loc: int, strg: str) -> int:
    """
    Return the line number of position ``loc`` within ``strg``.

    Newlines are treated as line separators and the first line is number 1.
    Results are memoized by the ``lru_cache`` decorator.

    Note - the default parsing behavior is to expand tabs in the input string
    before starting the parsing process. See :class:`ParserElement.parseString`
    for more information on parsing strings containing ``<TAB>`` s, and
    suggested methods to maintain a consistent view of the parsed string, the
    parse location, and line and column positions within the parsed string.
    """
    # Every newline found before loc pushes the position one line further down.
    newlines_before_loc = strg.count("\n", 0, loc)
    return 1 + newlines_before_loc
| |
|
| |
|
@lru_cache(maxsize=128)
def line(loc: int, strg: str) -> str:
    """
    Return the line of text that contains position ``loc`` within ``strg``.

    Newlines are treated as line separators and are not part of the returned
    text. Results are memoized by the ``lru_cache`` decorator.
    """
    # One past the newline before loc; 0 when loc is on the first line,
    # because rfind returns -1 there.
    line_start = strg.rfind("\n", 0, loc) + 1
    line_end = strg.find("\n", loc)

    if line_end < 0:
        # No newline at or after loc: the line runs to the end of the string.
        return strg[line_start:]
    return strg[line_start:line_end]
| |
|
| |
|
class _UnboundedCache:
    """
    Key/value cache without any size limit.

    ``get`` returns the ``not_in_cache`` sentinel for unknown keys. The three
    operations are closures over a private dict and are attached to each
    instance as bound methods.
    """

    def __init__(self):
        store = {}
        lookup = store.get
        missing = object()

        def get(_, key):
            return lookup(key, missing)

        def set_(_, key, value):
            store[key] = value

        def clear(_):
            store.clear()

        self.not_in_cache = missing
        # None marks the cache as having no size bound.
        self.size = None
        self.get, self.set, self.clear = (
            types.MethodType(func, self) for func in (get, set_, clear)
        )
| |
|
| |
|
class _FifoCache:
    """
    Key/value cache that holds at most ``size`` entries.

    When an insertion pushes the cache over ``size``, entries are dropped in
    insertion order, oldest first. ``get`` returns the ``not_in_cache``
    sentinel for unknown keys. The three operations are closures over a
    private ``OrderedDict`` and are attached to each instance as bound
    methods.
    """

    def __init__(self, size):
        missing = object()
        store = collections.OrderedDict()
        lookup = store.get

        def get(_, key):
            return lookup(key, missing)

        def set_(_, key, value):
            store[key] = value
            # Drop from the front (oldest insertion) until within the limit.
            while len(store) > size:
                store.popitem(last=False)

        def clear(_):
            store.clear()

        self.not_in_cache = missing
        self.size = size
        self.get, self.set, self.clear = (
            types.MethodType(func, self) for func in (get, set_, clear)
        )
| |
|
| |
|
class LRUMemo:
    """
    A memoizing mapping that retains `capacity` deleted items

    Items that have been set and not yet deleted are "active" and are never
    discarded. Deleting an item moves it into a bounded memory from which it
    can still be read. The memory tracks retained items by their access
    order; once `capacity` items are retained, the least recently used item
    is discarded. With a `capacity` of zero or less, deleted items are not
    retained at all.
    """

    def __init__(self, capacity):
        self._capacity = capacity
        self._active = {}
        self._memory = collections.OrderedDict()

    def __getitem__(self, key):
        """
        Return the value for ``key`` from the active or the retained items.

        Reading a retained item marks it as most recently used. Raises
        ``KeyError`` when ``key`` is neither active nor retained.
        """
        try:
            return self._active[key]
        except KeyError:
            # move_to_end raises KeyError itself for an unknown key.
            self._memory.move_to_end(key)
            return self._memory[key]

    def __setitem__(self, key, value):
        """Make ``key`` active with ``value``, dropping any retained copy."""
        self._memory.pop(key, None)
        self._active[key] = value

    def __delitem__(self, key):
        """
        Deactivate ``key`` and move its value into the retained items.

        Deleting a key that is not active is silently ignored.
        """
        try:
            value = self._active.pop(key)
        except KeyError:
            return

        if self._capacity <= 0:
            # Nothing may be retained. Without this guard the eviction loop
            # below would call popitem() on an empty OrderedDict and raise.
            return

        # Make room for the new entry by evicting least recently used items.
        while len(self._memory) >= self._capacity:
            self._memory.popitem(last=False)
        self._memory[key] = value

    def clear(self):
        """Forget all active and retained items."""
        self._active.clear()
        self._memory.clear()
| |
|
| |
|
class UnboundedMemo(dict):
    """
    A memoizing mapping that retains all deleted items

    This is a plain ``dict`` whose item deletion does nothing, so every
    stored value stays readable.
    """

    def __delitem__(self, key):
        """Ignore the deletion request; the entry for ``key`` is kept."""
| |
|
| |
|
def _escape_regex_range_chars(s: str) -> str:
    """
    Return ``s`` with backslash, ``^``, ``-``, ``[`` and ``]`` each preceded
    by a backslash, and with newline and tab characters replaced by their
    two-character backslash-n and backslash-t spellings.
    """
    escaped = s
    # The backslash must be handled first so that the backslashes added for
    # the other characters are not escaped a second time.
    for special in ("\\", "^", "-", "[", "]"):
        escaped = escaped.replace(special, _bslash + special)
    for raw, spelled in (("\n", r"\n"), ("\t", r"\t")):
        escaped = escaped.replace(raw, spelled)
    return str(escaped)
| |
|
| |
|
def _collapse_string_to_ranges(
    s: Union[str, Iterable[str]], re_escape: bool = True
) -> str:
    """
    Collapse the characters of ``s`` into a compact character-range string.

    The characters are de-duplicated and sorted. When more than three remain,
    each run of consecutive code points is written as ``first-last`` (or just
    ``firstlast`` for a run of two); for example ``"abcdef"`` becomes
    ``"a-f"``. Three or fewer characters are emitted one by one.

    When ``re_escape`` is true, the backslash, ``^``, ``-``, ``]`` and ``[``
    characters in the output are preceded by a backslash.
    """

    def escape_re_range_char(c):
        return "\\" + c if c in r"\^-][" else c

    def no_escape_re_range_char(c):
        return c

    render = escape_re_range_char if re_escape else no_escape_re_range_char

    chars = "".join(sorted(set(s)))
    if len(chars) <= 3:
        return "".join(render(ch) for ch in chars)

    # Build [first, last] pairs; a run is extended while the code point
    # advances by at most one from the previous character.
    runs = []
    prev_code = 0
    for ch in chars:
        code = ord(ch)
        if runs and code - prev_code <= 1:
            runs[-1][1] = ch
        else:
            runs.append([ch, ch])
        prev_code = code

    pieces = []
    for first, last in runs:
        if first == last:
            pieces.append(render(first))
            continue
        # Two adjacent characters need no dash between them.
        sep = "" if ord(last) == ord(first) + 1 else "-"
        pieces.append(render(first) + sep + render(last))
    return "".join(pieces)
| |
|
| |
|
def _flatten(ll: list) -> list:
    """
    Return a new flat list holding the items of ``ll`` in order, with every
    nested ``list`` (at any depth) replaced by its own flattened items.

    Only ``list`` instances are expanded; other containers such as tuples
    are kept as single items.
    """
    flat = []
    for item in ll:
        flat += _flatten(item) if isinstance(item, list) else [item]
    return flat
| |
|