| """Track and clean up spawned CLI subprocesses. |
| |
| This is a safety net for cases where the server is interrupted (Ctrl+C) and the |
| FastAPI lifespan cleanup doesn't run to completion. We only track processes we |
| spawn so we don't accidentally kill unrelated system processes. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import atexit |
| import os |
| import subprocess |
| import threading |
|
|
| from loguru import logger |
|
|
| _lock = threading.Lock() |
| _pids: set[int] = set() |
| _atexit_registered = False |
|
|
|
|
| def ensure_atexit_registered() -> None: |
| global _atexit_registered |
| with _lock: |
| if _atexit_registered: |
| return |
| atexit.register(kill_all_best_effort) |
| _atexit_registered = True |
|
|
|
|
| def register_pid(pid: int) -> None: |
| if not pid: |
| return |
| ensure_atexit_registered() |
| with _lock: |
| _pids.add(int(pid)) |
|
|
|
|
| def unregister_pid(pid: int) -> None: |
| if not pid: |
| return |
| with _lock: |
| _pids.discard(int(pid)) |
|
|
|
|
| def kill_all_best_effort() -> None: |
| """Kill any still-running registered pids (best-effort).""" |
| with _lock: |
| pids = list(_pids) |
| _pids.clear() |
|
|
| if not pids: |
| return |
|
|
| if os.name == "nt": |
| for pid in pids: |
| try: |
| |
| subprocess.run( |
| ["taskkill", "/PID", str(pid), "/T", "/F"], |
| stdout=subprocess.DEVNULL, |
| stderr=subprocess.DEVNULL, |
| check=False, |
| ) |
| except Exception as e: |
| logger.debug("process_registry: taskkill failed pid=%s: %s", pid, e) |
| return |
|
|
| |
| for pid in pids: |
| try: |
| os.kill(pid, 9) |
| except Exception as e: |
| logger.debug("process_registry: kill failed pid=%s: %s", pid, e) |
|
|