"""
Inference Script Example
===================================
MANDATORY
- Before submitting, ensure the following variables are defined in your
  environment configuration:
    API_BASE_URL       The API endpoint for the LLM.
    MODEL_NAME         The model identifier to use for inference.
    HF_TOKEN           Your Hugging Face / API key.
    LOCAL_IMAGE_NAME   The name of the local image to use for the environment
                       if you are using from_docker_image() method

- Defaults are set only for API_BASE_URL and MODEL_NAME
  (and should reflect your active inference setup):
    API_BASE_URL = os.getenv("API_BASE_URL", "<your-active-endpoint>")
    MODEL_NAME = os.getenv("MODEL_NAME", "<your-active-model>")

- The inference script must be named `inference.py` and placed in the root
  directory of the project
- Participants must use OpenAI Client for all LLM calls using above variables

STDOUT FORMAT
- The script must emit exactly three line types to stdout, in this order:

    [START] task=<task_name> env=<benchmark> model=<model_name>
    [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
    [END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn>

  Rules:
    - One [START] line at episode begin.
    - One [STEP] line per step, immediately after env.step() returns.
    - One [END] line after env.close(), always emitted (even on exception).
    - reward and rewards are formatted to 2 decimal places.
    - done and success are lowercase booleans: true or false.
    - error is the raw last_action_error string, or null if none.
    - All fields on a single line with no newlines within a line.
    - Each task should return a score in [0, 1]

  Example:
    [START] task=click-test env=miniwob model=Qwen3-VL-30B
    [STEP] step=1 action=click('123') reward=0.00 done=false error=null
    [STEP] step=2 action=fill('456','text') reward=0.00 done=false error=null
    [STEP] step=3 action=click('789') reward=1.00 done=true error=null
    [END] success=true steps=3 score=1.00 rewards=0.00,0.00,1.00
"""

import asyncio
import json
import os
import textwrap
from collections import Counter
from typing import Any, Callable, Dict, List, Optional

from openai import OpenAI

try:
    from dotenv import load_dotenv
except ImportError:
    load_dotenv = None

try:
    from models import CoenvAction
    from client import CoEnv
except ImportError:
    # NOTE(review): this fallback repeats the exact same imports, so it cannot
    # recover from the failure above. It is kept to leave import behaviour
    # untouched; confirm whether a package-relative import was intended.
    from models import CoenvAction
    from client import CoEnv

from server.graders.grader_pod_recovery import grade as grade_pod_recovery
from server.graders.grader_autoscaling import grade as grade_autoscaling
from server.graders.grader_incident import grade as grade_incident

if load_dotenv is not None:
    load_dotenv()

LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://router.huggingface.co/v1")
# NOTE(review): ENV_URL is not referenced anywhere else in this file; the
# environment is obtained through CoEnv.from_env(...) in _run_episode. Also
# note that the module docstring describes API_BASE_URL as the LLM endpoint,
# whereas the LLM client below is configured from LLM_BASE_URL.
ENV_URL = os.getenv("API_BASE_URL", "http://localhost:8000")
API_DELAY = float(os.getenv("API_DELAY", "0"))
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen3-8B")
API_KEY = os.getenv("OPENROUTER_API_KEY") or os.getenv("HF_TOKEN")

# TASK_NAMES[i] is run and logged under the benchmark label BENCHMARKS[i].
BENCHMARKS = ["POD_RECOVERY", "AUTOSCALING", "INCIDENT"]
TASK_NAMES = ["pod_recovery", "autoscaling", "incident"]

TEMPERATURE = 0.7
MAX_TOKENS = 150
SUCCESS_SCORE_THRESHOLD = 0.1  # normalized score in [0, 1]
DEFAULT_MAX_STEPS = 15

# NOTE(review): the threshold and stall constants below are defined but not
# consulted by the episode loop in this file.
SUCCESS_SCORE_THRESHOLD_BY_TASK: Dict[str, float] = {
    "pod_recovery": 0.9,
    "autoscaling": 0.9,
    "incident": 0.8,
}
MAX_STALL_REPEATS = 4
REWARD_EPSILON = 1e-9

MAX_STEPS_BY_TASK = {
    "pod_recovery": 15,
    "autoscaling": 20,
    "incident": 30,
}

# Graders are called as grader(world_state, steps_taken, max_steps).
GRADERS: Dict[str, Callable[[Dict[str, Any], int, int], float]] = {
    "pod_recovery": grade_pod_recovery,
    "autoscaling": grade_autoscaling,
    "incident": grade_incident,
}

SYSTEM_PROMPT = textwrap.dedent(
    """
    You are a Kubernetes incident-response agent.
    Return ONLY valid JSON for one action with this schema:
    {
      "action_type": "scale|delete_pod|patch|rollout_restart|set_hpa|drain_node|describe|wait",
      "deployment": "... optional ...",
      "replicas": 1,
      "pod_name": "...",
      "resource_type": "deployment|pod|node|service|configmap|hpa",
      "name": "...",
      "patch": {},
      "min_replicas": 1,
      "max_replicas": 5,
      "cpu_target_percent": 70,
      "node_name": "..."
    }

    Do not include markdown, prose, or code fences.
    """
).strip()

# Spellings of action_type that are mapped onto a canonical action type.
_ACTION_TYPE_ALIASES: Dict[str, str] = {
    "set_hpas": "set_hpa",
    "hpa": "set_hpa",
    "restart_rollout": "rollout_restart",
    "noop": "wait",
    "no_op": "wait",
    "pause": "wait",
    "sleep": "wait",
}

# Optional action fields copied from the model output. This is an ordered
# tuple (not a set) so the key order of the normalized payload -- and thus the
# action string written to the [STEP] log and the history -- is the same on
# every run, independent of string hash randomisation.
_ACTION_FIELDS = (
    "deployment",
    "replicas",
    "pod_name",
    "resource_type",
    "name",
    "patch",
    "min_replicas",
    "max_replicas",
    "cpu_target_percent",
    "node_name",
)


def log_start(task: str, env: str, model: str) -> None:
    """Print the single ``[START]`` line that opens an episode."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Print one ``[STEP]`` line.

    Args:
        step: 1-based step index.
        action: Action string to log.
        reward: Step reward, printed with two decimals.
        done: Episode-finished flag, printed as ``true``/``false``.
        error: Error text for the step; a falsy value is printed as ``null``.
    """
    if error:
        # Collapse line breaks so the record stays on a single output line.
        error_val = " ".join(str(error).splitlines()) or "null"
    else:
        error_val = "null"
    # bool() guarantees true/false even if a non-bool (e.g. None) is passed.
    done_val = str(bool(done)).lower()
    print(
        f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )


def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Print the ``[END]`` line that closes an episode.

    Rewards are printed comma-separated with two decimals; the score is
    printed with three decimals.
    """
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}",
        flush=True,
    )


def _to_dict(obj: Any) -> Dict[str, Any]:
    """Return *obj* as a dict.

    Uses ``obj.model_dump()`` when that method exists, returns dicts as they
    are, and otherwise falls back to ``vars(obj)``.
    """
    if hasattr(obj, "model_dump"):
        return obj.model_dump()
    if isinstance(obj, dict):
        return obj
    return vars(obj)


def _observation_summary(observation: Any) -> str:
    """Render a short text summary of an observation for the LLM prompt.

    The summary lists the objective, the step, a count of pods per status,
    one line per deployment and the five most recent events.
    """
    obs = _to_dict(observation)
    # "or []" also covers keys that are present but set to None.
    pods = obs.get("pods") or []
    deployments = obs.get("deployments") or []
    events = obs.get("events") or []

    # dict(Counter(...)) keeps first-seen order and renders as a plain dict.
    pod_status_counts: Dict[str, int] = dict(
        Counter(pod.get("status", "Unknown") for pod in pods)
    )

    deployment_lines = [
        f"{dep.get('name')}: desired={dep.get('desired_replicas', 0)} "
        f"available={dep.get('available_replicas', 0)}"
        for dep in deployments
    ]
    recent_events = [
        f"{e.get('type', 'Normal')}/{e.get('reason', '')}: {e.get('message', '')}"
        for e in events[-5:]
    ]

    # The lines are joined explicitly instead of running textwrap.dedent over
    # an f-string: once multi-line values are interpolated, their unindented
    # continuation lines make the common margin empty and dedent is a no-op.
    lines = [
        f"Objective: {obs.get('objective', '')}",
        f"Step: {obs.get('step', 0)}",
        f"Pod status counts: {pod_status_counts}",
        "Deployments:",
        *(deployment_lines or ["None"]),
        "Recent events:",
        *(recent_events or ["None"]),
    ]
    return "\n".join(lines).strip()


def build_user_prompt(task_name: str, step: int, observation: Any, history: List[str]) -> str:
    """Build the user message for one step.

    Combines the task name, the step number, the cluster summary and the last
    four history entries (``None`` when there is no history yet).
    """
    history_block = "\n".join(history[-4:]) if history else "None"
    # Joined explicitly for the same reason as in _observation_summary: the
    # summary and the history are multi-line, which defeats textwrap.dedent.
    sections = [
        f"Task: {task_name}",
        f"Step: {step}",
        "",
        "Current cluster summary:",
        _observation_summary(observation),
        "",
        "Previous steps:",
        history_block,
        "",
        "Return one valid next action as pure JSON.",
    ]
    return "\n".join(sections).strip()


def _safe_json_action(text: str) -> Optional[Dict[str, Any]]:
    """Extract a JSON object from model output.

    The whole text is tried first, then the span from the first ``{`` to the
    last ``}`` (to skip surrounding prose or code fences).

    Returns:
        The first candidate that parses to a dict, or ``None`` if none does.
    """
    candidates = [text]
    start = text.find("{")
    end = text.rfind("}")
    if start != -1 and end > start:
        candidates.append(text[start : end + 1])

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        # Valid JSON that is not an object (list, string, number) is not an
        # action; keep looking rather than returning it.
        if isinstance(parsed, dict):
            return parsed
    return None


def _heuristic_action(task_name: str, observation: Any) -> Dict[str, Any]:
    """Return a fixed fallback action for *task_name*.

    Used when the model request fails or its output is not a usable action.
    """
    obs = _to_dict(observation)
    pods = obs.get("pods", [])

    if task_name == "pod_recovery":
        crashloop = [
            p
            for p in pods
            if p.get("deployment") == "frontend" and p.get("status") == "CrashLoopBackOff"
        ]
        if crashloop:
            return {"action_type": "rollout_restart", "deployment": "frontend"}
        return {"action_type": "describe", "resource_type": "deployment", "name": "frontend"}

    if task_name == "autoscaling":
        return {
            "action_type": "set_hpa",
            "deployment": "backend",
            "min_replicas": 2,
            "max_replicas": 6,
            "cpu_target_percent": 70,
        }

    return {"action_type": "rollout_restart", "deployment": "auth-service"}


def _normalize_action(action: Dict[str, Any]) -> Dict[str, Any]:
    """Canonicalise a parsed model action.

    The action type is trimmed, lower-cased and mapped through the alias
    table (a non-string type becomes ``describe``). Known optional fields
    with non-None values are copied, and per-type defaults fill any field
    that is still missing.
    """
    raw_type = action.get("action_type", "describe")
    if isinstance(raw_type, str):
        cleaned = raw_type.strip().lower()
        action_type = _ACTION_TYPE_ALIASES.get(cleaned, cleaned)
    else:
        action_type = "describe"

    normalized: Dict[str, Any] = {"action_type": action_type}
    for field in _ACTION_FIELDS:
        value = action.get(field)
        if value is not None:
            normalized[field] = value

    # Built per call so the mutable {} default for "patch" is never shared
    # between two normalized payloads.
    defaults_by_type = {
        "describe": {"resource_type": "deployment", "name": "frontend"},
        "scale": {"deployment": "frontend", "replicas": 3},
        "rollout_restart": {"deployment": "frontend"},
        "delete_pod": {"pod_name": "frontend-unknown"},
        "drain_node": {"node_name": "node-1"},
        "patch": {"resource_type": "deployment", "name": "frontend", "patch": {}},
        "set_hpa": {
            "deployment": "backend",
            "min_replicas": 2,
            "max_replicas": 6,
            "cpu_target_percent": 70,
        },
        "wait": {},
    }
    for key, default in defaults_by_type.get(action_type, {}).items():
        normalized.setdefault(key, default)

    return normalized


def get_model_action(
    client: OpenAI,
    task_name: str,
    step: int,
    observation: Any,
    history: List[str],
) -> Dict[str, Any]:
    """Ask the LLM for the next action and return it as a normalized payload.

    Falls back to ``_heuristic_action`` when the request raises or when the
    reply does not contain a JSON object.
    """
    user_prompt = build_user_prompt(task_name, step, observation, history)
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        text = (completion.choices[0].message.content or "").strip()
        parsed = _safe_json_action(text)
        if isinstance(parsed, dict):
            return _normalize_action(parsed)
        return _heuristic_action(task_name, observation)
    except Exception as exc:
        # Deliberate best-effort boundary: any failure of the model call is
        # reported and replaced by the heuristic action.
        print(f"[DEBUG] Model request failed: {exc}", flush=True)
        return _heuristic_action(task_name, observation)


async def _run_episode(client: OpenAI, task_name: str, benchmark: str) -> None:
    """Run one episode of *task_name* and emit its START/STEP/END log lines.

    The ``[END]`` line is printed from a ``finally`` clause, so it is emitted
    even when the episode raises; the exception then propagates to the caller.
    """
    max_steps = MAX_STEPS_BY_TASK.get(task_name, DEFAULT_MAX_STEPS)
    grader = GRADERS.get(task_name, grade_pod_recovery)

    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    episode_done = False

    log_start(task=task_name, env=benchmark, model=MODEL_NAME)

    try:
        async with CoEnv.from_env("SandyTheAdventurer/coenv") as env:
            result = await env.reset(task=task_name)
            final_obs = result.observation

            for step in range(1, max_steps + 1):
                # Checked before the delay so a finished episode does not wait.
                if result.done:
                    break
                if API_DELAY > 0:
                    await asyncio.sleep(API_DELAY)

                action_payload = get_model_action(client, task_name, step, final_obs, history)
                try:
                    action = CoenvAction(**action_payload)
                except (TypeError, ValueError) as exc:
                    # The model can return fields the action type rejects;
                    # use the heuristic action instead of aborting the episode.
                    reason = " ".join(str(exc).split())
                    print(f"[DEBUG] Invalid action payload: {reason}", flush=True)
                    action_payload = _heuristic_action(task_name, final_obs)
                    action = CoenvAction(**action_payload)

                result = await env.step(action)
                obs = result.observation
                final_obs = obs

                reward = result.reward or 0.0
                done = result.done
                metadata = getattr(obs, "metadata", None) or {}
                error = metadata.get("error")

                rewards.append(reward)
                steps_taken = step
                episode_done = bool(done)

                action_str = json.dumps(action_payload, separators=(",", ":"))
                log_step(step=step, action=action_str, reward=reward, done=done, error=error)
                history.append(f"Step {step}: {action_str} -> reward {reward:+.2f}")

                if done:
                    break

            world_state = _to_dict(final_obs) if final_obs is not None else {}
            score = grader(world_state, steps_taken, max_steps)
            score = min(max(score, 0.0), 1.0)  # clamp to [0, 1]
            # Success means the environment reported done after at least one step.
            success = episode_done and steps_taken > 0
    finally:
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)


async def main() -> None:
    """Run every task in TASK_NAMES once, in order.

    Raises:
        RuntimeError: If neither OPENROUTER_API_KEY nor HF_TOKEN is set.
    """
    if not API_KEY:
        raise RuntimeError("Missing OPENROUTER_API_KEY/HF_TOKEN for OpenAI client.")

    # One client is enough for all tasks; its configuration does not change.
    client = OpenAI(base_url=LLM_BASE_URL, api_key=API_KEY)

    for task_name, benchmark in zip(TASK_NAMES, BENCHMARKS):
        await _run_episode(client, task_name, benchmark)


if __name__ == "__main__":
    asyncio.run(main())