""" TeamForge Inference Script =========================== MANDATORY COMPLIANCE: - Named `inference.py` in root directory - Uses OpenAI client for all LLM calls - Emits exact [START] / [STEP] / [END] stdout format - Reads API_BASE_URL, MODEL_NAME, HF_TOKEN from environment ENV VARS: API_BASE_URL LLM endpoint (default: Groq) MODEL_NAME Model string (default: llama3-8b-8192) HF_TOKEN API key (Groq key or HuggingFace token) STDOUT FORMAT (strict): [START] task= env=teamforge model= [STEP] step= action= reward=<0.00> done= error= [END] success= steps= score=<0.00> rewards= USAGE: python inference.py # runs all 3 tasks python inference.py --task easy_bugfix_chunk_list python inference.py --task all --max-steps 20 """ from __future__ import annotations import argparse import json import os import sys import time from typing import Any, Dict, List, Optional from openai import OpenAI # ── Local imports ────────────────────────────────────────────────────────────── from environment import TeamForgeEnv from models import ( Commit, EditFile, GenerateReview, Observation, PlanStep, RequestIteration, RunLint, RunTests, SelfReflect, ) from tasks.task_registry import SCORED_TASK_IDS # easy, medium, hard (not bonus) # ── Configuration (all from env vars — mandatory per spec) ──────────────────── API_BASE_URL = os.getenv("API_BASE_URL", "https://api.groq.com/openai/v1") MODEL_NAME = os.getenv("MODEL_NAME", "llama3-8b-8192") HF_TOKEN = os.getenv("HF_TOKEN") BENCHMARK = "teamforge" TEMPERATURE = 0.15 MAX_TOKENS = 1800 # ── System prompt ───────────────────────────────────────────────────────────── SYSTEM_PROMPT = """\ You are TeamForge-Agent, an autonomous AI software engineer. Work through tasks in phases: PLAN → CODE → TEST → LINT → REVIEW → REFLECT → COMMIT RULES: • Never modify test files (path contains "test") • Emit ≥2 plan_step actions before any edit_file • Always run_tests after editing before committing • generate_review must mention specific code details • Commit message must follow Conventional Commits: fix/feat/refactor/perf(scope): desc • Return ONLY valid JSON — no markdown fences, no explanation ACTIONS (return exactly one per turn as JSON): {"type":"plan_step", "step_number":1, "description":"...", "estimated_effort":"low|medium|high", "depends_on":[]} {"type":"edit_file", "file_path":"...", "content":"", "reason":"..."} {"type":"run_tests", "timeout_seconds":30} {"type":"run_lint", "fix":false} {"type":"generate_review", "focus_areas":["correctness"], "review_text":"..."} {"type":"commit", "message":"fix(scope): description", "files":[]} {"type":"self_reflect", "what_went_well":"...", "what_to_improve":"..."} {"type":"request_iteration","reason":"...", "target_issues":[]} """ # ── Agent ───────────────────────────────────────────────────────────────────── class Agent: def __init__(self, client: OpenAI): self.client = client self.history: List[Dict] = [] def reset(self) -> None: self.history = [] def act(self, obs: Observation) -> Optional[Any]: self.history.append({"role": "user", "content": self._obs_to_text(obs)}) for attempt in range(3): try: resp = self.client.chat.completions.create( model=MODEL_NAME, messages=[ {"role": "system", "content": SYSTEM_PROMPT}, *self.history[-12:], ], temperature=TEMPERATURE, max_tokens=MAX_TOKENS, ) raw = resp.choices[0].message.content.strip() self.history.append({"role": "assistant", "content": raw}) return self._parse(raw) except Exception: time.sleep(1.5 ** attempt) return None def _obs_to_text(self, obs: Observation) -> str: lines = [ f"TASK: 
{obs.task_id} | STEP {obs.step_number}/{obs.max_steps} | PHASE: {obs.phase.value}", f"REWARD_SO_FAR: {obs.cumulative_reward:.3f}", f"\n## TASK DESCRIPTION\n{obs.task_description[:500]}", ] if obs.last_action_type: lines.append(f"\n## LAST: {obs.last_action_type} → {obs.last_action_status.value}") lines.append(f"```\n{obs.last_action_output[:500]}\n```") if obs.test_results: t = obs.test_results lines.append(f"\n## TESTS: {t.passed}p / {t.failed}f / {t.errors}e") if t.failed or t.errors: lines.append(f"```\n{t.output[-500:]}\n```") if obs.lint_results and obs.lint_results.violations: lines.append(f"\n## LINT: {obs.lint_results.violations} violations") lines.append("\n## REPO FILES") for f in obs.repo_files[:8]: if f.size_bytes < 4000: lines.append(f"\n### {f.path}\n```\n{f.content[:800]}\n```") if obs.plan: lines.append(f"\n## PLAN ({len(obs.plan)} steps recorded)") for s in obs.plan[-3:]: lines.append(f" {s.step_number}. {s.description}") lines.append("\n## YOUR NEXT ACTION (JSON only, no markdown):") return "\n".join(lines) def _parse(self, text: str) -> Optional[Any]: import re # Strip markdown fences if present text = re.sub(r'^```(?:json)?\s*', '', text.strip(), flags=re.MULTILINE) text = re.sub(r'\s*```$', '', text.strip(), flags=re.MULTILINE) text = text.strip() dispatch = { "plan_step": PlanStep, "edit_file": EditFile, "run_tests": RunTests, "run_lint": RunLint, "generate_review": GenerateReview, "commit": Commit, "self_reflect": SelfReflect, "request_iteration": RequestIteration, } # Try direct parse try: data = json.loads(text) cls = dispatch.get(data.get("type", "")) return cls(**data) if cls else None except Exception: pass # Try extracting JSON object from response m = re.search(r'\{.*\}', text, re.DOTALL) if m: try: data = json.loads(m.group()) cls = dispatch.get(data.get("type", "")) return cls(**data) if cls else None except Exception: pass return None # ── Episode runner (emits mandatory log format) ─────────────────────────────── def run_episode(env: TeamForgeEnv, agent: Agent, task_id: str) -> Dict: """ Run one episode and emit the mandatory stdout log lines. 
Stdout format (strict): [START] task= env=teamforge model= [STEP] step= action= reward=<0.00> done= error= [END] success= steps= score=<0.00> rewards= """ agent.reset() obs = env.reset(task_id) rewards: List[float] = [] error_msg: Optional[str] = None # ── [START] ──────────────────────────────────────────────────────────────── print(f"[START] task={task_id} env={BENCHMARK} model={MODEL_NAME}", flush=True) step_count = 0 try: while not obs.done: action = agent.act(obs) if action is None: error_msg = "agent_returned_none" # Emit a [STEP] for the failed action print( f"[STEP] step={obs.step_number + 1} action=null " f"reward=0.10 done=false error={error_msg}", flush=True, ) break obs = env.step(action) step_count = obs.step_number rewards.append(obs.reward) err_str = "null" done_str = "true" if obs.done else "false" # ── [STEP] ──────────────────────────────────────────────────────── print( f"[STEP] step={obs.step_number} action={obs.last_action_type} " f"reward={obs.reward:.4f} done={done_str} error={err_str}", flush=True, ) except Exception as exc: error_msg = str(exc).replace("\n", " ")[:120] # Writing metadata for standalone OpenEnv grader try: from tasks.task_registry import get_task task_module = get_task(task_id) meta_payload = { "task_id": task_id, "total_steps": step_count, "max_steps": task_module.MAX_STEPS, "reviews": [r.model_dump() for r in env._reviews], "reflections": [r.model_dump() for r in env._reflections], "required_keywords": getattr(task_module, "REQUIRED_KEYWORDS_IN_REVIEW", []), } with open(os.path.join(str(env._sandbox.repo_path), "grading_metadata.json"), "w") as f: json.dump(meta_payload, f) except Exception: pass # Grade the episode result = env.grade() score = result.final_score success = result.passed rewards_str = ",".join(f"{r:.4f}" for r in rewards) if rewards else "0.1000" # ── [END] ───────────────────────────────────────────────────────────────── # We use 4 decimal places to ensure that interior scores (e.g. 0.999) # are never rounded to illegal boundary values (1.00) in the logs. print( f"[END] success={'true' if success else 'false'} steps={step_count} " f"score={score:.4f} rewards={rewards_str}", flush=True, ) return { "task_id": task_id, "success": success, "steps": step_count, "score": score, "rewards": rewards, "error": error_msg, } # ── Main ────────────────────────────────────────────────────────────────────── def main(): parser = argparse.ArgumentParser(description="TeamForge Inference Script") parser.add_argument( "--task", choices=SCORED_TASK_IDS + ["all"], default="all", help="Task to run (default: all)", ) parser.add_argument( "--max-steps", type=int, default=None, help="Override max steps per episode", ) args = parser.parse_args() if not HF_TOKEN: print("[ERROR] HF_TOKEN environment variable not set.", file=sys.stderr) sys.exit(1) client = OpenAI(api_key=HF_TOKEN, base_url=API_BASE_URL) agent = Agent(client) env = TeamForgeEnv() task_ids = SCORED_TASK_IDS if args.task == "all" else [args.task] all_results = [] for task_id in task_ids: result = run_episode(env, agent, task_id) all_results.append(result) env._sandbox.teardown() # Summary to stderr (not stdout — keeps stdout format clean) print("\n=== SUMMARY ===", file=sys.stderr) for r in all_results: status = "PASS" if r["success"] else "FAIL" print(f" [{status}] {r['task_id']:45s} score={r['score']:.4f} steps={r['steps']}", file=sys.stderr) if __name__ == "__main__": main()
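

# ── Appendix: log-format self-check (illustrative only) ────────────────────────
# A minimal sketch of how the strict [START] / [STEP] / [END] lines documented
# above could be validated when debugging format drift. The helper name and the
# regex details are assumptions for illustration, not part of the TeamForge
# spec, and nothing in this script calls this function.
def _looks_like_valid_log_line(line: str) -> bool:
    import re

    patterns = (
        r"^\[START\] task=\S+ env=teamforge model=\S+$",
        r"^\[STEP\] step=\d+ action=\S+ reward=-?\d+\.\d{2,4} done=(?:true|false) error=\S+$",
        r"^\[END\] success=(?:true|false) steps=\d+ score=-?\d+\.\d{4} rewards=\S+$",
    )
    return any(re.match(p, line.strip()) for p in patterns)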