#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Hackathon inference script for AEGIS-Env.

MANDATORY / configurable environment variables:
    API_BASE_URL       LLM API endpoint (default set below).
    MODEL_NAME         Model id for chat completions (default set below).
    HF_TOKEN           Hugging Face / API key for the OpenAI client.
    LOCAL_IMAGE_NAME   The name of the local image to use for the environment
                       if you are using from_docker_image() method.
    OPENENV_BASE_URL   The OpenEnv HTTP/WebSocket base URL (used when
                       LOCAL_IMAGE_NAME is not set and --local is not used).

Stdout (exact line shapes):
    [START] task=<task> env=<benchmark> model=<model>
    [STEP] step=<n> action=<action> reward=<0.00> done=<true|false> error=<message|null>
    [END] success=<true|false> steps=<n> score=<0.00> rewards=<r1,r2,...>
"""

from __future__ import annotations

import argparse
import asyncio
import contextlib
import json
import os
import sys
from typing import List, Optional, Tuple

from openai import OpenAI
from openenv.core.client_types import StepResult

from models import AegisAction, AegisObservation

# --- Defaults (API_BASE_URL / MODEL_NAME only; HF_TOKEN has no default) ---
_DEFAULT_API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
_DEFAULT_MODEL_NAME = os.getenv("MODEL_NAME") or "meta-llama/Llama-3.2-3B-Instruct"

API_BASE_URL = _DEFAULT_API_BASE_URL
MODEL_NAME = _DEFAULT_MODEL_NAME
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") or ""
OPENENV_BASE_URL = os.getenv("OPENENV_BASE_URL") or "http://127.0.0.1:8000"
TASK_NAME = os.getenv("TASK_NAME", "easy")
BENCHMARK = os.getenv("BENCHMARK", "AEGIS-Env")
MAX_STEPS = int(os.getenv("MAX_STEPS", "1"))
TEMPERATURE = float(os.getenv("TEMPERATURE", "0.2"))
MAX_TOKENS = int(os.getenv("MAX_TOKENS", "4096"))
SUCCESS_SCORE_THRESHOLD = float(os.getenv("SUCCESS_SCORE_THRESHOLD", "0.1"))

# Lower / upper bounds applied to the logged episode score so that the [END]
# line never shows exactly 0.00 or 1.00.
_MIN_LOGGED_SCORE = 0.01
_MAX_LOGGED_SCORE = 0.99

# Full user prompt. This is a ``str.format`` template: literal braces of the
# JSON example are doubled. The value slots describe the keys that
# ``_parse_action`` reads back.
_PROMPT_TEMPLATE = """{persona}

--- PIPELINE HISTORY ---
{pipeline_history}

--- DATA ---
Question: {question}
Rubric: {rubric}
Student Answer: {student_answer}
Maximum Score: {max_score}

You must output ONLY a raw JSON object. Do not wrap it in markdown formatting (e.g., no ```json).
{{
  "proposed_score": <number between 0 and {max_score}>,
  "agent_reasoning": "<your reasoning>",
  "routing_decision": "<proceed or revise>"
}}"""

# Per-stage persona text, keyed by ``obs.current_stage``. Each value is a
# ``str.format`` template; only the validator uses the ``{loops}`` field.
_STAGE_PERSONAS = {
    "arbiter": """You are **The Arbiter**, the initial routing/assessment agent in the automated grading pipeline.
Goal: Perform the first comprehensive evaluation of the student's answer against the rubric.
Instructions:
- Conduct a thorough preliminary analysis against the rubric.
- Propose an initial score and explain strengths + areas of concern.
- Your reasoning will be used by later stages; be clear and complete.
- Set routing_decision to 'proceed'.""",
    "scrutinizer": """You are **The Scrutinizer**, the first refinement agent.
Goal: Critically examine and improve the current assessment.
Instructions:
- Review the pipeline history for accuracy, completeness, and rubric alignment.
- Do a criterion-by-criterion verification; fix gaps or inconsistencies.
- Refine the score and provide tighter justification grounded in the student answer.
- Set routing_decision to 'proceed'.""",
    "validator": """You are **The Validator**, the quality assurance guardian of the evaluation process.
Refinement loop: {loops}/2.
Goal: Review the Scrutinizer's refined evaluation for fairness, accuracy, and consistency.
Instructions:
- Audit the pipeline history for fairness, accuracy, and consistency with the rubric.
- Check for missing criteria, unjustified leaps, bias, or unclear feedback.
- If deficiencies remain, set routing_decision to 'revise' (send back for another pass).
- If the evaluation meets a high standard, set routing_decision to 'proceed'.""",
    "mentor": """You are **The Mentor**, the final agent who turns the validated assessment into a helpful report.
Goal: Produce clear, personalized, actionable feedback for the student.
Instructions:
- Synthesize the final score and reasoning from the pipeline history.
- Write encouraging, specific feedback: celebrate strengths, identify growth areas.
- Provide actionable next steps (concrete improvements / study suggestions).
- Set routing_decision to 'proceed'.""",
}

# Persona used when the observation's stage is empty or not listed above.
_DEFAULT_PERSONA = """You are an Automated Evaluation Agent.
Goal: Return a reasonable score and clear reasoning.
Instructions:
- Follow the required JSON schema.
- Set routing_decision to 'proceed'."""


def _llm_api_base_url(cli_override: Optional[str]) -> str:
    """Return the LLM endpoint: CLI value if given, else env var, else default."""
    if cli_override is not None:
        return cli_override
    return os.getenv("API_BASE_URL") or API_BASE_URL


def _model_name(cli_override: Optional[str]) -> str:
    """Return the model id: CLI value if given, else env var, else default."""
    if cli_override is not None:
        return cli_override
    return os.getenv("MODEL_NAME") or MODEL_NAME


def _docker_image_name(cli_override: Optional[str]) -> str:
    """Return the Docker image name; a non-empty CLI value beats LOCAL_IMAGE_NAME."""
    if cli_override:
        return cli_override
    return (LOCAL_IMAGE_NAME or "").strip()


def _done_str(done: bool) -> str:
    """Render a boolean as lowercase ``true`` / ``false`` for the log lines."""
    return str(done).lower()


def _episode_score(rewards: List[float]) -> float:
    """Return the summed episode reward, clamped into [0.01, 0.99].

    In the multi-step pipeline with dense rewards the episode score is the sum
    of the step rewards. The range is clamped so the logs never emit exactly
    0.00 or 1.00; an empty reward list scores 0.01.
    """
    if not rewards:
        return _MIN_LOGGED_SCORE
    return min(_MAX_LOGGED_SCORE, max(_MIN_LOGGED_SCORE, sum(rewards)))


def _one_line(s: str) -> str:
    """Replace CR / LF with spaces so the text fits on one log line."""
    return s.replace("\n", " ").replace("\r", " ")


def _last_action_error_from_result(
    sr: Optional[StepResult[AegisObservation]],
) -> Optional[str]:
    """Return ``last_action_error`` (or ``error``) from the observation metadata.

    Returns None when there is no step result, no metadata, or neither key
    holds a truthy value.
    """
    if sr is None:
        return None
    md = getattr(sr.observation, "metadata", None) or {}
    return md.get("last_action_error") or md.get("error")


def _error_column(
    parse_exc: Optional[str],
    sr: Optional[StepResult[AegisObservation]],
) -> str:
    """Build the ``error=`` field of a [STEP] line.

    A parse error message takes precedence; otherwise the error reported in the
    step result's metadata is used; otherwise the literal string ``null``.
    """
    if parse_exc is not None:
        return _one_line(parse_exc)
    lae = _last_action_error_from_result(sr)
    if lae is not None and str(lae).strip() != "":
        return _one_line(str(lae))
    return "null"


def _get_api_key() -> str:
    """Return the API key from HF_TOKEN or API_KEY, or ``""`` when neither is set.

    Validator environments may omit API keys. We must never crash; in that case
    the LLM call path falls back to a deterministic dummy action.
    """
    return str(os.environ.get("HF_TOKEN") or os.environ.get("API_KEY") or "").strip()


def _fallback_action_json() -> str:
    """Return a minimal action JSON string that ``_parse_action`` always accepts."""
    return json.dumps(
        {
            "proposed_score": 0.0,
            "agent_reasoning": "",
            "routing_decision": "proceed",
        },
        ensure_ascii=False,
    )


def log_start(task: str, env: str, model: str) -> None:
    """Print the [START] line for one episode."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(
    step: int,
    action: str,
    reward: float,
    done: bool,
    error_column: str,
) -> None:
    """Print one [STEP] line; ``reward`` is rendered with two decimals."""
    # NOTE(review): an earlier comment here said two spaces follow "[STEP]",
    # but the format string visible in this file has one. Confirm against the
    # validator's expected format before changing either.
    print(
        f"[STEP] step={step} action={action} reward={reward:.2f} "
        f"done={_done_str(done)} error={error_column}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the [END] line; rewards are comma-joined with two decimals each."""
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} score={score:.2f} rewards={rewards_str}",
        flush=True,
    )


def build_user_prompt(
    step: int,
    last_action: Optional[str],
    last_reward: float,
    history: List[str],
    obs: AegisObservation,
) -> str:
    """Build the stage-aware user prompt for the current observation.

    The persona is chosen from ``obs.current_stage`` (arbiter, scrutinizer,
    validator, mentor, or a generic fallback). Missing observation attributes
    are rendered as empty strings, and a missing / falsy ``max_score`` as 1.0.

    ``step``, ``last_action``, ``last_reward`` and ``history`` are accepted for
    interface compatibility but are not used to build the prompt.
    """
    stage = str(getattr(obs, "current_stage", "") or "")
    loops = int(getattr(obs, "refinement_loops_taken", 0) or 0)
    persona = _STAGE_PERSONAS.get(stage, _DEFAULT_PERSONA).format(loops=loops)

    return _PROMPT_TEMPLATE.format(
        persona=persona.strip(),
        pipeline_history=str(getattr(obs, "pipeline_history", "") or ""),
        question=str(getattr(obs, "question", "") or ""),
        rubric=str(getattr(obs, "rubric", "") or ""),
        student_answer=str(getattr(obs, "student_answer", "") or ""),
        max_score=float(getattr(obs, "max_score", 1.0) or 1.0),
    )


def _episode_seed(args: argparse.Namespace) -> Optional[int]:
    """Return the episode seed from ``--episode-seed`` or EPISODE_SEED, else None.

    Raises:
        ValueError: if the configured value is not an integer.
    """
    raw = getattr(args, "episode_seed", None)
    if raw is None:
        raw = os.environ.get("EPISODE_SEED")
    if raw is None or str(raw) == "":
        return None
    return int(raw)


def _strip_markdown_json_fence(text: str) -> str:
    """Remove a surrounding Markdown code fence, if any.

    Text that does not start with a fence (ignoring surrounding whitespace) is
    returned unchanged. For fenced text the opening fence line (including any
    language tag) and the closing fence are dropped.
    """
    stripped = text.strip()
    if not stripped.startswith("```"):
        return text

    lines = stripped.split("\n")
    if len(lines) == 1:
        # Single-line payload such as ```{"a": 1}```. Dropping the "first line"
        # would discard the content itself, so only the fence markers are
        # removed. A language tag may remain; _parse_action skips to the first
        # "{" anyway.
        inner = stripped[3:]
        if inner.endswith("```"):
            inner = inner[:-3]
        return inner.strip()

    body = lines[1:-1] if lines[-1].strip() == "```" else lines[1:]
    return "\n".join(body)


def _parse_action(raw: str, max_score: float) -> AegisAction:
    """Parse the first JSON object in ``raw`` into an ``AegisAction``.

    Reasoning / "thinking" prefixes are tolerated: the text is scanned for the
    first ``{`` from which a JSON object decodes. Missing keys default to a
    score of 0.0, empty reasoning and a ``proceed`` routing decision.
    ``max_score`` is accepted for interface compatibility and is not used.

    Raises:
        json.JSONDecodeError: if some ``{`` was found but none decoded; the
            first decode error is raised.
        ValueError: if the text contains no JSON object at all.
    """
    decoder = json.JSONDecoder()
    cleaned = _strip_markdown_json_fence(raw).strip()
    data = None
    first_err: Optional[Exception] = None

    for i, ch in enumerate(cleaned):
        if ch != "{":
            continue
        try:
            candidate, _ = decoder.raw_decode(cleaned[i:])
        except json.JSONDecodeError as e:
            if first_err is None:
                first_err = e
            continue
        if isinstance(candidate, dict):
            data = candidate
            break

    if data is None:
        if first_err is not None:
            raise first_err
        raise ValueError("No JSON object found in model output")

    return AegisAction(
        proposed_score=float(data.get("proposed_score", 0.0)),
        agent_reasoning=str(data.get("agent_reasoning", "")),
        routing_decision=str(data.get("routing_decision", "proceed")),
    )


def _action_log_str(action: AegisAction) -> str:
    """Return a compact JSON summary of ``action`` for logs and history.

    The reasoning is cut to its first 100 characters and always followed by
    ``...``, whether or not anything was cut.
    """
    return json.dumps(
        {
            "proposed_score": action.proposed_score,
            "routing": action.routing_decision,
            "reasoning": (action.agent_reasoning or "")[:100] + "...",
        },
        ensure_ascii=False,
    )


def _complete_grading(
    llm: OpenAI,
    model: str,
    prompt: str,
    temperature: float,
    max_tokens: int,
    llm_enabled: bool,
) -> str:
    """Run one chat completion and return the reply text.

    Returns the deterministic fallback action JSON when the LLM is disabled,
    when both request variants fail, or when the response has no choices.
    Returns ``""`` when the message content is neither a string nor a list.
    """
    if not llm_enabled:
        return _fallback_action_json()

    base_kwargs = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": temperature,
    }
    primary_kwargs = dict(base_kwargs)
    alternate_kwargs = dict(base_kwargs)
    if max_tokens > 0:
        # Some OpenAI-compatible providers only accept one token-limit field,
        # so the second attempt uses the other field name.
        primary_kwargs["max_tokens"] = max_tokens
        alternate_kwargs["max_completion_tokens"] = max_tokens

    try:
        completion = llm.chat.completions.create(**primary_kwargs)
    except Exception:
        try:
            completion = llm.chat.completions.create(**alternate_kwargs)
        except Exception:
            # Deliberate best effort: never crash the episode on an LLM error.
            return _fallback_action_json()

    if not completion.choices:
        return _fallback_action_json()

    content = getattr(completion.choices[0].message, "content", None)
    if isinstance(content, str):
        return content.strip()
    if isinstance(content, list):
        # Some providers/modes return structured content parts (dicts or
        # objects); keep only the parts that carry a string ``text``.
        text_parts: List[str] = []
        for part in content:
            if isinstance(part, dict):
                t = part.get("text")
            else:
                t = getattr(part, "text", None)
            if isinstance(t, str):
                text_parts.append(t)
        return "\n".join(text_parts).strip()
    return ""


def _repair_prompt_from_invalid_output(raw: str, max_score: float) -> str:
    """Build a follow-up prompt asking the model to re-emit valid action JSON.

    The requested keys are the ones ``_parse_action`` reads. Only the first
    2000 characters of the invalid response are echoed back.
    """
    clipped = raw[:2000]
    return (
        "Your previous response was not valid JSON.\n"
        "Return ONLY a valid JSON object (no markdown, no extra text) with exactly these keys:\n"
        "{\n"
        f'  "proposed_score": <number between 0 and {max_score}>,\n'
        '  "agent_reasoning": "<your reasoning>",\n'
        '  "routing_decision": "<proceed or revise>"\n'
        "}\n\n"
        "Invalid response to fix:\n"
        f"{clipped}"
    )


def _get_action_with_retry(
    llm: OpenAI,
    model: str,
    prompt: str,
    temperature: float,
    max_tokens: int,
    max_score: float,
    llm_enabled: bool,
) -> Tuple[AegisAction, str]:
    """Ask the LLM for an action, with one repair attempt on a parse failure.

    Returns:
        The parsed action and the (fence-stripped) text it was parsed from.

    Raises:
        ValueError: if both the first reply and the repaired reply fail to
            parse; the message carries both underlying errors.
    """
    text = _strip_markdown_json_fence(
        _complete_grading(llm, model, prompt, temperature, max_tokens, llm_enabled)
    )
    try:
        return _parse_action(text, max_score), text
    except Exception as first_exc:
        repair_prompt = _repair_prompt_from_invalid_output(text, max_score)
        # The repair call uses temperature 0.0 rather than the caller's value.
        repaired = _strip_markdown_json_fence(
            _complete_grading(llm, model, repair_prompt, 0.0, max_tokens, llm_enabled)
        )
        try:
            return _parse_action(repaired, max_score), repaired
        except Exception as second_exc:
            raise ValueError(
                f"parse_error_after_retry: first={first_exc!s}; second={second_exc!s}"
            ) from second_exc


def run_local_episode(
    task_name: str,
    benchmark: str,
    model: str,
    llm: OpenAI,
    max_steps: int,
    temperature: float,
    max_tokens: int,
    episode_seed: Optional[int],
    success_score_threshold: float,
) -> Tuple[bool, int, List[float]]:
    """In-process environment (sync).

    Emits the [START] line and one [STEP] line per step. ``success_score_threshold``
    is accepted for signature parity with the remote runner and is not used.

    Returns:
        ``(completed, steps_taken, rewards)`` where ``completed`` is False when
        an exception aborted the episode.
    """
    from server.aegis_env_environment import AegisEnvironment

    rewards: List[float] = []
    steps_taken = 0
    history: List[str] = []
    last_action: Optional[str] = None
    last_reward = 0.0

    log_start(task_name, benchmark, model)
    env = AegisEnvironment()
    obs: AegisObservation
    try:
        obs = env.reset(seed=episode_seed, task_name=task_name)
        for step in range(1, max_steps + 1):
            prompt = build_user_prompt(step, last_action, last_reward, history, obs)
            text = ""
            try:
                action, text = _get_action_with_retry(
                    llm,
                    model,
                    prompt,
                    temperature,
                    max_tokens,
                    obs.max_score,
                    llm_enabled=bool(getattr(llm, "_aegis_llm_enabled", True)),
                )
            except Exception as e:
                # NOTE(review): this logs done=true yet moves on to the next
                # step when max_steps > 1 — confirm which is intended.
                err = f"parse_error: {e!s}"
                steps_taken = step
                rewards.append(0.0)
                log_step(step, repr(text[:300]), 0.0, True, _error_column(err, None))
                history.append(f"step={step} parse_error={_one_line(str(e))}")
                last_action = None
                last_reward = 0.0
                continue

            out = env.step(action)
            r = float(getattr(out, "reward", None) or 0.0)
            rewards.append(r)
            steps_taken = step
            last_action = _action_log_str(action)
            last_reward = r
            history.append(f"step={step} action={last_action} reward={r:.2f}")
            done = bool(getattr(out, "done", False))
            log_step(
                step,
                repr(_action_log_str(action)),
                r,
                done,
                _error_column(None, None),
            )
            # The in-process step result is used directly as the next observation.
            obs = out
            if done:
                break
        return True, steps_taken, rewards
    except Exception as e:
        # NOTE(review): unlike run_remote_episode_async, this always appends a
        # 0.0 reward (even when earlier steps recorded rewards) and never treats
        # a late failure as non-fatal — confirm whether the two should agree.
        err = f"{type(e).__name__}: {e!s}"
        steps_taken = max(steps_taken, 1)
        rewards.append(0.0)
        log_step(steps_taken, "", 0.0, True, _error_column(err, None))
        return False, steps_taken, rewards
    finally:
        env.close()


async def run_remote_episode_async(
    task_name: str,
    benchmark: str,
    model: str,
    llm: OpenAI,
    docker_image: str,
    openenv_base_url: str,
    max_steps: int,
    temperature: float,
    max_tokens: int,
    episode_seed: Optional[int],
    success_score_threshold: float,
) -> Tuple[bool, int, List[float]]:
    """WebSocket client: Docker image or HTTP base URL.

    Uses ``AegisEnv.from_docker_image`` when ``docker_image`` is non-empty,
    otherwise connects to ``openenv_base_url``. Emits the [START] line and one
    [STEP] line per step.

    Returns:
        ``(completed, steps_taken, rewards)``. ``completed`` is False when an
        exception aborted the episode before the score reached
        ``success_score_threshold``.
    """
    from client import AegisEnv

    rewards: List[float] = []
    steps_taken = 0
    history: List[str] = []
    last_action: Optional[str] = None
    last_reward = 0.0

    log_start(task_name, benchmark, model)
    env: Optional["AegisEnv"] = None
    try:
        if docker_image:
            # Ensure the container sees the selected task.
            env = await AegisEnv.from_docker_image(
                docker_image,
                env_vars={"TASK_NAME": task_name},
            )
        else:
            env = AegisEnv(base_url=openenv_base_url)
            await env.connect()

        result = await env.reset(seed=episode_seed, task_name=task_name)
        obs = result.observation
        for step in range(1, max_steps + 1):
            prompt = build_user_prompt(step, last_action, last_reward, history, obs)
            text = ""
            try:
                action, text = _get_action_with_retry(
                    llm,
                    model,
                    prompt,
                    temperature,
                    max_tokens,
                    obs.max_score,
                    llm_enabled=bool(getattr(llm, "_aegis_llm_enabled", True)),
                )
            except Exception as e:
                # NOTE(review): this logs done=true yet moves on to the next
                # step when max_steps > 1 — confirm which is intended.
                err = f"parse_error: {e!s}"
                steps_taken = step
                rewards.append(0.0)
                log_step(step, repr(text[:300]), 0.0, True, _error_column(err, None))
                history.append(f"step={step} parse_error={_one_line(str(e))}")
                last_action = None
                last_reward = 0.0
                continue

            result = await env.step(action)
            r = float(result.reward or 0.0)
            rewards.append(r)
            steps_taken = step
            last_action = _action_log_str(action)
            last_reward = r
            history.append(f"step={step} action={last_action} reward={r:.2f}")
            log_step(
                step,
                repr(_action_log_str(action)),
                r,
                result.done,
                _error_column(None, result),
            )
            obs = result.observation
            if result.done:
                break
        return True, steps_taken, rewards
    except Exception as e:
        # If we already met target score, treat late transport failures as non-fatal.
        if rewards and _episode_score(rewards) >= success_score_threshold:
            return True, max(steps_taken, 1), rewards
        err = f"{type(e).__name__}: {e!s}"
        steps_taken = max(steps_taken, 1)
        if not rewards:
            rewards.append(0.0)
        log_step(steps_taken, "", 0.0, True, _error_column(err, None))
        return False, steps_taken, rewards
    finally:
        if env is not None:
            # Closing is best effort; a failure here must not mask the result.
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await env.close()


async def run_episode_async(
    args: argparse.Namespace,
    model: str,
    llm: OpenAI,
) -> Tuple[bool, int, List[float]]:
    """Run one episode, in-process (``--local``) or through the remote client.

    The synchronous local runner is executed in a worker thread via
    ``asyncio.to_thread``.
    """
    docker_image = _docker_image_name(getattr(args, "docker_image", None) or None)
    if args.local:
        return await asyncio.to_thread(
            run_local_episode,
            args.task_name,
            args.benchmark,
            model,
            llm,
            args.max_steps,
            args.temperature,
            args.max_tokens,
            _episode_seed(args),
            float(args.success_score_threshold),
        )
    return await run_remote_episode_async(
        args.task_name,
        args.benchmark,
        model,
        llm,
        docker_image,
        args.openenv_base_url,
        args.max_steps,
        args.temperature,
        args.max_tokens,
        _episode_seed(args),
        float(args.success_score_threshold),
    )


async def main_async(argv: Optional[List[str]] = None) -> int:
    """Parse CLI arguments and run one episode per task.

    Runs only ``--task-name`` when ``--single-task`` is given, otherwise the
    ``easy``, ``medium`` and ``hard`` tasks in turn. Every task gets its own
    START/STEP/END block; an [END] line is emitted even when the episode raises.

    Returns:
        0 if every task reached ``--success-score-threshold``, 130 on
        KeyboardInterrupt, and 1 otherwise.
    """
    p = argparse.ArgumentParser(
        description="AEGIS-Env inference (OpenAI client + async OpenEnv)",
    )
    p.add_argument(
        "--task-name",
        default=TASK_NAME,
        help="Logged task name",
    )
    p.add_argument(
        "--benchmark",
        default=BENCHMARK,
        help="Logged benchmark name",
    )
    p.add_argument("--model", default=None, help="Override MODEL_NAME")
    p.add_argument("--api-base-url", default=None, dest="api_base_url")
    p.add_argument("--openenv-base-url", default=OPENENV_BASE_URL)
    p.add_argument("--max-steps", type=int, default=MAX_STEPS)
    p.add_argument("--temperature", type=float, default=TEMPERATURE)
    p.add_argument("--max-tokens", type=int, default=MAX_TOKENS)
    p.add_argument("--success-score-threshold", type=float, default=SUCCESS_SCORE_THRESHOLD)
    p.add_argument("--episode-seed", type=int, default=None)
    p.add_argument(
        "--single-task",
        action="store_true",
        help="Run only the selected --task-name (disable default easy/medium/hard sweep).",
    )
    p.add_argument(
        "--docker-image",
        default=None,
        help="Override LOCAL_IMAGE_NAME for from_docker_image()",
    )
    p.add_argument(
        "--local",
        action="store_true",
        help="In-process environment (MONGO_URI); no HTTP/Docker client",
    )
    args = p.parse_args(argv)

    exit_code = 1
    try:
        api_key = _get_api_key()
        api_base = _llm_api_base_url(args.api_base_url)
        model = _model_name(args.model)

        # Without a key the client is still constructed, but flagged so the
        # episode runners use the deterministic fallback action instead.
        llm = OpenAI(base_url=api_base, api_key=api_key or "no-key")
        llm._aegis_llm_enabled = bool(api_key)  # type: ignore[attr-defined]

        # By default, run one example for each task tier to satisfy validators
        # that require multiple tasks with graders.
        tasks = [args.task_name] if args.single_task else ["easy", "medium", "hard"]

        # Each task is its own episode block with its own START/STEP/END lines.
        overall_success = True
        for t in tasks:
            per_steps = 0
            per_rewards: List[float] = []
            per_success = False
            try:
                args.task_name = t
                # Success is decided from the score, not the runner's flag.
                _, per_steps, per_rewards = await run_episode_async(args, model, llm)
                per_success = (
                    _episode_score(per_rewards) >= float(args.success_score_threshold)
                )
            except Exception as e:
                # Ensure we always emit END even on errors.
                print(f"fatal_error={type(e).__name__}: {e!s}", file=sys.stderr)
                per_steps = max(per_steps, 1)
                if not per_rewards:
                    per_rewards = [0.0]
                per_success = False
            finally:
                log_end(
                    per_success,
                    per_steps,
                    _episode_score(per_rewards),
                    per_rewards,
                )
            overall_success = overall_success and per_success

        exit_code = 0 if overall_success else 1
    except KeyboardInterrupt:
        exit_code = 130
    except Exception as e:
        print(f"fatal_error={type(e).__name__}: {e!s}", file=sys.stderr)
        exit_code = 1

    return exit_code


if __name__ == "__main__":
    # NOTE(review): the exit code returned by main_async() is discarded, so the
    # process exits 0 unless an exception escapes. This may be deliberate
    # (validators that must never see a failure) — confirm before wrapping the
    # call in sys.exit().
    asyncio.run(main_async())