#!/usr/bin/env python3
from __future__ import annotations
import argparse
import importlib
import json
import os
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import httpx
import yaml
from pydantic import BaseModel
# Absolute path of the directory that contains this script.
ROOT = Path(__file__).resolve().parent

# Put ``ROOT/src`` and ``ROOT`` at the front of ``sys.path`` (unless already
# present) so that importlib can resolve modules from this checkout. ``src``
# is handled first, which leaves ``ROOT`` ahead of it when both are inserted.
for _import_dir in (str(ROOT / "src"), str(ROOT)):
    if _import_dir not in sys.path:
        sys.path.insert(0, _import_dir)
del _import_dir
# Line formats that _validate_log_sequence accepts for inference.py stdout.
# [START]: task and env are single space-free tokens; model is the rest of the line.
START_RE = re.compile(r"^\[START\] task=([^ ]+) env=([^ ]+) model=(.+)$")
# [STEP]: integer step, free-form action, reward with exactly two decimals,
# done as the literal true/false, and a non-empty error field.
STEP_RE = re.compile(r"^\[STEP\] step=(\d+) action=(.+) reward=([0-9]+\.[0-9]{2}) done=(true|false) error=(.+)$")
# [END]: success as true/false, integer step count, score with exactly three
# decimals, and a possibly empty rewards blob made of digits, dots, commas and hyphens.
END_RE = re.compile(r"^\[END\] success=(true|false) steps=(\d+) score=([0-9]+\.[0-9]{3}) rewards=([0-9\.,-]*)$")
@dataclass
class CheckResult:
    """Outcome of a single checklist item, as printed in the final report."""

    # Human-readable label of the check.
    name: str
    # True when the check passed (or was explicitly skipped).
    passed: bool
    # Short explanation shown next to the PASS/FAIL status.
    detail: str
def run_command(cmd: list[str], timeout: int = 300) -> tuple[int, str, str]:
    """Run *cmd* with ``ROOT`` as working directory and capture its output.

    Launch failures and timeouts are reported through the return value rather
    than raised, so callers that only inspect the exit code turn them into a
    failed check instead of aborting the whole report.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list
            (no shell is involved).
        timeout: Seconds to wait before the child process is killed.

    Returns:
        ``(returncode, stdout, stderr)``. The code is ``127`` when the program
        could not be started and ``124`` when it timed out; in both cases the
        last line of ``stderr`` describes the problem.
    """
    try:
        proc = subprocess.run(cmd, cwd=ROOT, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired as exc:
        partial_err = _partial_output(exc.stderr)
        notice = f"Timed out after {timeout}s: {' '.join(cmd)}"
        stderr = f"{partial_err.rstrip()}\n{notice}" if partial_err.strip() else notice
        return 124, _partial_output(exc.stdout), stderr
    except OSError as exc:
        # Covers a missing or non-executable program (FileNotFoundError, PermissionError).
        return 127, "", f"Could not start {' '.join(cmd)!r}: {exc}"
    return proc.returncode, proc.stdout, proc.stderr


def _partial_output(data: str | bytes | None) -> str:
    """Normalise output attached to ``TimeoutExpired`` (may be None or bytes) to text."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        return data.decode(errors="replace")
    return data
def check_env_config() -> CheckResult:
    """Check that API_BASE_URL, MODEL_NAME and HF_TOKEN are set to non-empty values."""
    label = "Env vars configured"
    unset = [var for var in ("API_BASE_URL", "MODEL_NAME", "HF_TOKEN") if not os.environ.get(var)]
    if not unset:
        return CheckResult(label, True, "API_BASE_URL, MODEL_NAME, HF_TOKEN are set")
    return CheckResult(label, False, "Missing: " + ", ".join(unset))
def check_inference_file() -> CheckResult:
    """Check that ``ROOT/inference.py`` exists and contains the required text markers.

    This is a plain substring scan of the file's text; the script is not
    imported or executed here.
    """
    label = "Root inference.py"
    script = ROOT / "inference.py"
    if not script.exists():
        return CheckResult(label, False, "inference.py missing at repo root")

    source = script.read_text(encoding="utf-8")
    markers = (
        "from openai import OpenAI",
        "API_BASE_URL",
        "MODEL_NAME",
        "HF_TOKEN",
        "[START] task=",
        "[STEP] step=",
        "[END] success=",
    )
    absent = [marker for marker in markers if marker not in source]
    if absent:
        return CheckResult(label, False, f"Missing required content: {absent}")
    return CheckResult(
        label,
        True,
        "Found required script name, env vars, OpenAI client, and organizer log format",
    )
def check_openenv_compliance() -> CheckResult:
    """Validate ``openenv.yaml`` and smoke-test the environment it points to.

    Any exception raised while loading the config, importing the referenced
    modules or exercising the environment is reported as a failed check, so
    the remaining checklist items still run.

    Returns:
        A passing ``CheckResult`` when every rule holds, otherwise a failing
        one whose detail names the first violation found.
    """
    name = "OpenEnv compliance"
    try:
        problem = _find_openenv_problem()
    except Exception as exc:  # check boundary: report instead of aborting the report
        return CheckResult(name, False, f"Validation raised {type(exc).__name__}: {exc}")
    if problem is not None:
        return CheckResult(name, False, problem)
    return CheckResult(name, True, "openenv.yaml + typed models + reset/step/state validated")


def _find_openenv_problem() -> str | None:
    """Return a description of the first OpenEnv violation, or None if there is none."""
    cfg_path = ROOT / "openenv.yaml"
    if not cfg_path.exists():
        return "openenv.yaml not found"

    cfg = yaml.safe_load(cfg_path.read_text(encoding="utf-8"))
    # An empty file loads as None; scalars and lists are equally unusable here.
    if not isinstance(cfg, dict):
        return "openenv.yaml must contain a mapping at the top level"
    for key in ("entrypoint", "models", "tasks", "api"):
        if key not in cfg:
            return f"Missing key in openenv.yaml: {key}"

    entrypoint = cfg["entrypoint"]
    if not isinstance(entrypoint, str) or ":" not in entrypoint:
        return "Entrypoint must be <path>:<ClassName>"
    fs_path, class_name = entrypoint.split(":", 1)
    # Strip only a trailing ".py"; a blanket replace would also mangle path
    # components that merely contain ".py" once slashes have become dots.
    if fs_path.endswith(".py"):
        fs_path = fs_path[: -len(".py")]
    module = importlib.import_module(fs_path.replace("/", "."))
    env_cls = getattr(module, class_name, None)
    if env_cls is None:
        return f"Entrypoint class not found: {class_name}"

    env = env_cls()
    for method_name in ("reset", "step", "state"):
        if not callable(getattr(env, method_name, None)):
            return f"Missing callable method: {method_name}"

    model_refs = cfg["models"]
    if not isinstance(model_refs, dict):
        return "models in openenv.yaml must be a mapping"
    model_classes: dict[str, Any] = {}
    for model_name in ("observation", "action", "reward"):
        dotted = model_refs.get(model_name)
        if not isinstance(dotted, str) or "." not in dotted:
            return f"Invalid model ref for {model_name}: {dotted}"
        mod_name, cls_name = dotted.rsplit(".", 1)
        cls = getattr(importlib.import_module(mod_name), cls_name, None)
        # issubclass() raises TypeError for non-classes, so test for a class first.
        if not (isinstance(cls, type) and issubclass(cls, BaseModel)):
            return f"{dotted} must resolve to Pydantic BaseModel"
        model_classes[model_name] = cls

    tasks = cfg["tasks"]
    if not tasks:
        return "openenv.yaml must list at least one task"
    obs = env.reset(tasks[0]["id"])
    if not isinstance(obs, BaseModel):
        return "reset() must return typed model"

    # Smoke-test step() with a fixed action; the field names and values are
    # specific to this project's action model.
    action = model_classes["action"](action_type="read_ticket", ticket_id="T-1001")
    obs2, reward, done, info = env.step(action)
    if not isinstance(obs2, BaseModel):
        return "step() observation must be typed model"
    if not isinstance(reward, BaseModel):
        return "step() reward must be typed model"
    if not isinstance(done, bool):
        return "step() done must be bool"
    if not isinstance(info, dict):
        return "step() info must be dict"
    if not isinstance(env.state(), dict):
        return "state() must return dict"
    return None
def check_task_graders() -> CheckResult:
    """Play every task with the rule policy and check rewards and grader scores.

    Each task is reset and stepped until the environment reports ``done``,
    using ``inference.RULE_POLICY[task_id]`` indexed by the environment's
    ``step_count`` (clamped to the last policy entry). Every step reward and
    the final ``grader_score`` found in ``info`` must lie in ``[0.0, 1.0]``.
    """
    label = "3+ tasks with graders"
    inference = importlib.import_module("inference")
    env_module = importlib.import_module("support_triage_openenv.env")
    models_module = importlib.import_module("support_triage_openenv.models")

    env = env_module.SupportTriageEnv()
    task_ids = env.task_ids
    if len(task_ids) < 3:
        return CheckResult(label, False, f"Expected >=3 tasks, got {len(task_ids)}")

    summaries: list[str] = []
    for task_id in task_ids:
        env.reset(task_id)
        finished = False
        info: dict[str, Any] = {}
        while not finished:
            step_idx = env.state()["step_count"]
            policy = inference.RULE_POLICY[task_id]
            raw_action = policy[min(step_idx, len(policy) - 1)]
            action = models_module.Action.model_validate(raw_action)
            _, reward, finished, info = env.step(action)
            reward_value = float(reward.value)
            # Chained comparison on purpose: it also rejects NaN.
            if not (0.0 <= reward_value <= 1.0):
                return CheckResult(label, False, f"Reward out of range in {task_id}: {reward_value}")

        # A missing grader_score falls back to -1.0, which fails the range test.
        grader_score = float(info.get("grader_score", -1.0))
        if not (0.0 <= grader_score <= 1.0):
            return CheckResult(label, False, f"Grader out of range in {task_id}: {grader_score}")
        summaries.append(f"{task_id}:{grader_score:.4f}")

    return CheckResult(label, True, " | ".join(summaries))
def _validate_log_sequence(lines: list[str]) -> tuple[bool, str]:
    """Validate that *lines* form well-formed START/STEP/END episode logs.

    The lines are walked with a two-state machine: ``need_start`` (between
    episodes) and ``need_step_or_end`` (inside an episode). Every line must
    match ``START_RE``, ``STEP_RE`` or ``END_RE``; each ``[END]`` must agree
    with the number of ``[STEP]`` lines seen since the last ``[START]``, and
    all rewards and scores must lie in ``[0.0, 1.0]``.

    Args:
        lines: Non-empty, already stripped stdout lines.

    Returns:
        ``(True, summary)`` on success, otherwise ``(False, reason)`` for the
        first problem found. Malformed input never raises.
    """
    if not lines:
        return False, "No stdout lines from inference.py"

    phase = "need_start"
    steps_seen = 0
    episodes = 0
    for line in lines:
        if line.startswith("[START]"):
            if phase != "need_start":
                return False, "[START] appeared before previous episode ended"
            if not START_RE.match(line):
                return False, f"Invalid [START] format: {line}"
            phase = "need_step_or_end"
            steps_seen = 0
            continue

        if line.startswith("[STEP]"):
            if phase != "need_step_or_end":
                return False, "[STEP] appeared before [START]"
            m = STEP_RE.match(line)
            if not m:
                return False, f"Invalid [STEP] format: {line}"
            reward = float(m.group(3))
            if reward < 0.0 or reward > 1.0:
                return False, f"[STEP] reward out of range: {reward}"
            steps_seen += 1
            continue

        if line.startswith("[END]"):
            if phase != "need_step_or_end":
                return False, "[END] appeared before [START]"
            m = END_RE.match(line)
            if not m:
                return False, f"Invalid [END] format: {line}"
            end_steps = int(m.group(2))
            score = float(m.group(3))
            if end_steps != steps_seen:
                return False, f"[END] steps mismatch: expected {steps_seen}, got {end_steps}"
            if score < 0.0 or score > 1.0:
                return False, f"[END] score out of range: {score}"
            rewards = [r for r in m.group(4).split(",") if r != ""]
            if len(rewards) != steps_seen:
                return False, f"[END] rewards count mismatch: expected {steps_seen}, got {len(rewards)}"
            for r in rewards:
                # END_RE only restricts the blob's character set, so tokens
                # such as "-" or "1.2.3" get this far and are not numbers.
                try:
                    rv = float(r)
                except ValueError:
                    return False, f"[END] invalid reward value: {r}"
                if rv < 0.0 or rv > 1.0:
                    return False, f"[END] reward out of range: {rv}"
            episodes += 1
            phase = "need_start"
            continue

        return False, f"Unexpected stdout line (must be START/STEP/END only): {line}"

    if phase != "need_start":
        return False, "Missing [END] for final episode"
    if episodes == 0:
        return False, "No complete episodes found"
    return True, f"Validated {episodes} episode log sequences"
def check_inference_repro() -> CheckResult:
    """Run ``inference.py`` in heuristic mode and validate what it produces.

    The run must exit with code 0, leave a JSON file at
    ``scores/inference_scores.json`` containing ``avg_score``,
    ``avg_final_reward`` and ``episodes``, and print only well-formed
    START/STEP/END lines on stdout.
    """
    label = "Baseline reproduces"
    output_path = ROOT / "scores" / "inference_scores.json"
    code, out, err = run_command(
        [sys.executable, "inference.py", "--mode", "heuristic", "--output", str(output_path)],
        timeout=120,
    )
    if code != 0:
        return CheckResult(label, False, f"inference.py failed: {err.strip() or out.strip()}")
    if not output_path.exists():
        return CheckResult(label, False, "scores/inference_scores.json was not created")

    try:
        payload = json.loads(output_path.read_text(encoding="utf-8"))
    except Exception as exc:
        return CheckResult(label, False, f"Invalid JSON output: {exc}")

    absent_key = next(
        (key for key in ("avg_score", "avg_final_reward", "episodes") if key not in payload),
        None,
    )
    if absent_key is not None:
        return CheckResult(label, False, f"Missing key in output JSON: {absent_key}")

    # Blank lines are ignored; every remaining line must be a log line.
    stripped = (raw.strip() for raw in out.splitlines())
    ok, detail = _validate_log_sequence([text for text in stripped if text])
    if not ok:
        return CheckResult(label, False, detail)
    return CheckResult(
        label,
        True,
        f"inference.py completed and wrote {output_path.relative_to(ROOT)}; {detail}",
    )
def check_docker_build(skip: bool) -> CheckResult:
    """Build the Docker image from ``ROOT``, or report a pass when *skip* is set.

    On failure the detail is the last line of stderr (stdout when stderr is
    empty).
    """
    label = "Dockerfile builds"
    if skip:
        return CheckResult(label, True, "Skipped by --skip-docker")

    build_cmd = ["docker", "build", "-t", "support-triage-openenv:presubmit", "."]
    code, out, err = run_command(build_cmd, timeout=900)
    if code == 0:
        return CheckResult(label, True, "docker build succeeded")

    log_lines = (err or out).strip().splitlines()
    summary = log_lines[-1] if log_lines else "docker build failed"
    return CheckResult(label, False, summary)
def check_space_ping(space_url: str | None, skip: bool) -> CheckResult:
    """POST ``/reset`` to the deployed Space and check the response.

    The request must return HTTP 200 and a JSON body whose ``task_id`` echoes
    the one that was sent. Any exception during the exchange is reported as a
    failed ping.

    Args:
        space_url: Base URL of the Space; trailing slashes are ignored.
        skip: When true, report a pass without contacting the Space.
    """
    label = "HF Space deploys + ping"
    if skip:
        return CheckResult(label, True, "Skipped by --skip-space")
    if not space_url:
        return CheckResult(label, False, "Provide --space-url (or use --skip-space for local-only checks)")

    base = space_url.rstrip("/")
    problem: str | None = None
    try:
        with httpx.Client(timeout=20.0) as client:
            response = client.post(f"{base}/reset", json={"task_id": "easy_password_reset"})
            if response.status_code != 200:
                problem = f"POST /reset returned {response.status_code}"
            elif response.json().get("task_id") != "easy_password_reset":
                problem = "reset() payload missing expected task_id"
    except Exception as exc:
        problem = f"Ping failed: {exc}"

    if problem is not None:
        return CheckResult(label, False, problem)
    return CheckResult(label, True, f"{base} returned 200 and reset() works")
def check_organizer_script(space_url: str | None, skip: bool) -> CheckResult:
    """Run ``scripts/pre_validation_script.sh`` against the Space URL and ``ROOT``.

    Args:
        space_url: URL passed to the script as its first argument.
        skip: When true, report a pass without running anything.

    Returns:
        A failing result when the script is missing, no URL was given, or the
        script exits non-zero; in the last case the detail is the last five
        lines of its combined stdout and stderr.
    """
    label = "Organizer pre-validation script"
    if skip:
        return CheckResult(label, True, "Skipped")

    script_path = ROOT / "scripts" / "pre_validation_script.sh"
    if not script_path.exists():
        return CheckResult(label, False, "scripts/pre_validation_script.sh not found")
    if not space_url:
        return CheckResult(label, False, "Requires --space-url")

    code, out, err = run_command(["bash", str(script_path), space_url, str(ROOT)], timeout=1800)
    if code == 0:
        return CheckResult(label, True, "Organizer script passed")

    tail = f"{out}\n{err}".strip().splitlines()[-5:]
    return CheckResult(label, False, " | ".join(tail) if tail else "script failed")
def run_all(args: argparse.Namespace) -> list[CheckResult]:
    """Run every check in report order and return the results.

    The organizer script is skipped when it is skipped explicitly or when
    either the Space or the Docker check is skipped.
    """
    organizer_skip = args.skip_organizer_script or args.skip_space or args.skip_docker
    checks = [
        check_env_config,
        check_inference_file,
        check_openenv_compliance,
        check_task_graders,
        check_inference_repro,
        lambda: check_docker_build(skip=args.skip_docker),
        lambda: check_space_ping(space_url=args.space_url, skip=args.skip_space),
        lambda: check_organizer_script(space_url=args.space_url, skip=organizer_skip),
    ]
    return [check() for check in checks]
def main() -> None:
    """Parse CLI options, run all checks, print the report, exit 1 on any failure."""
    parser = argparse.ArgumentParser(description="Pre-submission validator for Meta HF hackathon OpenEnv env.")
    parser.add_argument("--space-url", default=os.getenv("SPACE_URL"), help="Deployed HF Space URL for ping checks")
    skip_flags = (
        ("--skip-docker", "Skip docker build check"),
        ("--skip-space", "Skip remote Space ping check"),
        ("--skip-organizer-script", "Skip organizer-provided pre-validation script"),
    )
    for flag, help_text in skip_flags:
        parser.add_argument(flag, action="store_true", help=help_text)

    results = run_all(parser.parse_args())

    print("\n=== Pre-Submission Checklist Report ===")
    for result in results:
        status = "PASS" if result.passed else "FAIL"
        print(f"[{status}] {result.name}: {result.detail}")

    failed_count = sum(1 for result in results if not result.passed)
    print("\nSummary:")
    print(f"- Total checks: {len(results)}")
    print(f"- Passed: {len(results) - failed_count}")
    print(f"- Failed: {failed_count}")
    if failed_count:
        sys.exit(1)
# Run the validator only when executed as a script, not when imported.
if __name__ == "__main__":
    main()