Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import argparse | |
| import importlib | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any | |
| import httpx | |
| import yaml | |
| from pydantic import BaseModel | |
# Make the repo root and its src/ directory importable regardless of CWD.
ROOT = Path(__file__).resolve().parent
for _extra in (ROOT / "src", ROOT):
    if str(_extra) not in sys.path:
        sys.path.insert(0, str(_extra))
# Organizer log protocol: inference.py must emit ONLY these three line shapes
# on stdout. One episode = [START], zero or more [STEP]s, one [END].
#   [START] task=<id> env=<name> model=<name>
START_RE = re.compile(r"^\[START\] task=([^ ]+) env=([^ ]+) model=(.+)$")
#   [STEP] step=<n> action=<text> reward=<0.00-1.00, exactly 2 decimals> done=<true|false> error=<text>
STEP_RE = re.compile(r"^\[STEP\] step=(\d+) action=(.+) reward=([0-9]+\.[0-9]{2}) done=(true|false) error=(.+)$")
#   [END] success=<true|false> steps=<n> score=<exactly 3 decimals> rewards=<comma-separated floats>
END_RE = re.compile(r"^\[END\] success=(true|false) steps=(\d+) score=([0-9]+\.[0-9]{3}) rewards=([0-9\.,-]*)$")
@dataclass
class CheckResult:
    """Outcome of one pre-submission check.

    Bug fix: the class had field annotations but no ``@dataclass`` decorator,
    so the positional constructions used throughout this script
    (``CheckResult("…", False, "…")``) would raise ``TypeError``. The
    decorator (already imported at the top of the file) generates the
    expected ``__init__``.
    """

    name: str     # human-readable check title shown in the report
    passed: bool  # True when the check succeeded
    detail: str   # short explanation (failure reason or success summary)
def run_command(cmd: list[str], timeout: int = 300) -> tuple[int, str, str]:
    """Run *cmd* from the repo root and return (returncode, stdout, stderr)."""
    result = subprocess.run(
        cmd,
        cwd=ROOT,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return result.returncode, result.stdout, result.stderr
def check_env_config() -> CheckResult:
    """Verify the environment variables inference.py depends on are all set."""
    absent = [var for var in ("API_BASE_URL", "MODEL_NAME", "HF_TOKEN") if not os.getenv(var)]
    if not absent:
        return CheckResult("Env vars configured", True, "API_BASE_URL, MODEL_NAME, HF_TOKEN are set")
    return CheckResult("Env vars configured", False, f"Missing: {', '.join(absent)}")
def check_inference_file() -> CheckResult:
    """Ensure inference.py sits at the repo root and contains the mandatory pieces."""
    script = ROOT / "inference.py"
    if not script.exists():
        return CheckResult("Root inference.py", False, "inference.py missing at repo root")
    body = script.read_text(encoding="utf-8")
    # Snippets the organizers require: OpenAI client, env-var usage, log markers.
    wanted = (
        "from openai import OpenAI",
        "API_BASE_URL",
        "MODEL_NAME",
        "HF_TOKEN",
        "[START] task=",
        "[STEP] step=",
        "[END] success=",
    )
    absent = [snippet for snippet in wanted if snippet not in body]
    if absent:
        return CheckResult("Root inference.py", False, f"Missing required content: {absent}")
    return CheckResult("Root inference.py", True, "Found required script name, env vars, OpenAI client, and organizer log format")
def check_openenv_compliance() -> CheckResult:
    """Validate openenv.yaml plus the environment class it points at.

    Checks the required config keys, imports the entrypoint class, confirms
    the reset/step/state API is present, verifies the observation/action/
    reward model refs resolve to Pydantic models, and smoke-tests one
    reset + step round trip against the typed contract.
    """
    cfg_path = ROOT / "openenv.yaml"
    if not cfg_path.exists():
        return CheckResult("OpenEnv compliance", False, "openenv.yaml not found")
    cfg = yaml.safe_load(cfg_path.read_text(encoding="utf-8"))
    # Top-level keys the spec requires.
    for key in ["entrypoint", "models", "tasks", "api"]:
        if key not in cfg:
            return CheckResult("OpenEnv compliance", False, f"Missing key in openenv.yaml: {key}")
    entrypoint = cfg["entrypoint"]
    if ":" not in entrypoint:
        return CheckResult("OpenEnv compliance", False, "Entrypoint must be <path>:<ClassName>")
    fs_path, class_name = entrypoint.split(":", 1)
    # Convert "pkg/mod.py" into a dotted module name importable via sys.path.
    module_name = fs_path.replace("/", ".").replace(".py", "")
    module = importlib.import_module(module_name)
    env_cls = getattr(module, class_name, None)
    if env_cls is None:
        return CheckResult("OpenEnv compliance", False, f"Entrypoint class not found: {class_name}")
    env = env_cls()
    # The OpenEnv surface: reset / step / state must all be callable.
    for method_name in ["reset", "step", "state"]:
        if not callable(getattr(env, method_name, None)):
            return CheckResult("OpenEnv compliance", False, f"Missing callable method: {method_name}")
    model_refs = cfg.get("models", {})
    # Each model ref must be a dotted "module.ClassName" path to a BaseModel.
    for model_name in ["observation", "action", "reward"]:
        dotted = model_refs.get(model_name)
        if not dotted or "." not in dotted:
            return CheckResult("OpenEnv compliance", False, f"Invalid model ref for {model_name}: {dotted}")
        mod_name, cls_name = dotted.rsplit(".", 1)
        cls = getattr(importlib.import_module(mod_name), cls_name, None)
        if cls is None or not issubclass(cls, BaseModel):
            return CheckResult("OpenEnv compliance", False, f"{dotted} must resolve to Pydantic BaseModel")
    # Runtime smoke test on the first configured task.
    obs = env.reset(cfg["tasks"][0]["id"])
    if not isinstance(obs, BaseModel):
        return CheckResult("OpenEnv compliance", False, "reset() must return typed model")
    action_mod_name, action_cls_name = model_refs["action"].rsplit(".", 1)
    action_cls = getattr(importlib.import_module(action_mod_name), action_cls_name)
    # NOTE(review): assumes every configured env accepts this sample
    # read_ticket action for T-1001 — confirm against the env's task data.
    action = action_cls(action_type="read_ticket", ticket_id="T-1001")
    obs2, reward, done, info = env.step(action)
    if not isinstance(obs2, BaseModel):
        return CheckResult("OpenEnv compliance", False, "step() observation must be typed model")
    if not isinstance(reward, BaseModel):
        return CheckResult("OpenEnv compliance", False, "step() reward must be typed model")
    if not isinstance(done, bool):
        return CheckResult("OpenEnv compliance", False, "step() done must be bool")
    if not isinstance(info, dict):
        return CheckResult("OpenEnv compliance", False, "step() info must be dict")
    if not isinstance(env.state(), dict):
        return CheckResult("OpenEnv compliance", False, "state() must return dict")
    return CheckResult("OpenEnv compliance", True, "openenv.yaml + typed models + reset/step/state validated")
def check_task_graders() -> CheckResult:
    """Play every task to completion with the rule policy and range-check scores.

    For each task, steps the env with actions from inference.RULE_POLICY until
    done, then verifies the final reward and the grader score reported in
    `info` both lie in [0, 1].
    """
    inference = importlib.import_module("inference")
    env_mod = importlib.import_module("support_triage_openenv.env")
    action_mod = importlib.import_module("support_triage_openenv.models")
    env = env_mod.SupportTriageEnv()
    task_ids = env.task_ids
    if len(task_ids) < 3:
        return CheckResult("3+ tasks with graders", False, f"Expected >=3 tasks, got {len(task_ids)}")
    details: list[str] = []
    for task_id in task_ids:
        env.reset(task_id)
        done = False
        info: dict[str, Any] = {}
        while not done:
            step_idx = env.state()["step_count"]
            # Clamp to the last scripted action if the episode outlives the policy.
            raw_action = inference.RULE_POLICY[task_id][min(step_idx, len(inference.RULE_POLICY[task_id]) - 1)]
            action = action_mod.Action.model_validate(raw_action)
            _, reward, done, info = env.step(action)
        # After the loop, `reward`/`info` hold the final step's values.
        reward_value = float(reward.value)
        if not (0.0 <= reward_value <= 1.0):
            return CheckResult("3+ tasks with graders", False, f"Reward out of range in {task_id}: {reward_value}")
        grader_score = float(info.get("grader_score", -1.0))
        if not (0.0 <= grader_score <= 1.0):
            return CheckResult("3+ tasks with graders", False, f"Grader out of range in {task_id}: {grader_score}")
        details.append(f"{task_id}:{grader_score:.4f}")
    return CheckResult("3+ tasks with graders", True, " | ".join(details))
def _validate_log_sequence(lines: list[str]) -> tuple[bool, str]:
    """Check that stdout lines form well-ordered [START]/[STEP]/[END] episodes.

    Runs a two-state machine over *lines*: each episode must open with a
    [START], contain zero or more [STEP]s, and close with an [END] whose step
    count and rewards list agree with the [STEP] lines actually seen. Any
    other stdout line is a violation. Returns (ok, detail) where detail names
    the first violation or summarizes the validated episode count.
    """
    if not lines:
        return False, "No stdout lines from inference.py"
    # phase: "need_start" between episodes, "need_step_or_end" inside one.
    phase = "need_start"
    steps_seen = 0
    episodes = 0
    for line in lines:
        if line.startswith("[START]"):
            if phase != "need_start":
                return False, "[START] appeared before previous episode ended"
            if not START_RE.match(line):
                return False, f"Invalid [START] format: {line}"
            phase = "need_step_or_end"
            steps_seen = 0
            continue
        if line.startswith("[STEP]"):
            if phase != "need_step_or_end":
                return False, "[STEP] appeared before [START]"
            m = STEP_RE.match(line)
            if not m:
                return False, f"Invalid [STEP] format: {line}"
            reward = float(m.group(3))
            if reward < 0.0 or reward > 1.0:
                return False, f"[STEP] reward out of range: {reward}"
            steps_seen += 1
            continue
        if line.startswith("[END]"):
            if phase != "need_step_or_end":
                return False, "[END] appeared before [START]"
            m = END_RE.match(line)
            if not m:
                return False, f"Invalid [END] format: {line}"
            end_steps = int(m.group(2))
            score = float(m.group(3))
            rewards_blob = m.group(4)
            # The [END] summary must agree with the [STEP] lines we counted.
            if end_steps != steps_seen:
                return False, f"[END] steps mismatch: expected {steps_seen}, got {end_steps}"
            if score < 0.0 or score > 1.0:
                return False, f"[END] score out of range: {score}"
            rewards = [r for r in rewards_blob.split(",") if r != ""]
            if len(rewards) != steps_seen:
                return False, f"[END] rewards count mismatch: expected {steps_seen}, got {len(rewards)}"
            for r in rewards:
                rv = float(r)
                if rv < 0.0 or rv > 1.0:
                    return False, f"[END] reward out of range: {rv}"
            episodes += 1
            phase = "need_start"
            continue
        # Any other stdout noise would break the organizer's log parser.
        return False, f"Unexpected stdout line (must be START/STEP/END only): {line}"
    if phase != "need_start":
        return False, "Missing [END] for final episode"
    if episodes == 0:
        return False, "No complete episodes found"
    return True, f"Validated {episodes} episode log sequences"
def check_inference_repro() -> CheckResult:
    """Run inference.py in heuristic mode; validate its JSON output and stdout logs."""
    output_path = ROOT / "scores" / "inference_scores.json"
    exit_code, stdout, stderr = run_command(
        [sys.executable, "inference.py", "--mode", "heuristic", "--output", str(output_path)],
        timeout=120,
    )
    if exit_code != 0:
        return CheckResult("Baseline reproduces", False, f"inference.py failed: {stderr.strip() or stdout.strip()}")
    if not output_path.exists():
        return CheckResult("Baseline reproduces", False, "scores/inference_scores.json was not created")
    try:
        payload = json.loads(output_path.read_text(encoding="utf-8"))
    except Exception as exc:
        return CheckResult("Baseline reproduces", False, f"Invalid JSON output: {exc}")
    for key in ["avg_score", "avg_final_reward", "episodes"]:
        if key not in payload:
            return CheckResult("Baseline reproduces", False, f"Missing key in output JSON: {key}")
    # Validate the organizer log protocol on the non-blank stdout lines.
    log_lines = [line.strip() for line in stdout.splitlines() if line.strip()]
    ok, detail = _validate_log_sequence(log_lines)
    if not ok:
        return CheckResult("Baseline reproduces", False, detail)
    return CheckResult("Baseline reproduces", True, f"inference.py completed and wrote {output_path.relative_to(ROOT)}; {detail}")
def check_docker_build(skip: bool) -> CheckResult:
    """Build the repo's Dockerfile under a presubmit tag unless skipped."""
    if skip:
        return CheckResult("Dockerfile builds", True, "Skipped by --skip-docker")
    exit_code, stdout, stderr = run_command(
        ["docker", "build", "-t", "support-triage-openenv:presubmit", "."], timeout=900
    )
    if exit_code == 0:
        return CheckResult("Dockerfile builds", True, "docker build succeeded")
    # Surface only the last output line as the failure summary.
    output_lines = (stderr or stdout).strip().splitlines()
    summary = output_lines[-1] if output_lines else "docker build failed"
    return CheckResult("Dockerfile builds", False, summary)
def check_space_ping(space_url: str | None, skip: bool) -> CheckResult:
    """POST /reset against the deployed Space and sanity-check the payload."""
    if skip:
        return CheckResult("HF Space deploys + ping", True, "Skipped by --skip-space")
    if not space_url:
        return CheckResult("HF Space deploys + ping", False, "Provide --space-url (or use --skip-space for local-only checks)")
    base = space_url.rstrip("/")
    try:
        with httpx.Client(timeout=20.0) as client:
            response = client.post(f"{base}/reset", json={"task_id": "easy_password_reset"})
            if response.status_code != 200:
                return CheckResult("HF Space deploys + ping", False, f"POST /reset returned {response.status_code}")
            if response.json().get("task_id") != "easy_password_reset":
                return CheckResult("HF Space deploys + ping", False, "reset() payload missing expected task_id")
    except Exception as exc:
        # Network/DNS/JSON failures all collapse into one ping failure.
        return CheckResult("HF Space deploys + ping", False, f"Ping failed: {exc}")
    return CheckResult("HF Space deploys + ping", True, f"{base} returned 200 and reset() works")
def check_organizer_script(space_url: str | None, skip: bool) -> CheckResult:
    """Run the organizer-provided shell validator against the Space and repo."""
    if skip:
        return CheckResult("Organizer pre-validation script", True, "Skipped")
    script_path = ROOT / "scripts" / "pre_validation_script.sh"
    if not script_path.exists():
        return CheckResult("Organizer pre-validation script", False, "scripts/pre_validation_script.sh not found")
    if not space_url:
        return CheckResult("Organizer pre-validation script", False, "Requires --space-url")
    exit_code, stdout, stderr = run_command(["bash", str(script_path), space_url, str(ROOT)], timeout=1800)
    if exit_code == 0:
        return CheckResult("Organizer pre-validation script", True, "Organizer script passed")
    # Show only the last five lines of combined output on failure.
    tail = (stdout + "\n" + stderr).strip().splitlines()[-5:]
    return CheckResult("Organizer pre-validation script", False, " | ".join(tail) if tail else "script failed")
def run_all(args: argparse.Namespace) -> list[CheckResult]:
    """Execute every check in the canonical order and collect the results."""
    # The organizer script needs both a Docker build and a live Space, so
    # skipping either of those (or asking explicitly) skips it too.
    skip_organizer = args.skip_organizer_script or args.skip_space or args.skip_docker
    results: list[CheckResult] = [
        check_env_config(),
        check_inference_file(),
        check_openenv_compliance(),
        check_task_graders(),
        check_inference_repro(),
        check_docker_build(skip=args.skip_docker),
        check_space_ping(space_url=args.space_url, skip=args.skip_space),
        check_organizer_script(space_url=args.space_url, skip=skip_organizer),
    ]
    return results
def main() -> None:
    """CLI entry point: run all checks, print a report, exit 1 on any failure."""
    parser = argparse.ArgumentParser(description="Pre-submission validator for Meta HF hackathon OpenEnv env.")
    parser.add_argument("--space-url", default=os.getenv("SPACE_URL"), help="Deployed HF Space URL for ping checks")
    parser.add_argument("--skip-docker", action="store_true", help="Skip docker build check")
    parser.add_argument("--skip-space", action="store_true", help="Skip remote Space ping check")
    parser.add_argument("--skip-organizer-script", action="store_true", help="Skip organizer-provided pre-validation script")
    results = run_all(parser.parse_args())
    print("\n=== Pre-Submission Checklist Report ===")
    failure_count = 0
    for result in results:
        if not result.passed:
            failure_count += 1
        label = "PASS" if result.passed else "FAIL"
        print(f"[{label}] {result.name}: {result.detail}")
    print("\nSummary:")
    print(f"- Total checks: {len(results)}")
    print(f"- Passed: {len(results) - failure_count}")
    print(f"- Failed: {failure_count}")
    if failure_count:
        sys.exit(1)


if __name__ == "__main__":
    main()