Spaces:
No application file
No application file
| from __future__ import annotations | |
| import argparse | |
| import ast | |
| import importlib | |
| import json | |
| import os | |
| import py_compile | |
| import re | |
| import subprocess | |
| import sys | |
| import time | |
| import traceback | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any | |
| ROOT = Path(__file__).resolve().parents[1] | |
| if str(ROOT) not in sys.path: | |
| sys.path.insert(0, str(ROOT)) | |
| SKIP_DIRS = {".venv", "__pycache__", ".git", "diagnostics", "data", "logs"} | |
| IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") | |
| def iter_python_files() -> list[Path]: | |
| files: list[Path] = [] | |
| for path in ROOT.rglob("*.py"): | |
| if any(part in SKIP_DIRS for part in path.relative_to(ROOT).parts): | |
| continue | |
| files.append(path) | |
| return sorted(files) | |
| def has_main_block(path: Path) -> bool: | |
| try: | |
| tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path)) | |
| except UnicodeDecodeError: | |
| tree = ast.parse(path.read_text(encoding="cp1252"), filename=str(path)) | |
| for node in ast.walk(tree): | |
| if not isinstance(node, ast.If): | |
| continue | |
| test = node.test | |
| if ( | |
| isinstance(test, ast.Compare) | |
| and isinstance(test.left, ast.Name) | |
| and test.left.id == "__name__" | |
| and len(test.ops) == 1 | |
| and isinstance(test.ops[0], ast.Eq) | |
| and len(test.comparators) == 1 | |
| and isinstance(test.comparators[0], ast.Constant) | |
| and test.comparators[0].value == "__main__" | |
| ): | |
| return True | |
| return False | |
| def module_name_for(path: Path) -> str | None: | |
| rel = path.relative_to(ROOT).with_suffix("") | |
| parts = rel.parts | |
| if parts and parts[0] == "scripts": | |
| return None | |
| if not parts or any(not IDENT_RE.match(part) for part in parts): | |
| return None | |
| return ".".join(parts) | |
| def result(file: Path, stage: str, status: str, **extra: Any) -> dict[str, Any]: | |
| row = { | |
| "file": str(file.relative_to(ROOT)), | |
| "stage": stage, | |
| "status": status, | |
| } | |
| row.update(extra) | |
| return row | |
| def compile_file(path: Path) -> dict[str, Any]: | |
| try: | |
| py_compile.compile(str(path), doraise=True) | |
| return result(path, "compile", "pass") | |
| except Exception as exc: | |
| return result( | |
| path, | |
| "compile", | |
| "fail", | |
| error_type=type(exc).__name__, | |
| error=str(exc), | |
| traceback=traceback.format_exc(limit=5), | |
| ) | |
| def import_file(path: Path) -> dict[str, Any]: | |
| module_name = module_name_for(path) | |
| if module_name is None: | |
| return result(path, "import", "skip", reason="not a valid importable module path") | |
| try: | |
| importlib.import_module(module_name) | |
| return result(path, "import", "pass", module=module_name) | |
| except Exception as exc: | |
| return result( | |
| path, | |
| "import", | |
| "fail", | |
| module=module_name, | |
| error_type=type(exc).__name__, | |
| error=str(exc), | |
| traceback=traceback.format_exc(limit=5), | |
| ) | |
| def run_file(path: Path, timeout: int) -> dict[str, Any]: | |
| if path.resolve() == Path(__file__).resolve(): | |
| return result(path, "execute", "skip", reason="test runner does not execute itself") | |
| if path.name == "run_gold_btc_mt5.py" and not all( | |
| os.environ.get(name) | |
| for name in ( | |
| "MT5_ACCOUNT_LIVE_LOGIN", | |
| "MT5_ACCOUNT_LIVE_PASSWORD", | |
| "MT5_ACCOUNT_LIVE_SERVER", | |
| ) | |
| ): | |
| return result(path, "execute", "skip", reason="MT5_ACCOUNT_LIVE_* env vars are not set") | |
| if not has_main_block(path): | |
| return result(path, "execute", "skip", reason="no __main__ block") | |
| module_name = module_name_for(path) | |
| cmd = [sys.executable] | |
| if module_name is not None: | |
| cmd += ["-m", module_name] | |
| else: | |
| cmd += [str(path)] | |
| env = os.environ.copy() | |
| env["PYTHONPATH"] = str(ROOT) + os.pathsep + env.get("PYTHONPATH", "") | |
| env["PYTHONUTF8"] = "1" | |
| env["PYTHONIOENCODING"] = "utf-8" | |
| start = time.time() | |
| try: | |
| proc = subprocess.run( | |
| cmd, | |
| cwd=ROOT, | |
| env=env, | |
| capture_output=True, | |
| text=True, | |
| timeout=timeout, | |
| encoding="utf-8", | |
| errors="replace", | |
| ) | |
| elapsed = round(time.time() - start, 3) | |
| status = "pass" if proc.returncode == 0 else "fail" | |
| return result( | |
| path, | |
| "execute", | |
| status, | |
| command=" ".join(cmd), | |
| returncode=proc.returncode, | |
| seconds=elapsed, | |
| stdout=proc.stdout[-4000:], | |
| stderr=proc.stderr[-4000:], | |
| ) | |
| except subprocess.TimeoutExpired as exc: | |
| return result( | |
| path, | |
| "execute", | |
| "timeout", | |
| command=" ".join(cmd), | |
| seconds=timeout, | |
| stdout=(exc.stdout or "")[-4000:] if isinstance(exc.stdout, str) else "", | |
| stderr=(exc.stderr or "")[-4000:] if isinstance(exc.stderr, str) else "", | |
| ) | |
| def summarize(rows: list[dict[str, Any]]) -> dict[str, Any]: | |
| summary: dict[str, Any] = {} | |
| for row in rows: | |
| stage = row["stage"] | |
| status = row["status"] | |
| summary.setdefault(stage, {}) | |
| summary[stage][status] = summary[stage].get(status, 0) + 1 | |
| return summary | |
| def main() -> int: | |
| parser = argparse.ArgumentParser(description="Compile, import, and execute every project Python file safely.") | |
| parser.add_argument("--timeout", type=int, default=20, help="Per-file execute timeout in seconds.") | |
| parser.add_argument("--out", default="diagnostics/every_file_tests.json") | |
| parser.add_argument("--no-execute", action="store_true", help="Only compile/import files.") | |
| args = parser.parse_args() | |
| files = iter_python_files() | |
| rows: list[dict[str, Any]] = [] | |
| for path in files: | |
| rows.append(compile_file(path)) | |
| for path in files: | |
| rows.append(import_file(path)) | |
| if not args.no_execute: | |
| for path in files: | |
| rows.append(run_file(path, args.timeout)) | |
| payload = { | |
| "generated_at": datetime.now(timezone.utc).isoformat(), | |
| "python": sys.executable, | |
| "file_count": len(files), | |
| "summary": summarize(rows), | |
| "results": rows, | |
| } | |
| out = ROOT / args.out | |
| out.parent.mkdir(parents=True, exist_ok=True) | |
| out.write_text(json.dumps(payload, indent=2), encoding="utf-8") | |
| print(f"Python files: {len(files)}") | |
| print(json.dumps(payload["summary"], indent=2)) | |
| print(f"Report: {out}") | |
| failures = [r for r in rows if r["status"] in {"fail", "timeout"}] | |
| if failures: | |
| print("\nFailures/timeouts:") | |
| for row in failures: | |
| print(f"- {row['stage']} {row['file']}: {row.get('error_type', row['status'])} {row.get('error', '')}") | |
| return 1 if failures else 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) | |