| |
| from __future__ import annotations |
|
|
| import argparse |
| import importlib |
| import json |
| import os |
| import re |
| import subprocess |
| import sys |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any |
|
|
| import httpx |
| import yaml |
| from pydantic import BaseModel |
|
|
# Directory that contains this script; commands and file lookups below are
# resolved relative to it.
ROOT = Path(__file__).resolve().parent
# Put <ROOT>/src and then <ROOT> at the front of sys.path (when not already
# present) so the importlib-based checks can load project modules such as
# ``inference``. ROOT is inserted last and therefore ends up first.
if str(ROOT / "src") not in sys.path:
    sys.path.insert(0, str(ROOT / "src"))
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))


# Accepted shapes of the three stdout log line kinds checked by
# _validate_log_sequence: rewards carry two decimals, scores three.
START_RE = re.compile(r"^\[START\] task=([^ ]+) env=([^ ]+) model=(.+)$")
STEP_RE = re.compile(r"^\[STEP\] step=(\d+) action=(.+) reward=([0-9]+\.[0-9]{2}) done=(true|false) error=(.+)$")
END_RE = re.compile(r"^\[END\] success=(true|false) steps=(\d+) score=([0-9]+\.[0-9]{3}) rewards=([0-9\.,-]*)$")
|
|
|
|
@dataclass
class CheckResult:
    """Outcome of one pre-submission check, as printed in the final report."""

    name: str  # label shown in the report line
    passed: bool  # True when the check succeeded (skipped checks also report True)
    detail: str  # explanation printed after the label
|
|
|
|
def run_command(cmd: list[str], timeout: int = 300) -> tuple[int, str, str]:
    """Run *cmd* from the repository root and capture its output.

    Launch failures and timeouts are reported through the return value rather
    than raised, so that one unavailable tool (for example a missing
    ``docker`` binary) produces a failed check instead of aborting the report.

    Args:
        cmd: Argument vector to execute; no shell is involved.
        timeout: Maximum run time in seconds.

    Returns:
        ``(returncode, stdout, stderr)``. The return code is 127 when the
        command could not be started and 124 when it timed out; in both cases
        ``stderr`` ends with a line describing the failure.
    """

    def _as_text(stream: object) -> str:
        # TimeoutExpired may carry bytes (or None) even when text=True.
        if stream is None:
            return ""
        if isinstance(stream, bytes):
            return stream.decode("utf-8", errors="replace")
        return str(stream)

    try:
        proc = subprocess.run(cmd, cwd=ROOT, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        note = f"Command timed out after {timeout}s: {' '.join(cmd)}"
        partial_err = _as_text(exc.stderr).rstrip()
        stderr = f"{partial_err}\n{note}" if partial_err else note
        return 124, _as_text(exc.stdout), stderr
    except OSError as exc:
        # Covers a missing executable, a missing working directory, or
        # insufficient permissions to launch the command.
        return 127, "", f"Could not run {' '.join(cmd)}: {exc}"
    return proc.returncode, proc.stdout, proc.stderr
|
|
|
|
def check_env_config() -> CheckResult:
    """Report whether API_BASE_URL, MODEL_NAME and HF_TOKEN are set and non-empty."""
    label = "Env vars configured"
    unset = [var for var in ("API_BASE_URL", "MODEL_NAME", "HF_TOKEN") if not os.getenv(var)]
    if not unset:
        return CheckResult(label, True, "API_BASE_URL, MODEL_NAME, HF_TOKEN are set")
    return CheckResult(label, False, f"Missing: {', '.join(unset)}")
|
|
|
|
def check_inference_file() -> CheckResult:
    """Verify that ``inference.py`` exists at the repo root and contains required text.

    The file is scanned as plain text for the OpenAI client import, the three
    environment variable names and the three log line prefixes; nothing is
    imported or executed here.
    """
    label = "Root inference.py"
    script = ROOT / "inference.py"
    if not script.exists():
        return CheckResult(label, False, "inference.py missing at repo root")

    source = script.read_text(encoding="utf-8")
    expected = (
        "from openai import OpenAI",
        "API_BASE_URL",
        "MODEL_NAME",
        "HF_TOKEN",
        "[START] task=",
        "[STEP] step=",
        "[END] success=",
    )
    # Kept as a list because its repr is embedded in the failure message.
    absent = [snippet for snippet in expected if snippet not in source]
    if absent:
        return CheckResult(label, False, f"Missing required content: {absent}")

    return CheckResult(
        label,
        True,
        "Found required script name, env vars, OpenAI client, and organizer log format",
    )
|
|
|
|
def check_openenv_compliance() -> CheckResult:
    """Validate ``openenv.yaml`` and the environment class it points to.

    Any exception raised while loading the config, importing project modules
    or driving the environment is converted into a failed result, so a broken
    project yields a FAIL line in the report instead of a traceback.

    Returns:
        A ``CheckResult`` named "OpenEnv compliance".
    """
    try:
        return _openenv_compliance_result()
    except Exception as exc:  # report boundary: every failure becomes a FAIL row
        return CheckResult("OpenEnv compliance", False, f"Validation raised {type(exc).__name__}: {exc}")


def _openenv_compliance_result() -> CheckResult:
    """Run the OpenEnv checks; may raise when project code or config is broken."""
    name = "OpenEnv compliance"

    cfg_path = ROOT / "openenv.yaml"
    if not cfg_path.exists():
        return CheckResult(name, False, "openenv.yaml not found")

    cfg = yaml.safe_load(cfg_path.read_text(encoding="utf-8"))
    # safe_load returns None for an empty file and may return a scalar or list.
    if not isinstance(cfg, dict):
        return CheckResult(name, False, "openenv.yaml must contain a mapping at the top level")
    for key in ["entrypoint", "models", "tasks", "api"]:
        if key not in cfg:
            return CheckResult(name, False, f"Missing key in openenv.yaml: {key}")

    entrypoint = cfg["entrypoint"]
    if ":" not in entrypoint:
        return CheckResult(name, False, "Entrypoint must be <path>:<ClassName>")

    fs_path, class_name = entrypoint.split(":", 1)
    # Strip only a trailing ".py"; a ".py" elsewhere in the path is part of a name.
    if fs_path.endswith(".py"):
        fs_path = fs_path[: -len(".py")]
    module_name = fs_path.replace("/", ".")
    module = importlib.import_module(module_name)
    env_cls = getattr(module, class_name, None)
    if env_cls is None:
        return CheckResult(name, False, f"Entrypoint class not found: {class_name}")

    env = env_cls()
    for method_name in ["reset", "step", "state"]:
        if not callable(getattr(env, method_name, None)):
            return CheckResult(name, False, f"Missing callable method: {method_name}")

    model_refs = cfg.get("models", {})
    model_classes: dict[str, Any] = {}
    for model_name in ["observation", "action", "reward"]:
        dotted = model_refs.get(model_name)
        if not dotted or "." not in dotted:
            return CheckResult(name, False, f"Invalid model ref for {model_name}: {dotted}")
        mod_name, cls_name = dotted.rsplit(".", 1)
        cls = getattr(importlib.import_module(mod_name), cls_name, None)
        # issubclass() raises TypeError for non-classes, so test for a class first.
        if not isinstance(cls, type) or not issubclass(cls, BaseModel):
            return CheckResult(name, False, f"{dotted} must resolve to Pydantic BaseModel")
        model_classes[model_name] = cls

    tasks = cfg["tasks"]
    if not isinstance(tasks, list) or not tasks:
        return CheckResult(name, False, "openenv.yaml must declare at least one task")

    obs = env.reset(tasks[0]["id"])
    if not isinstance(obs, BaseModel):
        return CheckResult(name, False, "reset() must return typed model")

    # Smoke-test step() with a fixed action against the first task.
    action = model_classes["action"](action_type="read_ticket", ticket_id="T-1001")
    obs2, reward, done, info = env.step(action)

    if not isinstance(obs2, BaseModel):
        return CheckResult(name, False, "step() observation must be typed model")
    if not isinstance(reward, BaseModel):
        return CheckResult(name, False, "step() reward must be typed model")
    if not isinstance(done, bool):
        return CheckResult(name, False, "step() done must be bool")
    if not isinstance(info, dict):
        return CheckResult(name, False, "step() info must be dict")
    if not isinstance(env.state(), dict):
        return CheckResult(name, False, "state() must return dict")

    return CheckResult(name, True, "openenv.yaml + typed models + reset/step/state validated")
|
|
|
|
def check_task_graders() -> CheckResult:
    """Play every task with the rule policy and check reward and grader ranges.

    Any exception raised while importing project modules or stepping the
    environment is converted into a failed result, so the report is still
    produced when the project code is broken.

    Returns:
        A ``CheckResult`` named "3+ tasks with graders"; on success the detail
        lists ``task_id:grader_score`` for each task.
    """
    try:
        return _task_graders_result()
    except Exception as exc:  # report boundary: every failure becomes a FAIL row
        return CheckResult("3+ tasks with graders", False, f"Grader run raised {type(exc).__name__}: {exc}")


def _task_graders_result(max_steps: int = 1000) -> CheckResult:
    """Run the grader checks; may raise when project code is broken.

    Args:
        max_steps: Upper bound on steps per episode, so an environment that
            never reports ``done`` fails the check instead of hanging it.
    """
    name = "3+ tasks with graders"
    inference = importlib.import_module("inference")
    env_mod = importlib.import_module("support_triage_openenv.env")
    action_mod = importlib.import_module("support_triage_openenv.models")

    env = env_mod.SupportTriageEnv()
    task_ids = env.task_ids
    if len(task_ids) < 3:
        return CheckResult(name, False, f"Expected >=3 tasks, got {len(task_ids)}")

    details: list[str] = []
    for task_id in task_ids:
        env.reset(task_id)
        policy = inference.RULE_POLICY[task_id]
        if not policy:
            # An empty policy would otherwise fail with an opaque IndexError.
            return CheckResult(name, False, f"No rule policy actions for {task_id}")

        done = False
        info: dict[str, Any] = {}
        steps_taken = 0
        while not done:
            if steps_taken >= max_steps:
                return CheckResult(name, False, f"Episode for {task_id} did not finish within {max_steps} steps")
            step_idx = env.state()["step_count"]
            # Past the end of the policy, keep repeating its last action.
            raw_action = policy[min(step_idx, len(policy) - 1)]
            action = action_mod.Action.model_validate(raw_action)
            _, reward, done, info = env.step(action)
            steps_taken += 1
            reward_value = float(reward.value)
            if not (0.0 <= reward_value <= 1.0):
                return CheckResult(name, False, f"Reward out of range in {task_id}: {reward_value}")

        # A missing grader_score defaults to -1.0, which fails the range check.
        grader_score = float(info.get("grader_score", -1.0))
        if not (0.0 <= grader_score <= 1.0):
            return CheckResult(name, False, f"Grader out of range in {task_id}: {grader_score}")

        details.append(f"{task_id}:{grader_score:.4f}")

    return CheckResult(name, True, " | ".join(details))
|
|
|
|
| def _validate_log_sequence(lines: list[str]) -> tuple[bool, str]: |
| if not lines: |
| return False, "No stdout lines from inference.py" |
|
|
| phase = "need_start" |
| steps_seen = 0 |
| episodes = 0 |
|
|
| for line in lines: |
| if line.startswith("[START]"): |
| if phase != "need_start": |
| return False, "[START] appeared before previous episode ended" |
| if not START_RE.match(line): |
| return False, f"Invalid [START] format: {line}" |
| phase = "need_step_or_end" |
| steps_seen = 0 |
| continue |
|
|
| if line.startswith("[STEP]"): |
| if phase != "need_step_or_end": |
| return False, "[STEP] appeared before [START]" |
| m = STEP_RE.match(line) |
| if not m: |
| return False, f"Invalid [STEP] format: {line}" |
| reward = float(m.group(3)) |
| if reward < 0.0 or reward > 1.0: |
| return False, f"[STEP] reward out of range: {reward}" |
| steps_seen += 1 |
| continue |
|
|
| if line.startswith("[END]"): |
| if phase != "need_step_or_end": |
| return False, "[END] appeared before [START]" |
| m = END_RE.match(line) |
| if not m: |
| return False, f"Invalid [END] format: {line}" |
| end_steps = int(m.group(2)) |
| score = float(m.group(3)) |
| rewards_blob = m.group(4) |
|
|
| if end_steps != steps_seen: |
| return False, f"[END] steps mismatch: expected {steps_seen}, got {end_steps}" |
| if score < 0.0 or score > 1.0: |
| return False, f"[END] score out of range: {score}" |
|
|
| rewards = [r for r in rewards_blob.split(",") if r != ""] |
| if len(rewards) != steps_seen: |
| return False, f"[END] rewards count mismatch: expected {steps_seen}, got {len(rewards)}" |
| for r in rewards: |
| rv = float(r) |
| if rv < 0.0 or rv > 1.0: |
| return False, f"[END] reward out of range: {rv}" |
|
|
| episodes += 1 |
| phase = "need_start" |
| continue |
|
|
| return False, f"Unexpected stdout line (must be START/STEP/END only): {line}" |
|
|
| if phase != "need_start": |
| return False, "Missing [END] for final episode" |
| if episodes == 0: |
| return False, "No complete episodes found" |
|
|
| return True, f"Validated {episodes} episode log sequences" |
|
|
|
|
def check_inference_repro() -> CheckResult:
    """Run ``inference.py`` in heuristic mode and validate its outputs.

    The check passes when the script exits with code 0, writes
    ``scores/inference_scores.json`` containing a JSON object with the keys
    ``avg_score``, ``avg_final_reward`` and ``episodes``, and prints only
    well-formed START/STEP/END log lines on stdout.

    Any output file left by an earlier run is deleted first, so that a stale
    file cannot make a run that wrote nothing look successful.

    Returns:
        A ``CheckResult`` named "Baseline reproduces".
    """
    name = "Baseline reproduces"
    output_path = ROOT / "scores" / "inference_scores.json"

    try:
        output_path.unlink()
    except FileNotFoundError:
        pass  # nothing stale to remove
    except OSError as exc:
        return CheckResult(name, False, f"Could not remove stale scores/inference_scores.json: {exc}")

    cmd = [sys.executable, "inference.py", "--mode", "heuristic", "--output", str(output_path)]
    code, out, err = run_command(cmd, timeout=120)
    if code != 0:
        return CheckResult(name, False, f"inference.py failed: {err.strip() or out.strip()}")

    if not output_path.exists():
        return CheckResult(name, False, "scores/inference_scores.json was not created")

    try:
        payload = json.loads(output_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return CheckResult(name, False, f"Invalid JSON output: {exc}")

    if not isinstance(payload, dict):
        return CheckResult(name, False, "Invalid JSON output: top-level value must be an object")
    for key in ["avg_score", "avg_final_reward", "episodes"]:
        if key not in payload:
            return CheckResult(name, False, f"Missing key in output JSON: {key}")

    lines = [ln.strip() for ln in out.splitlines() if ln.strip()]
    ok, detail = _validate_log_sequence(lines)
    if not ok:
        return CheckResult(name, False, detail)

    return CheckResult(name, True, f"inference.py completed and wrote {output_path.relative_to(ROOT)}; {detail}")
|
|
|
|
def check_docker_build(skip: bool) -> CheckResult:
    """Build the Docker image from the repository root, unless *skip* is set.

    On failure the detail is the last line of stderr (or of stdout when stderr
    is empty).
    """
    label = "Dockerfile builds"
    if skip:
        return CheckResult(label, True, "Skipped by --skip-docker")

    build_cmd = ["docker", "build", "-t", "support-triage-openenv:presubmit", "."]
    code, out, err = run_command(build_cmd, timeout=900)
    if code == 0:
        return CheckResult(label, True, "docker build succeeded")

    output_lines = (err or out).strip().splitlines()
    summary = output_lines[-1] if output_lines else "docker build failed"
    return CheckResult(label, False, summary)
|
|
|
|
def check_space_ping(space_url: str | None, skip: bool) -> CheckResult:
    """POST ``/reset`` to the deployed Space and check that the task id is echoed back.

    Args:
        space_url: Base URL of the Space; a trailing slash is ignored.
        skip: When true, report a pass without making any request.
    """
    label = "HF Space deploys + ping"
    if skip:
        return CheckResult(label, True, "Skipped by --skip-space")
    if not space_url:
        return CheckResult(label, False, "Provide --space-url (or use --skip-space for local-only checks)")

    base = space_url.rstrip("/")
    expected_task = "easy_password_reset"
    try:
        with httpx.Client(timeout=20.0) as client:
            response = client.post(f"{base}/reset", json={"task_id": expected_task})
            if response.status_code != 200:
                return CheckResult(label, False, f"POST /reset returned {response.status_code}")

            body = response.json()
            if body.get("task_id") != expected_task:
                return CheckResult(label, False, "reset() payload missing expected task_id")
    except Exception as exc:
        # Connection errors, timeouts and non-JSON bodies all count as a failed ping.
        return CheckResult(label, False, f"Ping failed: {exc}")

    return CheckResult(label, True, f"{base} returned 200 and reset() works")
|
|
|
|
def check_organizer_script(space_url: str | None, skip: bool) -> CheckResult:
    """Run ``scripts/pre_validation_script.sh`` against the Space URL and repo root.

    Args:
        space_url: Passed to the script as its first argument; required.
        skip: When true, report a pass without running anything.

    On failure the detail is the last five lines of combined stdout and stderr.
    """
    label = "Organizer pre-validation script"
    if skip:
        return CheckResult(label, True, "Skipped")

    script = ROOT / "scripts" / "pre_validation_script.sh"
    if not script.exists():
        return CheckResult(label, False, "scripts/pre_validation_script.sh not found")
    if not space_url:
        return CheckResult(label, False, "Requires --space-url")

    code, out, err = run_command(["bash", str(script), space_url, str(ROOT)], timeout=1800)
    if code == 0:
        return CheckResult(label, True, "Organizer script passed")

    combined = f"{out}\n{err}".strip()
    tail = combined.splitlines()[-5:]
    if tail:
        summary = " | ".join(tail)
    else:
        summary = "script failed"
    return CheckResult(label, False, summary)
|
|
|
|
def run_all(args: argparse.Namespace) -> list[CheckResult]:
    """Run every check in report order and collect the results.

    The organizer script is skipped when its own skip flag is set or when
    either the Space or the Docker check is skipped.
    """
    space_url = args.space_url
    skip_organizer = args.skip_organizer_script or args.skip_space or args.skip_docker

    results: list[CheckResult] = []
    results.append(check_env_config())
    results.append(check_inference_file())
    results.append(check_openenv_compliance())
    results.append(check_task_graders())
    results.append(check_inference_repro())
    results.append(check_docker_build(skip=args.skip_docker))
    results.append(check_space_ping(space_url=space_url, skip=args.skip_space))
    results.append(check_organizer_script(space_url=space_url, skip=skip_organizer))
    return results
|
|
|
|
def main() -> None:
    """Parse command-line flags, run all checks and print the report.

    Exits with status 1 when at least one check failed.
    """
    parser = argparse.ArgumentParser(description="Pre-submission validator for Meta HF hackathon OpenEnv env.")
    parser.add_argument("--space-url", default=os.getenv("SPACE_URL"), help="Deployed HF Space URL for ping checks")
    skip_flags = (
        ("--skip-docker", "Skip docker build check"),
        ("--skip-space", "Skip remote Space ping check"),
        ("--skip-organizer-script", "Skip organizer-provided pre-validation script"),
    )
    for flag, help_text in skip_flags:
        parser.add_argument(flag, action="store_true", help=help_text)
    args = parser.parse_args()

    results = run_all(args)

    print("\n=== Pre-Submission Checklist Report ===")
    for result in results:
        print(f"[{'PASS' if result.passed else 'FAIL'}] {result.name}: {result.detail}")

    failure_count = sum(1 for result in results if not result.passed)
    print("\nSummary:")
    print(f"- Total checks: {len(results)}")
    print(f"- Passed: {len(results) - failure_count}")
    print(f"- Failed: {failure_count}")

    if failure_count:
        sys.exit(1)
|
|
|
|
# Run the validator only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|