"""Compile, import, and execute every project Python file and write a JSON report.

The runner walks the project root (the parent of this file's directory),
skipping the directories listed in ``SKIP_DIRS``, and records one result row
per file for each stage:

* ``compile`` -- byte-compile the file with ``py_compile``.
* ``import``  -- import the file as a module inside this process.
* ``execute`` -- run files that contain an ``if __name__ == "__main__"``
  block in a child interpreter, subject to a per-file timeout.

The process exit code is 1 when any row has status ``fail`` or ``timeout``,
otherwise 0.
"""

from __future__ import annotations

import argparse
import ast
import importlib
import json
import os
import py_compile
import re
import subprocess
import sys
import time
import traceback
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
    # Make project modules importable by their path relative to ROOT.
    sys.path.insert(0, str(ROOT))

SKIP_DIRS = {".venv", "__pycache__", ".git", "diagnostics", "data", "logs"}
IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

# Only the last N characters of captured child output are kept in the report.
_OUTPUT_TAIL_CHARS = 4000

# Environment variables that must all be set before run_gold_btc_mt5.py is executed.
_MT5_LIVE_ENV_VARS = (
    "MT5_ACCOUNT_LIVE_LOGIN",
    "MT5_ACCOUNT_LIVE_PASSWORD",
    "MT5_ACCOUNT_LIVE_SERVER",
)


def iter_python_files() -> list[Path]:
    """Return every ``*.py`` file under ROOT, sorted, excluding ``SKIP_DIRS``.

    A file is excluded when any component of its ROOT-relative path is in
    ``SKIP_DIRS``.
    """
    files: list[Path] = []
    for path in ROOT.rglob("*.py"):
        if any(part in SKIP_DIRS for part in path.relative_to(ROOT).parts):
            continue
        files.append(path)
    return sorted(files)


def has_main_block(path: Path) -> bool:
    """Return True when *path* contains an ``if __name__ == "__main__"`` test.

    The source is read as UTF-8, falling back to cp1252 when that fails to
    decode. Any ``if`` statement in the tree is considered, not only those at
    module level.

    A file that cannot be parsed (syntax error, undecodable bytes, embedded
    null bytes) is reported as having no main block instead of raising, so a
    single broken file cannot abort the whole run; the compile stage already
    records such files as failures.
    """
    try:
        try:
            source = path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            source = path.read_text(encoding="cp1252")
        tree = ast.parse(source, filename=str(path))
    except (SyntaxError, ValueError):
        # ValueError also covers a UnicodeDecodeError from the cp1252 fallback.
        return False

    for node in ast.walk(tree):
        if not isinstance(node, ast.If):
            continue
        test = node.test
        if (
            isinstance(test, ast.Compare)
            and isinstance(test.left, ast.Name)
            and test.left.id == "__name__"
            and len(test.ops) == 1
            and isinstance(test.ops[0], ast.Eq)
            and len(test.comparators) == 1
            and isinstance(test.comparators[0], ast.Constant)
            and test.comparators[0].value == "__main__"
        ):
            return True
    return False


def module_name_for(path: Path) -> str | None:
    """Return the dotted module name for *path*, or None if it is not importable.

    None is returned for files under the top-level ``scripts`` directory and
    for paths where any component is not a valid identifier per ``IDENT_RE``.
    *path* must be located under ROOT.
    """
    rel = path.relative_to(ROOT).with_suffix("")
    parts = rel.parts
    if parts and parts[0] == "scripts":
        return None
    if not parts or any(not IDENT_RE.match(part) for part in parts):
        return None
    return ".".join(parts)


def result(file: Path, stage: str, status: str, **extra: Any) -> dict[str, Any]:
    """Build one report row for *file*; *extra* keys are merged into the row.

    The ``file`` value is the path relative to ROOT, so *file* must be
    located under ROOT.
    """
    row = {
        "file": str(file.relative_to(ROOT)),
        "stage": stage,
        "status": status,
    }
    row.update(extra)
    return row


def compile_file(path: Path) -> dict[str, Any]:
    """Byte-compile *path* and return a ``compile`` row with ``pass`` or ``fail``."""
    try:
        py_compile.compile(str(path), doraise=True)
        return result(path, "compile", "pass")
    except Exception as exc:
        return result(
            path,
            "compile",
            "fail",
            error_type=type(exc).__name__,
            error=str(exc),
            traceback=traceback.format_exc(limit=5),
        )


def import_file(path: Path) -> dict[str, Any]:
    """Import *path* as a module in this process and return an ``import`` row.

    Files without a usable module name are reported as ``skip``. A module that
    raises on import, including one that calls ``sys.exit()`` at import time,
    is reported as ``fail`` instead of terminating the runner.
    """
    module_name = module_name_for(path)
    if module_name is None:
        return result(path, "import", "skip", reason="not a valid importable module path")
    try:
        importlib.import_module(module_name)
        return result(path, "import", "pass", module=module_name)
    except (Exception, SystemExit) as exc:
        # SystemExit is not an Exception subclass; without it here, a module
        # exiting at import time would end the run before the report is written.
        return result(
            path,
            "import",
            "fail",
            module=module_name,
            error_type=type(exc).__name__,
            error=str(exc),
            traceback=traceback.format_exc(limit=5),
        )


def _tail_text(data: str | bytes | None) -> str:
    """Return the last ``_OUTPUT_TAIL_CHARS`` characters of captured output as text."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        data = data.decode("utf-8", errors="replace")
    return data[-_OUTPUT_TAIL_CHARS:]


def run_file(path: Path, timeout: int) -> dict[str, Any]:
    """Execute *path* in a child interpreter and return an ``execute`` row.

    The file is skipped when it is this runner itself, when it is
    ``run_gold_btc_mt5.py`` and the ``MT5_ACCOUNT_LIVE_*`` variables are not
    all set, or when it has no ``__main__`` block. Otherwise it is run with
    ``-m <module>`` when a module name is available, or by file path, with
    ROOT as the working directory and on ``PYTHONPATH``.

    Args:
        path: File to execute; must be located under ROOT.
        timeout: Maximum run time in seconds before the row is marked ``timeout``.

    Returns:
        A row whose status is ``skip``, ``pass`` (exit code 0), ``fail``
        (non-zero exit code) or ``timeout``.
    """
    if path.resolve() == Path(__file__).resolve():
        return result(path, "execute", "skip", reason="test runner does not execute itself")
    if path.name == "run_gold_btc_mt5.py" and not all(
        os.environ.get(name) for name in _MT5_LIVE_ENV_VARS
    ):
        return result(path, "execute", "skip", reason="MT5_ACCOUNT_LIVE_* env vars are not set")
    if not has_main_block(path):
        return result(path, "execute", "skip", reason="no __main__ block")

    module_name = module_name_for(path)
    cmd = [sys.executable]
    if module_name is not None:
        cmd += ["-m", module_name]
    else:
        cmd += [str(path)]

    env = os.environ.copy()
    inherited_pythonpath = env.get("PYTHONPATH")
    # Join only when there is something to join: a trailing separator would
    # add an empty entry to the child's module search path.
    env["PYTHONPATH"] = (
        str(ROOT) + os.pathsep + inherited_pythonpath if inherited_pythonpath else str(ROOT)
    )
    env["PYTHONUTF8"] = "1"
    env["PYTHONIOENCODING"] = "utf-8"

    start = time.perf_counter()
    try:
        proc = subprocess.run(
            cmd,
            cwd=ROOT,
            env=env,
            capture_output=True,
            text=True,
            timeout=timeout,
            encoding="utf-8",
            errors="replace",
        )
    except subprocess.TimeoutExpired as exc:
        # The captured output on TimeoutExpired may be bytes even in text
        # mode, so it is decoded here rather than discarded.
        return result(
            path,
            "execute",
            "timeout",
            command=" ".join(cmd),
            seconds=timeout,
            stdout=_tail_text(exc.stdout),
            stderr=_tail_text(exc.stderr),
        )

    elapsed = round(time.perf_counter() - start, 3)
    status = "pass" if proc.returncode == 0 else "fail"
    return result(
        path,
        "execute",
        status,
        command=" ".join(cmd),
        returncode=proc.returncode,
        seconds=elapsed,
        stdout=_tail_text(proc.stdout),
        stderr=_tail_text(proc.stderr),
    )


def summarize(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Count rows per stage and status as ``{stage: {status: count}}``."""
    summary: dict[str, Any] = {}
    for row in rows:
        counts = summary.setdefault(row["stage"], {})
        status = row["status"]
        counts[status] = counts.get(status, 0) + 1
    return summary


def main() -> int:
    """Run all stages over every project file, write the report, and print a summary.

    Returns:
        1 when any row has status ``fail`` or ``timeout``, otherwise 0.
    """
    parser = argparse.ArgumentParser(
        description="Compile, import, and execute every project Python file safely."
    )
    parser.add_argument("--timeout", type=int, default=20, help="Per-file execute timeout in seconds.")
    parser.add_argument("--out", default="diagnostics/every_file_tests.json")
    parser.add_argument("--no-execute", action="store_true", help="Only compile/import files.")
    args = parser.parse_args()

    files = iter_python_files()
    rows: list[dict[str, Any]] = []

    # Each stage runs over all files before the next stage starts.
    for path in files:
        rows.append(compile_file(path))
    for path in files:
        rows.append(import_file(path))
    if not args.no_execute:
        for path in files:
            rows.append(run_file(path, args.timeout))

    payload = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "python": sys.executable,
        "file_count": len(files),
        "summary": summarize(rows),
        "results": rows,
    }

    # The report path is resolved relative to ROOT.
    out = ROOT / args.out
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(payload, indent=2), encoding="utf-8")

    print(f"Python files: {len(files)}")
    print(json.dumps(payload["summary"], indent=2))
    print(f"Report: {out}")

    failures = [r for r in rows if r["status"] in {"fail", "timeout"}]
    if failures:
        print("\nFailures/timeouts:")
        for row in failures:
            print(f"- {row['stage']} {row['file']}: {row.get('error_type', row['status'])} {row.get('error', '')}")
    return 1 if failures else 0


if __name__ == "__main__":
    raise SystemExit(main())