""" Inference Script for Cloud-Native Debug Environment =================================== MANDATORY - Before submitting, ensure the following variables are defined in your environment configuration: API_BASE_URL The API endpoint for the LLM. MODEL_NAME The model identifier to use for inference. HF_TOKEN Your Hugging Face / API key. LOCAL_IMAGE_NAME The name of the local image to use for the environment if you are using from_docker_image() method - Defaults are set only for API_BASE_URL and MODEL_NAME (and should reflect your active inference setup): API_BASE_URL = os.getenv("API_BASE_URL", "") MODEL_NAME = os.getenv("MODEL_NAME", "") - The inference script must be named `inference.py` and placed in the root directory of the project - Participants must use OpenAI Client for all LLM calls using above variables STDOUT FORMAT - The script must emit exactly three line types to stdout, in this order: [START] task= env= model= [STEP] step= action= reward=<0.00> done= error= [END] success= steps= score= rewards= Rules: - One [START] line at episode begin. - One [STEP] line per step, immediately after env.step() returns. - One [END] line after the episode completes, always emitted (even on exception). - reward and rewards are formatted to 2 decimal places. - done and success are lowercase booleans: true or false. - error is the raw error string, or null if none. - All fields on a single line with no newlines within a line. - Each tasks should return score in [0, 1] Example: [START] task=dockerfile_syntax env=cloud_native_devops model=meta-llama/Llama-3.1-70B-Instruct [STEP] step=1 action=edit_file reward=0.30 done=false error=null [STEP] step=2 action=submit reward=0.00 done=true error=null [END] success=true steps=2 score=0.850 rewards=0.30,0.00 """ import json import os import re import sys import time from typing import Any, Dict, List, Optional import requests from openai import OpenAI API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1" MODEL_NAME = os.getenv("MODEL_NAME") or "meta-llama/Llama-3.1-70B-Instruct" API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY") ENV_URL = os.getenv("ENV_URL", "http://localhost:7860") LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") BENCHMARK = "cloud_native_devops" MAX_STEPS = 8 # leave 2 steps buffer before env hard-limit of 10 SUCCESS_SCORE_THRESHOLD = 0.1 # normalized score in [0, 1] SYSTEM_PROMPT = """You are an expert DevOps engineer debugging cloud-native deployment pipelines. You will receive broken Dockerfile, GitHub Actions workflow, and/or Kubernetes manifest files along with error messages. Your job is to: 1. Analyze the error message carefully 2. Identify the root cause in the configuration files 3. 
SYSTEM_PROMPT = """You are an expert DevOps engineer debugging cloud-native deployment pipelines.

You will receive broken Dockerfile, GitHub Actions workflow, and/or Kubernetes manifest files along with error messages.

Your job is to:
1. Analyze the error message carefully
2. Identify the root cause in the configuration files
3. Provide a precise fix

When you identify a fix, respond with a JSON object in this exact format:
{
    "action_type": "YOUR_CHOSEN_ACTION_TYPE",
    "reasoning": "Brief explanation of the bug and fix",
    "edits": [
        {
            "file_path": "path/to/file",
            "line_number": 5,                  // Only needed for replace_line, add_line, delete_line, add_block
            "old_content": "exactly broken",   // Only needed for edit_file, delete_block
            "new_content": "corrected block"   // Not needed for delete_line, delete_block
        }
    ]
}

Available action_type values for edits:
- "edit_file" (requires old_content and new_content)
- "replace_line" (requires line_number and new_content)
- "add_line" (requires line_number and new_content)
- "delete_line" (requires line_number)
- "add_block" (requires line_number and new_content)
- "delete_block" (requires old_content)

To create a new file (e.g. a missing ConfigMap), use "edit_file" with empty old_content:
{
    "action_type": "edit_file",
    "reasoning": "Create missing ConfigMap manifest",
    "edits": [
        {
            "file_path": "k8s/configmap.yaml",
            "old_content": "",
            "new_content": "apiVersion: v1\\nkind: ConfigMap\\n..."
        }
    ]
}

If you believe all issues are fixed and want to submit, respond with:
{"action_type": "submit"}

If you need a hint, respond with:
{"action_type": "request_hint"}

Rules:
- Match old_content EXACTLY as it appears in the file (whitespace matters)
- Fix one issue at a time for precision
- Focus on the error message: it tells you exactly what's wrong
- Common issues: typos, wrong syntax, missing fields, wrong secret references
- For GitHub Actions: check secret syntax (${{ }} not ${ }), env blocks, permissions
- For Dockerfiles: check instruction syntax, file paths, base image tags
- For Kubernetes: check label selectors, port matching, resource limits, probe configs, ingress rules
- For full-stack pipelines: issues may span multiple files (workflow + Dockerfile + K8s manifests)
- Always respond with valid JSON only, no markdown fences"""


# ---------------------------------------------------------------------------
# Logging helpers (mandatory stdout format)
# ---------------------------------------------------------------------------


def log_start(task: str, env: str, model: str) -> None:
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    error_val = error if error else "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}",
        flush=True,
    )


# ---------------------------------------------------------------------------
# Client / env helpers
# ---------------------------------------------------------------------------


def create_client() -> OpenAI:
    """Create OpenAI-compatible client for HuggingFace router."""
    return OpenAI(
        base_url=API_BASE_URL,
        api_key=API_KEY,
    )


def env_request(method: str, endpoint: str, json_data: Optional[Dict] = None) -> Dict[str, Any]:
    """Make a request to the environment server."""
    url = f"{ENV_URL}{endpoint}"
    if method == "GET":
        resp = requests.get(url, timeout=30)
    else:
        resp = requests.post(url, json=json_data or {}, timeout=30)
    resp.raise_for_status()
    return resp.json()
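# Environment server API assumed by this script. The endpoints and response
# shapes below are inferred from how the responses are consumed in the code,
# not from a formal spec:
#
#   GET  /health  -> {"status": "..."}
#   POST /reset   -> {"observation": {...}, "info": {"task_id": "...", "scenario_id": "..."}}
#   POST /step    -> {"observation": {...}, "reward": 0.0, "done": false, "info": {...}}
#   POST /grader  -> {"result": {"score": 0.0}}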
def format_observation(obs: Dict[str, Any]) -> str:
    """Format observation into a prompt for the LLM."""
    parts = []
    parts.append(f"Task: {obs.get('task_description', 'Unknown')}")
    parts.append(f"Difficulty: {obs.get('difficulty', 'unknown')}")
    parts.append(f"Step: {obs.get('step_number', 0)}/{obs.get('max_steps', 10)}")
    parts.append(f"Issues fixed: {obs.get('issues_fixed', 0)}/{obs.get('total_issues', '?')}")

    error = obs.get("error", {})
    parts.append("\n--- ERROR ---")
    parts.append(f"Phase: {error.get('phase', 'unknown')}")
    parts.append(f"Message: {error.get('error_message', 'No error')}")
    if error.get("failed_step"):
        parts.append(f"Failed step: {error['failed_step']}")
    if error.get("line_hint"):
        parts.append(f"Line hint: {error['line_hint']}")

    parts.append("\n--- FILES ---")
    for f in obs.get("files", []):
        parts.append(f"\n=== {f['path']} ({f.get('file_type', 'unknown')}) ===")
        content = f.get("content", "")
        lines = content.split("\n")
        for i, line in enumerate(lines, 1):
            parts.append(f"{i:3d} | {line}")

    if obs.get("available_secrets"):
        parts.append("\n--- AVAILABLE SECRETS ---")
        parts.append(", ".join(obs["available_secrets"]))

    if obs.get("last_action_feedback"):
        parts.append("\n--- LAST ACTION FEEDBACK ---")
        parts.append(obs["last_action_feedback"])

    return "\n".join(parts)


def parse_llm_response(text: str) -> Dict[str, Any]:
    """Parse LLM response into an action dict."""
    text = text.strip()

    # Strip markdown code fences if present
    if text.startswith("```"):
        lines = text.split("\n")
        lines = [l for l in lines if not l.strip().startswith("```")]
        text = "\n".join(lines).strip()

    # Try to find a JSON object in the response
    json_match = re.search(r'\{[\s\S]*\}', text)
    if json_match:
        try:
            return json.loads(json_match.group())
        except json.JSONDecodeError:
            pass

    # Fallback: treat as submit
    return {"action": "submit"}


def build_action(parsed: Dict[str, Any]) -> Dict[str, Any]:
    """Convert parsed LLM response to environment action format."""
    action_type = parsed.get("action_type")

    # Backwards compatibility and standard aliases
    if parsed.get("action") == "submit" or action_type == "submit":
        return {"action_type": "submit"}
    if parsed.get("action") == "hint" or action_type == "request_hint":
        return {"action_type": "request_hint"}

    edits = parsed.get("edits", [])
    if not edits and not action_type:
        return {"action_type": "submit"}

    action_str = action_type if action_type else "edit_file"
    return {
        "action_type": action_str,
        "edits": [
            {
                "file_path": e.get("file_path", ""),
                "line_number": e.get("line_number"),
                "old_content": e.get("old_content", ""),
                "new_content": e.get("new_content", ""),
            }
            for e in edits
        ],
    }
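# Illustrative round trip (the file path and content are made up for the
# example): a model reply such as
#
#   {"action_type": "replace_line",
#    "edits": [{"file_path": "k8s/deployment.yaml", "line_number": 12,
#               "new_content": "      app: web"}]}
#
# is normalized by build_action() and forwarded to the environment as
#
#   {"action_type": "replace_line",
#    "edits": [{"file_path": "k8s/deployment.yaml", "line_number": 12,
#               "old_content": "", "new_content": "      app: web"}]}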
def run_episode(client: OpenAI, task_id: Optional[str] = None, scenario_id: Optional[str] = None) -> Dict[str, Any]:
    """Run a single episode: reset, loop (observe -> LLM -> act), grade."""
    reset_payload: Dict[str, Any] = {}
    if task_id:
        reset_payload["task_id"] = task_id
    if scenario_id:
        reset_payload["scenario_id"] = scenario_id

    # Best-effort task name for the [START] line
    target_task = task_id or "random_task"
    log_start(task=target_task, env=BENCHMARK, model=MODEL_NAME)

    trajectory = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    try:
        reset_resp = env_request("POST", "/reset", reset_payload)
        obs = reset_resp["observation"]
        info = reset_resp.get("info", {})
        actual_task_id = info.get("task_id", target_task)
        actual_scenario_id = info.get("scenario_id", scenario_id or "unknown")

        messages = [{"role": "system", "content": SYSTEM_PROMPT}]

        for step_num in range(1, MAX_STEPS + 1):
            user_msg = format_observation(obs)
            messages.append({"role": "user", "content": user_msg})

            error_msg: Optional[str] = None
            try:
                completion = client.chat.completions.create(
                    model=MODEL_NAME,
                    messages=messages,
                    temperature=0.1,
                    max_tokens=1024,
                )
                llm_text = completion.choices[0].message.content or '{"action": "submit"}'
            except Exception as e:
                error_msg = str(e)
                print(f"[DEBUG] Model request failed: {e}", flush=True)
                llm_text = '{"action": "submit"}'

            messages.append({"role": "assistant", "content": llm_text})

            parsed = parse_llm_response(llm_text)
            action = build_action(parsed)

            step_resp = env_request("POST", "/step", {"action": action})
            obs = step_resp["observation"]
            reward = step_resp.get("reward", 0.0)
            done = step_resp.get("done", False)
            step_info = step_resp.get("info", {})

            steps_taken = step_num
            rewards.append(reward)

            log_step(
                step=step_num,
                action=action["action_type"],
                reward=reward,
                done=done,
                error=error_msg,
            )

            trajectory.append({
                "step": step_num,
                "action": action,
                "reward": reward,
                "done": done,
                "info": step_info,
            })

            if done:
                break

        # Grade the trajectory
        grade_resp = env_request("POST", "/grader", {
            "task_id": actual_task_id,
            "trajectory": trajectory,
        })
        result = grade_resp.get("result", {})
        score = result.get("score", 0.0)
        score = min(max(score, 0.0), 1.0)  # clamp to [0, 1]
        success = score >= SUCCESS_SCORE_THRESHOLD
    finally:
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)

    return {"score": score, "success": success, "steps": steps_taken, "rewards": rewards}


def run_all_tasks(client: OpenAI) -> Dict[str, float]:
    """Run the baseline on all tasks (and all of their scenarios) and report scores."""
    try:
        from server.tasks.task_registry import TASK_REGISTRY
    except ImportError as e:
        print(f"[DEBUG] Could not import TASK_REGISTRY: {e}", flush=True)
        return {}

    scores: Dict[str, List[float]] = {}
    for task_id, task_cls in TASK_REGISTRY.items():
        task_scores = []
        # Iterate over all scenarios for this task
        scenarios = task_cls.SCENARIOS
        for scenario in scenarios:
            scenario_id = scenario["id"]
            result = run_episode(client, task_id=task_id, scenario_id=scenario_id)
            task_scores.append(result.get("score", 0.0))
        scores[task_id] = task_scores

    # Summary
    print(f"\n[DEBUG] {'=' * 60}", flush=True)
    print("[DEBUG] BASELINE RESULTS SUMMARY", flush=True)
    print(f"[DEBUG] {'=' * 60}", flush=True)

    avg_scores = {}
    for task_id, task_scores in scores.items():
        avg = sum(task_scores) / len(task_scores) if task_scores else 0.0
        avg_scores[task_id] = avg
        print(f"[DEBUG] {task_id:40s} {avg:.3f}", flush=True)

    overall = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0.0
    print(f"[DEBUG] {'OVERALL':40s} {overall:.3f}", flush=True)
    return avg_scores


def main():
    """Entry point for baseline inference."""
    if not API_KEY:
        print("[DEBUG] WARNING: HF_TOKEN not set. Set it via: export HF_TOKEN=your_token_here", flush=True)
        print("[DEBUG] Continuing anyway (will fail if auth is required)...", flush=True)

    # Verify that the environment server is running
    try:
        health = env_request("GET", "/health")
        print(f"[DEBUG] Environment status: {health.get('status', 'unknown')}", flush=True)
    except Exception as e:
        print(f"[DEBUG] Cannot connect to environment at {ENV_URL}: {e}", flush=True)
        print("[DEBUG] Start the server first: python -m uvicorn server.app:app --host 0.0.0.0 --port 7860", flush=True)
        sys.exit(1)

    client = create_client()

    # If a specific task is requested via CLI args, run only that task
    if len(sys.argv) > 1:
        task_id = sys.argv[1]
        scenario_id = sys.argv[2] if len(sys.argv) > 2 else None
        run_episode(client, task_id=task_id, scenario_id=scenario_id)
    else:
        run_all_tasks(client)


if __name__ == "__main__":
    main()
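# Example invocations (the task and scenario ids shown are illustrative; the
# second CLI argument is passed through as scenario_id):
#
#   python inference.py                               # run every registered task and scenario
#   python inference.py dockerfile_syntax             # run a single task
#   python inference.py dockerfile_syntax scenario_1  # run a specific scenario of that task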