| | |
| | |
| | |
| | |
| | from __future__ import annotations |
| |
|
| | import collections.abc |
| | import sys |
| | import textwrap |
| | import traceback |
| | from functools import singledispatch |
| | from types import TracebackType |
| | from typing import Any, List, Optional |
| |
|
| | from ._exceptions import BaseExceptionGroup |
| |
|
| | max_group_width = 15 |
| | max_group_depth = 10 |
| | _cause_message = ( |
| | "\nThe above exception was the direct cause of the following exception:\n\n" |
| | ) |
| |
|
| | _context_message = ( |
| | "\nDuring handling of the above exception, another exception occurred:\n\n" |
| | ) |
| |
|
| |
|
| | def _format_final_exc_line(etype, value): |
| | valuestr = _safe_string(value, "exception") |
| | if value is None or not valuestr: |
| | line = f"{etype}\n" |
| | else: |
| | line = f"{etype}: {valuestr}\n" |
| |
|
| | return line |
| |
|
| |
|
| | def _safe_string(value, what, func=str): |
| | try: |
| | return func(value) |
| | except BaseException: |
| | return f"<{what} {func.__name__}() failed>" |
| |
|
| |
|
| | class _ExceptionPrintContext: |
| | def __init__(self): |
| | self.seen = set() |
| | self.exception_group_depth = 0 |
| | self.need_close = False |
| |
|
| | def indent(self): |
| | return " " * (2 * self.exception_group_depth) |
| |
|
| | def emit(self, text_gen, margin_char=None): |
| | if margin_char is None: |
| | margin_char = "|" |
| | indent_str = self.indent() |
| | if self.exception_group_depth: |
| | indent_str += margin_char + " " |
| |
|
| | if isinstance(text_gen, str): |
| | yield textwrap.indent(text_gen, indent_str, lambda line: True) |
| | else: |
| | for text in text_gen: |
| | yield textwrap.indent(text, indent_str, lambda line: True) |
| |
|
| |
|
| | def exceptiongroup_excepthook( |
| | etype: type[BaseException], value: BaseException, tb: TracebackType | None |
| | ) -> None: |
| | sys.stderr.write("".join(traceback.format_exception(etype, value, tb))) |
| |
|
| |
|
| | class PatchedTracebackException(traceback.TracebackException): |
| | def __init__( |
| | self, |
| | exc_type: type[BaseException], |
| | exc_value: BaseException, |
| | exc_traceback: TracebackType | None, |
| | *, |
| | limit: int | None = None, |
| | lookup_lines: bool = True, |
| | capture_locals: bool = False, |
| | compact: bool = False, |
| | _seen: set[int] | None = None, |
| | ) -> None: |
| | kwargs: dict[str, Any] = {} |
| | if sys.version_info >= (3, 10): |
| | kwargs["compact"] = compact |
| |
|
| | is_recursive_call = _seen is not None |
| | if _seen is None: |
| | _seen = set() |
| | _seen.add(id(exc_value)) |
| |
|
| | self.stack = traceback.StackSummary.extract( |
| | traceback.walk_tb(exc_traceback), |
| | limit=limit, |
| | lookup_lines=lookup_lines, |
| | capture_locals=capture_locals, |
| | ) |
| | self.exc_type = exc_type |
| | |
| | |
| | self._str = _safe_string(exc_value, "exception") |
| | try: |
| | self.__notes__ = getattr(exc_value, "__notes__", None) |
| | except KeyError: |
| | |
| | |
| | HTTPError = getattr(sys.modules.get("urllib.error", None), "HTTPError", ()) |
| | if sys.version_info[:2] <= (3, 11) and isinstance(exc_value, HTTPError): |
| | self.__notes__ = None |
| | else: |
| | raise |
| |
|
| | if exc_type and issubclass(exc_type, SyntaxError): |
| | |
| | self.filename = exc_value.filename |
| | lno = exc_value.lineno |
| | self.lineno = str(lno) if lno is not None else None |
| | self.text = exc_value.text |
| | self.offset = exc_value.offset |
| | self.msg = exc_value.msg |
| | if sys.version_info >= (3, 10): |
| | end_lno = exc_value.end_lineno |
| | self.end_lineno = str(end_lno) if end_lno is not None else None |
| | self.end_offset = exc_value.end_offset |
| | elif ( |
| | exc_type |
| | and issubclass(exc_type, (NameError, AttributeError)) |
| | and getattr(exc_value, "name", None) is not None |
| | ): |
| | suggestion = _compute_suggestion_error(exc_value, exc_traceback) |
| | if suggestion: |
| | self._str += f". Did you mean: '{suggestion}'?" |
| |
|
| | if lookup_lines: |
| | |
| | for frame in self.stack: |
| | frame.line |
| |
|
| | self.__suppress_context__ = ( |
| | exc_value.__suppress_context__ if exc_value is not None else False |
| | ) |
| |
|
| | |
| | |
| | if not is_recursive_call: |
| | queue = [(self, exc_value)] |
| | while queue: |
| | te, e = queue.pop() |
| |
|
| | if e and e.__cause__ is not None and id(e.__cause__) not in _seen: |
| | cause = PatchedTracebackException( |
| | type(e.__cause__), |
| | e.__cause__, |
| | e.__cause__.__traceback__, |
| | limit=limit, |
| | lookup_lines=lookup_lines, |
| | capture_locals=capture_locals, |
| | _seen=_seen, |
| | ) |
| | else: |
| | cause = None |
| |
|
| | if compact: |
| | need_context = ( |
| | cause is None and e is not None and not e.__suppress_context__ |
| | ) |
| | else: |
| | need_context = True |
| | if ( |
| | e |
| | and e.__context__ is not None |
| | and need_context |
| | and id(e.__context__) not in _seen |
| | ): |
| | context = PatchedTracebackException( |
| | type(e.__context__), |
| | e.__context__, |
| | e.__context__.__traceback__, |
| | limit=limit, |
| | lookup_lines=lookup_lines, |
| | capture_locals=capture_locals, |
| | _seen=_seen, |
| | ) |
| | else: |
| | context = None |
| |
|
| | |
| | |
| | if e and isinstance(e, BaseExceptionGroup): |
| | exceptions = [] |
| | for exc in e.exceptions: |
| | texc = PatchedTracebackException( |
| | type(exc), |
| | exc, |
| | exc.__traceback__, |
| | lookup_lines=lookup_lines, |
| | capture_locals=capture_locals, |
| | _seen=_seen, |
| | ) |
| | exceptions.append(texc) |
| | else: |
| | exceptions = None |
| |
|
| | te.__cause__ = cause |
| | te.__context__ = context |
| | te.exceptions = exceptions |
| | if cause: |
| | queue.append((te.__cause__, e.__cause__)) |
| | if context: |
| | queue.append((te.__context__, e.__context__)) |
| | if exceptions: |
| | queue.extend(zip(te.exceptions, e.exceptions)) |
| |
|
| | def format(self, *, chain=True, _ctx=None, **kwargs): |
| | if _ctx is None: |
| | _ctx = _ExceptionPrintContext() |
| |
|
| | output = [] |
| | exc = self |
| | if chain: |
| | while exc: |
| | if exc.__cause__ is not None: |
| | chained_msg = _cause_message |
| | chained_exc = exc.__cause__ |
| | elif exc.__context__ is not None and not exc.__suppress_context__: |
| | chained_msg = _context_message |
| | chained_exc = exc.__context__ |
| | else: |
| | chained_msg = None |
| | chained_exc = None |
| |
|
| | output.append((chained_msg, exc)) |
| | exc = chained_exc |
| | else: |
| | output.append((None, exc)) |
| |
|
| | for msg, exc in reversed(output): |
| | if msg is not None: |
| | yield from _ctx.emit(msg) |
| | if getattr(exc, "exceptions", None) is None: |
| | if exc.stack: |
| | yield from _ctx.emit("Traceback (most recent call last):\n") |
| | yield from _ctx.emit(exc.stack.format()) |
| | yield from _ctx.emit(exc.format_exception_only()) |
| | elif _ctx.exception_group_depth > max_group_depth: |
| | |
| | yield from _ctx.emit(f"... (max_group_depth is {max_group_depth})\n") |
| | else: |
| | |
| | is_toplevel = _ctx.exception_group_depth == 0 |
| | if is_toplevel: |
| | _ctx.exception_group_depth += 1 |
| |
|
| | if exc.stack: |
| | yield from _ctx.emit( |
| | "Exception Group Traceback (most recent call last):\n", |
| | margin_char="+" if is_toplevel else None, |
| | ) |
| | yield from _ctx.emit(exc.stack.format()) |
| |
|
| | yield from _ctx.emit(exc.format_exception_only()) |
| | num_excs = len(exc.exceptions) |
| | if num_excs <= max_group_width: |
| | n = num_excs |
| | else: |
| | n = max_group_width + 1 |
| | _ctx.need_close = False |
| | for i in range(n): |
| | last_exc = i == n - 1 |
| | if last_exc: |
| | |
| | _ctx.need_close = True |
| |
|
| | if max_group_width is not None: |
| | truncated = i >= max_group_width |
| | else: |
| | truncated = False |
| | title = f"{i + 1}" if not truncated else "..." |
| | yield ( |
| | _ctx.indent() |
| | + ("+-" if i == 0 else " ") |
| | + f"+---------------- {title} ----------------\n" |
| | ) |
| | _ctx.exception_group_depth += 1 |
| | if not truncated: |
| | yield from exc.exceptions[i].format(chain=chain, _ctx=_ctx) |
| | else: |
| | remaining = num_excs - max_group_width |
| | plural = "s" if remaining > 1 else "" |
| | yield from _ctx.emit( |
| | f"and {remaining} more exception{plural}\n" |
| | ) |
| |
|
| | if last_exc and _ctx.need_close: |
| | yield _ctx.indent() + "+------------------------------------\n" |
| | _ctx.need_close = False |
| | _ctx.exception_group_depth -= 1 |
| |
|
| | if is_toplevel: |
| | assert _ctx.exception_group_depth == 1 |
| | _ctx.exception_group_depth = 0 |
| |
|
| | def format_exception_only(self, **kwargs): |
| | """Format the exception part of the traceback. |
| | The return value is a generator of strings, each ending in a newline. |
| | Normally, the generator emits a single string; however, for |
| | SyntaxError exceptions, it emits several lines that (when |
| | printed) display detailed information about where the syntax |
| | error occurred. |
| | The message indicating which exception occurred is always the last |
| | string in the output. |
| | """ |
| | if self.exc_type is None: |
| | yield traceback._format_final_exc_line(None, self._str) |
| | return |
| |
|
| | stype = self.exc_type.__qualname__ |
| | smod = self.exc_type.__module__ |
| | if smod not in ("__main__", "builtins"): |
| | if not isinstance(smod, str): |
| | smod = "<unknown>" |
| | stype = smod + "." + stype |
| |
|
| | if not issubclass(self.exc_type, SyntaxError): |
| | yield _format_final_exc_line(stype, self._str) |
| | elif traceback_exception_format_syntax_error is not None: |
| | yield from traceback_exception_format_syntax_error(self, stype) |
| | else: |
| | yield from traceback_exception_original_format_exception_only(self) |
| |
|
| | notes = getattr(self, "__notes__", None) |
| | if isinstance(notes, collections.abc.Sequence): |
| | for note in notes: |
| | note = _safe_string(note, "note") |
| | yield from [line + "\n" for line in note.split("\n")] |
| | elif notes is not None: |
| | yield _safe_string(notes, "__notes__", func=repr) |
| |
|
| |
|
| | traceback_exception_original_format = traceback.TracebackException.format |
| | traceback_exception_original_format_exception_only = ( |
| | traceback.TracebackException.format_exception_only |
| | ) |
| | traceback_exception_format_syntax_error = getattr( |
| | traceback.TracebackException, "_format_syntax_error", None |
| | ) |
| | if sys.excepthook is sys.__excepthook__: |
| | traceback.TracebackException.__init__ = ( |
| | PatchedTracebackException.__init__ |
| | ) |
| | traceback.TracebackException.format = ( |
| | PatchedTracebackException.format |
| | ) |
| | traceback.TracebackException.format_exception_only = ( |
| | PatchedTracebackException.format_exception_only |
| | ) |
| | sys.excepthook = exceptiongroup_excepthook |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | if getattr(sys.excepthook, "__name__", None) in ( |
| | "apport_excepthook", |
| | |
| | "partial_apport_excepthook", |
| | ): |
| | |
| | traceback.TracebackException.__init__ = ( |
| | PatchedTracebackException.__init__ |
| | ) |
| | traceback.TracebackException.format = ( |
| | PatchedTracebackException.format |
| | ) |
| | traceback.TracebackException.format_exception_only = ( |
| | PatchedTracebackException.format_exception_only |
| | ) |
| |
|
| | from types import ModuleType |
| |
|
| | import apport_python_hook |
| |
|
| | |
| | fake_sys = ModuleType("exceptiongroup_fake_sys") |
| | fake_sys.__dict__.update(sys.__dict__) |
| | fake_sys.__excepthook__ = exceptiongroup_excepthook |
| | apport_python_hook.sys = fake_sys |
| |
|
| |
|
| | @singledispatch |
| | def format_exception_only(__exc: BaseException, **kwargs: Any) -> List[str]: |
| | return list( |
| | PatchedTracebackException( |
| | type(__exc), __exc, None, compact=True |
| | ).format_exception_only() |
| | ) |
| |
|
| |
|
| | @format_exception_only.register |
| | def _(__exc: type, value: BaseException, **kwargs: Any) -> List[str]: |
| | return format_exception_only(value) |
| |
|
| |
|
| | @singledispatch |
| | def format_exception( |
| | __exc: BaseException, limit: Optional[int] = None, chain: bool = True, **kwargs: Any |
| | ) -> List[str]: |
| | return list( |
| | PatchedTracebackException( |
| | type(__exc), __exc, __exc.__traceback__, limit=limit, compact=True |
| | ).format(chain=chain) |
| | ) |
| |
|
| |
|
| | @format_exception.register |
| | def _( |
| | __exc: type, |
| | value: BaseException, |
| | tb: TracebackType, |
| | limit: Optional[int] = None, |
| | chain: bool = True, |
| | **kwargs: Any, |
| | ) -> List[str]: |
| | return format_exception(value, limit, chain) |
| |
|
| |
|
| | @singledispatch |
| | def print_exception( |
| | __exc: BaseException, |
| | limit: Optional[int] = None, |
| | file: Any = None, |
| | chain: bool = True, |
| | **kwargs: Any, |
| | ) -> None: |
| | if file is None: |
| | file = sys.stderr |
| |
|
| | for line in PatchedTracebackException( |
| | type(__exc), __exc, __exc.__traceback__, limit=limit |
| | ).format(chain=chain): |
| | print(line, file=file, end="") |
| |
|
| |
|
| | @print_exception.register |
| | def _( |
| | __exc: type, |
| | value: BaseException, |
| | tb: TracebackType, |
| | limit: Optional[int] = None, |
| | file: Any = None, |
| | chain: bool = True, |
| | ) -> None: |
| | print_exception(value, limit, file, chain) |
| |
|
| |
|
| | def print_exc( |
| | limit: Optional[int] = None, |
| | file: Any | None = None, |
| | chain: bool = True, |
| | ) -> None: |
| | value = sys.exc_info()[1] |
| | print_exception(value, limit, file, chain) |
| |
|
| |
|
| | |
| | |
| |
|
| | _MAX_CANDIDATE_ITEMS = 750 |
| | _MAX_STRING_SIZE = 40 |
| | _MOVE_COST = 2 |
| | _CASE_COST = 1 |
| | _SENTINEL = object() |
| |
|
| |
|
| | def _substitution_cost(ch_a, ch_b): |
| | if ch_a == ch_b: |
| | return 0 |
| | if ch_a.lower() == ch_b.lower(): |
| | return _CASE_COST |
| | return _MOVE_COST |
| |
|
| |
|
| | def _compute_suggestion_error(exc_value, tb): |
| | wrong_name = getattr(exc_value, "name", None) |
| | if wrong_name is None or not isinstance(wrong_name, str): |
| | return None |
| | if isinstance(exc_value, AttributeError): |
| | obj = getattr(exc_value, "obj", _SENTINEL) |
| | if obj is _SENTINEL: |
| | return None |
| | obj = exc_value.obj |
| | try: |
| | d = dir(obj) |
| | except Exception: |
| | return None |
| | else: |
| | assert isinstance(exc_value, NameError) |
| | |
| | if tb is None: |
| | return None |
| | while tb.tb_next is not None: |
| | tb = tb.tb_next |
| | frame = tb.tb_frame |
| |
|
| | d = list(frame.f_locals) + list(frame.f_globals) + list(frame.f_builtins) |
| | if len(d) > _MAX_CANDIDATE_ITEMS: |
| | return None |
| | wrong_name_len = len(wrong_name) |
| | if wrong_name_len > _MAX_STRING_SIZE: |
| | return None |
| | best_distance = wrong_name_len |
| | suggestion = None |
| | for possible_name in d: |
| | if possible_name == wrong_name: |
| | |
| | continue |
| | |
| | max_distance = (len(possible_name) + wrong_name_len + 3) * _MOVE_COST // 6 |
| | |
| | max_distance = min(max_distance, best_distance - 1) |
| | current_distance = _levenshtein_distance( |
| | wrong_name, possible_name, max_distance |
| | ) |
| | if current_distance > max_distance: |
| | continue |
| | if not suggestion or current_distance < best_distance: |
| | suggestion = possible_name |
| | best_distance = current_distance |
| | return suggestion |
| |
|
| |
|
| | def _levenshtein_distance(a, b, max_cost): |
| | |
| |
|
| | |
| | if a == b: |
| | return 0 |
| |
|
| | |
| | pre = 0 |
| | while a[pre:] and b[pre:] and a[pre] == b[pre]: |
| | pre += 1 |
| | a = a[pre:] |
| | b = b[pre:] |
| | post = 0 |
| | while a[: post or None] and b[: post or None] and a[post - 1] == b[post - 1]: |
| | post -= 1 |
| | a = a[: post or None] |
| | b = b[: post or None] |
| | if not a or not b: |
| | return _MOVE_COST * (len(a) + len(b)) |
| | if len(a) > _MAX_STRING_SIZE or len(b) > _MAX_STRING_SIZE: |
| | return max_cost + 1 |
| |
|
| | |
| | if len(b) < len(a): |
| | a, b = b, a |
| |
|
| | |
| | if (len(b) - len(a)) * _MOVE_COST > max_cost: |
| | return max_cost + 1 |
| |
|
| | |
| | |
| | |
| | row = list(range(_MOVE_COST, _MOVE_COST * (len(a) + 1), _MOVE_COST)) |
| |
|
| | result = 0 |
| | for bindex in range(len(b)): |
| | bchar = b[bindex] |
| | distance = result = bindex * _MOVE_COST |
| | minimum = sys.maxsize |
| | for index in range(len(a)): |
| | |
| | substitute = distance + _substitution_cost(bchar, a[index]) |
| | |
| | distance = row[index] |
| | |
| |
|
| | insert_delete = min(result, distance) + _MOVE_COST |
| | result = min(insert_delete, substitute) |
| |
|
| | |
| | row[index] = result |
| | if result < minimum: |
| | minimum = result |
| | if minimum > max_cost: |
| | |
| | return max_cost + 1 |
| | return result |
| |
|