"""
Inference Script for Cloud-Native Debug Environment
===================================================
MANDATORY
- Before submitting, ensure the following variables are defined in your environment configuration:
API_BASE_URL The API endpoint for the LLM.
MODEL_NAME The model identifier to use for inference.
HF_TOKEN Your Hugging Face token / API key.
LOCAL_IMAGE_NAME The name of the local image to use for the environment when using the from_docker_image() method
- Defaults are set only for API_BASE_URL and MODEL_NAME
(and should reflect your active inference setup):
API_BASE_URL = os.getenv("API_BASE_URL", "<your-active-endpoint>")
MODEL_NAME = os.getenv("MODEL_NAME", "<your-active-model>")
- The inference script must be named `inference.py` and placed in the root directory of the project
- Participants must use the OpenAI client for all LLM calls, using the variables above
STDOUT FORMAT
- The script must emit exactly three line types to stdout, in this order:
[START] task=<task_name> env=<benchmark> model=<model_name>
[STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
[END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn>
Rules:
- One [START] line at episode begin.
- One [STEP] line per step, immediately after env.step() returns.
- One [END] line after the episode completes, always emitted (even on exception).
- reward and rewards are formatted to 2 decimal places.
- done and success are lowercase booleans: true or false.
- error is the raw error string, or null if none.
- All fields on a single line with no newlines within a line.
- Each task should return a score in [0, 1]
Example:
[START] task=dockerfile_syntax env=cloud_native_devops model=meta-llama/Llama-3.1-70B-Instruct
[STEP] step=1 action=edit_file reward=0.30 done=false error=null
[STEP] step=2 action=submit reward=0.00 done=true error=null
[END] success=true steps=2 score=0.850 rewards=0.30,0.00
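Usage (derived from main(); the environment server must already be running at ENV_URL,
default http://localhost:7860):
python inference.py                              # run every task/scenario in TASK_REGISTRY
python inference.py <task_id>                    # run a single task
python inference.py <task_id> <scenario_id>      # run one specific scenario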
"""
import json
import os
import re
import sys
import time
from typing import Any, Dict, List, Optional
import requests
from openai import OpenAI
API_BASE_URL = os.getenv("API_BASE_URL") or "https://router.huggingface.co/v1"
MODEL_NAME = os.getenv("MODEL_NAME") or "meta-llama/Llama-3.1-70B-Instruct"
API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")
ENV_URL = os.getenv("ENV_URL", "http://localhost:7860")
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")
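# Note: LOCAL_IMAGE_NAME is only read from the environment here and is not used directly
# by this script; per the module docstring it is needed when the environment is created
# via the from_docker_image() method.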
BENCHMARK = "cloud_native_devops"
MAX_STEPS = 8  # leave a 2-step buffer before the env hard limit of 10
SUCCESS_SCORE_THRESHOLD = 0.1 # normalized score in [0, 1]
SYSTEM_PROMPT = """You are an expert DevOps engineer debugging cloud-native deployment pipelines.
You will receive broken Dockerfile, GitHub Actions workflow, and/or Kubernetes manifest files along with error messages.
Your job is to:
1. Analyze the error message carefully
2. Identify the root cause in the configuration files
3. Provide a precise fix
When you identify a fix, respond with a JSON object in this exact format:
{
"action_type": "YOUR_CHOSEN_ACTION_TYPE",
"reasoning": "Brief explanation of the bug and fix",
"edits": [
{
"file_path": "path/to/file",
"line_number": 5, // Only needed for replace_line, add_line, delete_line, add_block
"old_content": "exactly broken", // Only needed for edit_file, delete_block
"new_content": "corrected block" // Not needed for delete_line, delete_block
}
]
}
Available action_type values for edits:
- "edit_file" (requires old_content and new_content)
- "replace_line" (requires line_number and new_content)
- "add_line" (requires line_number and new_content)
- "delete_line" (requires line_number)
- "add_block" (requires line_number and new_content)
- "delete_block" (requires old_content)
To create a new file (e.g. a missing ConfigMap), use "edit_file" with empty old_content:
{
"action_type": "edit_file",
"reasoning": "Create missing ConfigMap manifest",
"edits": [
{
"file_path": "k8s/configmap.yaml",
"old_content": "",
"new_content": "apiVersion: v1\\nkind: ConfigMap\\n..."
}
]
}
If you believe all issues are fixed and want to submit, respond with:
{"action_type": "submit"}
If you need a hint, respond with:
{"action_type": "request_hint"}
Rules:
- Match old_content EXACTLY as it appears in the file (whitespace matters)
- Fix one issue at a time for precision
- Focus on the error message — it tells you exactly what's wrong
- Common issues: typos, wrong syntax, missing fields, wrong secret references
- For GitHub Actions: check secret syntax (${{ }} not ${ }), env blocks, permissions
- For Dockerfiles: check instruction syntax, file paths, base image tags
- For Kubernetes: check label selectors, port matching, resource limits, probe configs, ingress rules
- For full-stack pipelines: issues may span multiple files (workflow + Dockerfile + K8s manifests)
- Always respond with valid JSON only, no markdown fences"""
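# Illustrative only: a single-line fix the model might return for "replace_line".
# The file path, line number, and content below are made-up placeholders; the field
# requirements follow the action_type list in SYSTEM_PROMPT:
# {
#     "action_type": "replace_line",
#     "reasoning": "Fix the base image tag typo",
#     "edits": [
#         {"file_path": "Dockerfile", "line_number": 1, "new_content": "FROM python:3.11-slim"}
#     ]
# }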
# ---------------------------------------------------------------------------
# Logging helpers (mandatory stdout format)
# ---------------------------------------------------------------------------
def log_start(task: str, env: str, model: str) -> None:
print(f"[START] task={task} env={env} model={model}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
error_val = error if error else "null"
done_val = str(done).lower()
print(
f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
flush=True,
)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
rewards_str = ",".join(f"{r:.2f}" for r in rewards)
print(f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True)
# ---------------------------------------------------------------------------
# Client / env helpers
# ---------------------------------------------------------------------------
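# Environment server endpoints used below (all relative to ENV_URL):
#   GET  /health  - liveness check (see main)
#   POST /reset   - start an episode; accepts optional task_id / scenario_id
#   POST /step    - apply one action; returns observation, reward, done, info
#   POST /grader  - score the recorded trajectory for a task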
def create_client() -> OpenAI:
"""Create OpenAI-compatible client for HuggingFace router."""
return OpenAI(
base_url=API_BASE_URL,
api_key=API_KEY,
)
def env_request(method: str, endpoint: str, json_data: Optional[Dict] = None) -> Dict[str, Any]:
"""Make a request to the environment server."""
url = f"{ENV_URL}{endpoint}"
if method == "GET":
resp = requests.get(url, timeout=30)
else:
resp = requests.post(url, json=json_data or {}, timeout=30)
resp.raise_for_status()
return resp.json()
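# The observation prompt renders every file with 1-based line numbers so the model can
# reference line_number in replace_line / add_line / delete_line / add_block edits.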
def format_observation(obs: Dict[str, Any]) -> str:
"""Format observation into a prompt for the LLM."""
parts = []
parts.append(f"Task: {obs.get('task_description', 'Unknown')}")
parts.append(f"Difficulty: {obs.get('difficulty', 'unknown')}")
parts.append(f"Step: {obs.get('step_number', 0)}/{obs.get('max_steps', 10)}")
parts.append(f"Issues fixed: {obs.get('issues_fixed', 0)}/{obs.get('total_issues', '?')}")
error = obs.get("error", {})
parts.append(f"\n--- ERROR ---")
parts.append(f"Phase: {error.get('phase', 'unknown')}")
parts.append(f"Message: {error.get('error_message', 'No error')}")
if error.get("failed_step"):
parts.append(f"Failed step: {error['failed_step']}")
if error.get("line_hint"):
parts.append(f"Line hint: {error['line_hint']}")
parts.append(f"\n--- FILES ---")
for f in obs.get("files", []):
parts.append(f"\n=== {f['path']} ({f.get('file_type', 'unknown')}) ===")
content = f.get("content", "")
lines = content.split("\n")
for i, line in enumerate(lines, 1):
parts.append(f"{i:3d} | {line}")
if obs.get("available_secrets"):
parts.append(f"\n--- AVAILABLE SECRETS ---")
parts.append(", ".join(obs["available_secrets"]))
if obs.get("last_action_feedback"):
parts.append(f"\n--- LAST ACTION FEEDBACK ---")
parts.append(obs["last_action_feedback"])
return "\n".join(parts)
def parse_llm_response(text: str) -> Dict[str, Any]:
"""Parse LLM response into an action dict."""
text = text.strip()
# Strip markdown code fences if present
if text.startswith("```"):
lines = text.split("\n")
lines = [l for l in lines if not l.strip().startswith("```")]
text = "\n".join(lines).strip()
# Try to find JSON in the response
json_match = re.search(r'\{[\s\S]*\}', text)
if json_match:
try:
return json.loads(json_match.group())
except json.JSONDecodeError:
pass
# Fallback: treat as submit
return {"action": "submit"}
def build_action(parsed: Dict[str, Any]) -> Dict[str, Any]:
"""Convert parsed LLM response to environment action format."""
action_type = parsed.get("action_type")
# Backwards compatibility and standard aliases
if parsed.get("action") == "submit" or action_type == "submit":
return {"action_type": "submit"}
if parsed.get("action") == "hint" or action_type == "request_hint":
return {"action_type": "request_hint"}
edits = parsed.get("edits", [])
if not edits and not action_type:
return {"action_type": "submit"}
action_str = action_type if action_type else "edit_file"
return {
"action_type": action_str,
"edits": [
{
"file_path": e.get("file_path", ""),
"line_number": e.get("line_number"),
"old_content": e.get("old_content", ""),
"new_content": e.get("new_content", ""),
}
for e in edits
],
}
def run_episode(client: OpenAI, task_id: Optional[str] = None, scenario_id: Optional[str] = None) -> Dict[str, Any]:
"""Run a single episode: reset, loop (observe -> LLM -> act), grade."""
reset_payload: Dict[str, Any] = {}
if task_id:
reset_payload["task_id"] = task_id
if scenario_id:
reset_payload["scenario_id"] = scenario_id
# Best-effort task name for Start
target_task = task_id or "random_task"
log_start(task=target_task, env=BENCHMARK, model=MODEL_NAME)
trajectory = []
rewards: List[float] = []
steps_taken = 0
score = 0.0
success = False
try:
reset_resp = env_request("POST", "/reset", reset_payload)
obs = reset_resp["observation"]
info = reset_resp.get("info", {})
actual_task_id = info.get("task_id", target_task)
actual_scenario_id = info.get("scenario_id", scenario_id or "unknown")
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
for step_num in range(1, MAX_STEPS + 1):
user_msg = format_observation(obs)
messages.append({"role": "user", "content": user_msg})
error_msg: Optional[str] = None
try:
completion = client.chat.completions.create(
model=MODEL_NAME,
messages=messages,
temperature=0.1,
max_tokens=1024,
)
llm_text = completion.choices[0].message.content or '{"action": "submit"}'
except Exception as e:
error_msg = str(e)
print(f"[DEBUG] Model request failed: {e}", flush=True)
llm_text = '{"action": "submit"}'
messages.append({"role": "assistant", "content": llm_text})
parsed = parse_llm_response(llm_text)
action = build_action(parsed)
step_resp = env_request("POST", "/step", {"action": action})
obs = step_resp["observation"]
reward = step_resp.get("reward", 0.0)
done = step_resp.get("done", False)
step_info = step_resp.get("info", {})
steps_taken = step_num
rewards.append(reward)
log_step(
step=step_num,
action=action["action_type"],
reward=reward,
done=done,
error=error_msg,
)
trajectory.append({
"step": step_num,
"action": action,
"reward": reward,
"done": done,
"info": step_info,
})
if done:
break
# Grade the trajectory
grade_resp = env_request("POST", "/grader", {
"task_id": actual_task_id,
"trajectory": trajectory,
})
result = grade_resp.get("result", {})
score = result.get("score", 0.0)
score = min(max(score, 0.0), 1.0) # clamp to [0, 1]
success = score >= SUCCESS_SCORE_THRESHOLD
finally:
log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
return {"score": score, "success": success, "steps": steps_taken, "rewards": rewards}
def run_all_tasks(client: OpenAI) -> Dict[str, float]:
"""Run baseline on all tasks (and ALL their scenarios) and report scores."""
try:
from server.tasks.task_registry import TASK_REGISTRY
except ImportError as e:
print(f"[DEBUG] Could not import TASK_REGISTRY: {e}", flush=True)
return {}
scores: Dict[str, List[float]] = {}
for task_id, task_cls in TASK_REGISTRY.items():
task_scores = []
# Iterate over all exact scenarios for this task
scenarios = task_cls.SCENARIOS
for scenario in scenarios:
scenario_id = scenario["id"]
result = run_episode(client, task_id=task_id, scenario_id=scenario_id)
task_scores.append(result.get("score", 0.0))
scores[task_id] = task_scores
# Summary
print(f"\n[DEBUG] {'='*60}", flush=True)
print("[DEBUG] BASELINE RESULTS SUMMARY", flush=True)
print(f"[DEBUG] {'='*60}", flush=True)
avg_scores = {}
for task_id, task_scores in scores.items():
avg = sum(task_scores) / len(task_scores) if task_scores else 0.0
avg_scores[task_id] = avg
print(f"[DEBUG] {task_id:40s} {avg:.3f}", flush=True)
overall = sum(avg_scores.values()) / len(avg_scores) if avg_scores else 0.0
print(f"[DEBUG] {'OVERALL':40s} {overall:.3f}", flush=True)
return avg_scores
def main():
"""Entry point for baseline inference."""
if not API_KEY:
print("[DEBUG] WARNING: HF_TOKEN not set. Set it via: export HF_TOKEN=your_token_here", flush=True)
print("[DEBUG] Continuing anyway (will fail if auth is required)...", flush=True)
# Verify environment is running
try:
health = env_request("GET", "/health")
print(f"[DEBUG] Environment status: {health.get('status', 'unknown')}", flush=True)
except Exception as e:
print(f"[DEBUG] Cannot connect to environment at {ENV_URL}: {e}", flush=True)
print("[DEBUG] Start the server first: python -m uvicorn server.app:app --host 0.0.0.0 --port 7860", flush=True)
sys.exit(1)
client = create_client()
# If a specific task is requested via CLI arg
if len(sys.argv) > 1:
task_id = sys.argv[1]
scenario_id = sys.argv[2] if len(sys.argv) > 2 else None
run_episode(client, task_id=task_id, scenario_id=scenario_id)
else:
run_all_tasks(client)
if __name__ == "__main__":
main()