File size: 7,182 Bytes
7344bef | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 | from __future__ import annotations
import atexit
import faulthandler
import os
import platform
import sys
import threading
import time
import traceback
from pathlib import Path
_LOCK = threading.RLock()
_STATE = {
"installed": False,
"log_file": None,
"log_path": None,
"stdout": None,
"stderr": None,
"prev_excepthook": None,
"prev_threading_excepthook": None,
"prev_unraisablehook": None,
}
class _TeeStream:
def __init__(self, stream, log_file):
self._stream = stream
self._log_file = log_file
def write(self, data):
if not data:
return 0
text = data.decode("utf-8", errors="replace") if isinstance(data, (bytes, bytearray)) else str(data)
with _LOCK:
written = _safe_write(self._stream, text)
_safe_write(self._log_file, text)
if "\n" in text or "\r" in text:
_safe_flush(self._stream)
_safe_flush(self._log_file)
return written if written is not None else len(text)
def flush(self):
with _LOCK:
_safe_flush(self._stream)
_safe_flush(self._log_file)
def isatty(self):
return bool(getattr(self._stream, "isatty", lambda: False)())
def fileno(self):
return getattr(self._stream, "fileno")()
def writable(self):
return bool(getattr(self._stream, "writable", lambda: True)())
@property
def encoding(self):
return getattr(self._stream, "encoding", "utf-8")
@property
def errors(self):
return getattr(self._stream, "errors", "strict")
def __getattr__(self, name):
return getattr(self._stream, name)
def _safe_write(stream, text):
if stream is None:
return None
try:
return stream.write(text)
except Exception:
return None
def _safe_flush(stream):
if stream is None:
return
try:
stream.flush()
except Exception:
return
def _flush_log(sync=False):
log_file = _STATE.get("log_file")
if log_file is None:
return
_safe_flush(log_file)
if sync:
try:
os.fsync(log_file.fileno())
except Exception:
return
def _timestamp():
return time.strftime("%Y-%m-%d %H:%M:%S")
def _log_line(message, sync=False):
log_file = _STATE.get("log_file")
if log_file is None:
return
with _LOCK:
_safe_write(log_file, f"[crash-diagnostics {_timestamp()}] {message}\n")
_flush_log(sync=sync)
def _log_trace(prefix, exc_type, exc_value, exc_traceback):
_log_line(prefix, sync=True)
formatted = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback)).rstrip()
if formatted:
with _LOCK:
_safe_write(_STATE.get("log_file"), f"{formatted}\n")
_flush_log(sync=True)
def _install_stream_tees(log_file):
if _STATE["stdout"] is None:
_STATE["stdout"] = sys.stdout
if _STATE["stderr"] is None:
_STATE["stderr"] = sys.stderr
if sys.stdout is _STATE["stdout"]:
sys.stdout = _TeeStream(sys.stdout, log_file)
if sys.stderr is _STATE["stderr"]:
sys.stderr = _TeeStream(sys.stderr, log_file)
def _install_exception_hooks():
prev_excepthook = sys.excepthook
_STATE["prev_excepthook"] = prev_excepthook
def _excepthook(exc_type, exc_value, exc_traceback):
_log_trace(f"Unhandled exception in main thread '{threading.current_thread().name}'", exc_type, exc_value, exc_traceback)
if prev_excepthook not in (None, _excepthook):
try:
prev_excepthook(exc_type, exc_value, exc_traceback)
return
except Exception:
pass
sys.__excepthook__(exc_type, exc_value, exc_traceback)
sys.excepthook = _excepthook
if hasattr(threading, "excepthook"):
prev_threading_excepthook = threading.excepthook
_STATE["prev_threading_excepthook"] = prev_threading_excepthook
def _threading_excepthook(args):
thread_name = getattr(args.thread, "name", "unknown")
_log_trace(f"Unhandled exception in thread '{thread_name}'", args.exc_type, args.exc_value, args.exc_traceback)
if prev_threading_excepthook not in (None, _threading_excepthook):
try:
prev_threading_excepthook(args)
except Exception:
return
threading.excepthook = _threading_excepthook
if hasattr(sys, "unraisablehook"):
prev_unraisablehook = sys.unraisablehook
_STATE["prev_unraisablehook"] = prev_unraisablehook
def _unraisablehook(unraisable):
obj_name = type(unraisable.object).__name__ if getattr(unraisable, "object", None) is not None else "None"
err_msg = getattr(unraisable, "err_msg", None) or "Unraisable exception"
_log_line(f"{err_msg} on object type '{obj_name}'", sync=True)
_log_trace("Unraisable exception traceback", unraisable.exc_type, unraisable.exc_value, unraisable.exc_traceback)
if prev_unraisablehook not in (None, _unraisablehook):
try:
prev_unraisablehook(unraisable)
except Exception:
return
sys.unraisablehook = _unraisablehook
def _install_faulthandler(log_file):
try:
faulthandler.enable(file=log_file, all_threads=True)
_log_line("faulthandler enabled for fatal native crashes", sync=True)
except Exception as exc:
_log_line(f"Failed to enable faulthandler: {exc}", sync=True)
def _log_startup_context(anchor_path):
_log_line(f"Crash diagnostics active. Log file: {_STATE['log_path']}", sync=True)
_log_line(f"PID={os.getpid()} PPID={os.getppid() if hasattr(os, 'getppid') else 'n/a'}", sync=True)
_log_line(f"Python={sys.version.splitlines()[0]}", sync=True)
_log_line(f"Platform={platform.platform()}", sync=True)
_log_line(f"CWD={os.getcwd()}", sync=True)
_log_line(f"Entry={anchor_path}", sync=True)
_log_line(f"ARGV={sys.argv}", sync=True)
def _on_exit():
_log_line("Process exit reached", sync=True)
def install_wgp_crash_diagnostics(anchor_file):
try:
with _LOCK:
if _STATE["installed"]:
return str(_STATE["log_path"] or "")
anchor_path = Path(anchor_file).resolve()
log_dir = anchor_path.parent / "crash"
log_dir.mkdir(parents=True, exist_ok=True)
log_path = log_dir / f"wgp_crash_{time.strftime('%Y%m%d_%H%M%S')}_pid{os.getpid()}.log"
log_file = log_path.open("a", encoding="utf-8", buffering=1)
_STATE["log_file"] = log_file
_STATE["log_path"] = log_path
_install_stream_tees(log_file)
_install_exception_hooks()
_install_faulthandler(log_file)
atexit.register(_on_exit)
_STATE["installed"] = True
_log_startup_context(anchor_path)
print(f"[crash-diagnostics] Logging to {log_path}")
return str(log_path)
except Exception:
return ""
|