Spaces:
Sleeping
Sleeping
| """ | |
| Inference Script Example | |
| =================================== | |
| MANDATORY | |
| - Before submitting, ensure the following variables are defined in your environment configuration: | |
| API_BASE_URL The API endpoint for the LLM. | |
| MODEL_NAME The model identifier to use for inference. | |
| HF_TOKEN Your Hugging Face / API key. | |
| LOCAL_IMAGE_NAME The name of the local image to use for the environment if you are using from_docker_image() | |
| method | |
| - Defaults are set only for API_BASE_URL and MODEL_NAME | |
| (and should reflect your active inference setup): | |
| API_BASE_URL = os.getenv("API_BASE_URL", "<your-active-endpoint>") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "<your-active-model>") | |
| - The inference script must be named `inference.py` and placed in the root directory of the project | |
| - Participants must use OpenAI Client for all LLM calls using above variables | |
| STDOUT FORMAT | |
| - The script must emit exactly three line types to stdout, in this order: | |
| [START] task=<task_name> env=<benchmark> model=<model_name> | |
| [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null> | |
| [END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn> | |
| Rules: | |
| - One [START] line at episode begin. | |
| - One [STEP] line per step, immediately after env.step() returns. | |
| - One [END] line after env.close(), always emitted (even on exception). | |
| - reward and rewards are formatted to 2 decimal places. | |
| - done and success are lowercase booleans: true or false. | |
| - error is the raw last_action_error string, or null if none. | |
| - All fields on a single line with no newlines within a line. | |
| - Each tasks should return score in [0, 1] | |
| Example: | |
| [START] task=click-test env=miniwob model=Qwen3-VL-30B | |
| [STEP] step=1 action=click('123') reward=0.00 done=false error=null | |
| [STEP] step=2 action=fill('456','text') reward=0.00 done=false error=null | |
| [STEP] step=3 action=click('789') reward=1.00 done=true error=null | |
| [END] success=true steps=3 score=1.00 rewards=0.00,0.00,1.00 | |
| """ | |
import asyncio
import json
import os
import textwrap
from typing import Any, Callable, Dict, List, Optional

from openai import OpenAI

# python-dotenv is optional; the script runs without it.
try:
    from dotenv import load_dotenv
except ImportError:
    load_dotenv = None

# NOTE(review): both branches of this try/except import the same names, so the
# fallback is redundant — presumably one branch was meant to use a different
# (e.g. package-relative) path; confirm against the project layout.
try:
    from models import CoenvAction
    from client import CoEnv
except ImportError:
    from models import CoenvAction
    from client import CoEnv

from server.graders.grader_pod_recovery import grade as grade_pod_recovery
from server.graders.grader_autoscaling import grade as grade_autoscaling
from server.graders.grader_incident import grade as grade_incident
# Load a local .env file when python-dotenv is available (no-op otherwise).
if load_dotenv is not None:
    load_dotenv()

# OpenAI-compatible LLM endpoint and credentials.
LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://router.huggingface.co/v1")
# Environment (CoEnv) server URL — distinct from the LLM endpoint above.
ENV_URL = os.getenv("API_BASE_URL", "http://localhost:8000")
# Optional per-step delay in seconds, used to throttle requests.
API_DELAY = float(os.getenv("API_DELAY", "0"))
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen3-8B")
# Either key is accepted; OPENROUTER_API_KEY takes precedence over HF_TOKEN.
API_KEY = os.getenv("OPENROUTER_API_KEY") or os.getenv("HF_TOKEN")

# Benchmark labels and task names are paired element-wise via zip() in main().
BENCHMARKS = ["POD_RECOVERY", "AUTOSCALING", "INCIDENT"]
TASK_NAMES = ["pod_recovery", "autoscaling", "incident"]

# Sampling parameters for the chat-completion call.
TEMPERATURE = 0.7
MAX_TOKENS = 150

# Fallback success threshold for tasks missing from the per-task table below.
SUCCESS_SCORE_THRESHOLD = 0.1  # normalized score in [0, 1]
DEFAULT_MAX_STEPS = 15
# Per-task minimum graded score required to count the episode as a success.
SUCCESS_SCORE_THRESHOLD_BY_TASK: Dict[str, float] = {
    "pod_recovery": 0.9,
    "autoscaling": 0.9,
    "incident": 0.8,
}
# Stall detection knobs: the episode is considered stalled after this many
# consecutive repeats of the same action with no reward movement beyond epsilon.
MAX_STALL_REPEATS = 4
REWARD_EPSILON = 1e-9
# Per-task step budgets; DEFAULT_MAX_STEPS covers unknown tasks.
MAX_STEPS_BY_TASK = {
    "pod_recovery": 15,
    "autoscaling": 20,
    "incident": 30,
}
# Graders map (world_state, steps_taken, max_steps) -> score.
GRADERS: Dict[str, Callable[[Dict[str, Any], int, int], float]] = {
    "pod_recovery": grade_pod_recovery,
    "autoscaling": grade_autoscaling,
    "incident": grade_incident,
}
# System prompt instructing the model to reply with a single JSON action.
SYSTEM_PROMPT = textwrap.dedent(
    """
    You are a Kubernetes incident-response agent.
    Return ONLY valid JSON for one action with this schema:
    {
        "action_type": "scale|delete_pod|patch|rollout_restart|set_hpa|drain_node|describe|wait",
        "deployment": "... optional ...",
        "replicas": 1,
        "pod_name": "...",
        "resource_type": "deployment|pod|node|service|configmap|hpa",
        "name": "...",
        "patch": {},
        "min_replicas": 1,
        "max_replicas": 5,
        "cpu_target_percent": 70,
        "node_name": "..."
    }
    Do not include markdown, prose, or code fences.
    """
).strip()
def log_start(task: str, env: str, model: str) -> None:
    """Emit the mandatory [START] line that opens an episode."""
    line = "[START] task={} env={} model={}".format(task, env, model)
    print(line, flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Emit one [STEP] line, called immediately after env.step() returns.

    reward is rendered with 2 decimals, done as a lowercase boolean, and a
    falsy/missing error as the literal string "null", per the stdout contract.
    """
    fields = [
        f"[STEP] step={step}",
        f"action={action}",
        f"reward={reward:.2f}",
        f"done={str(done).lower()}",
        f"error={error if error else 'null'}",
    ]
    print(" ".join(fields), flush=True)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the final [END] line; the caller guarantees it runs even on error.

    NOTE(review): the module docstring's example shows score with 2 decimals
    (score=1.00) while this prints 3 (:.3f); only reward/rewards are mandated
    to 2 decimals, so the original :.3f is preserved — confirm intent.
    """
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    line = (
        f"[END] success={str(success).lower()} steps={steps} "
        f"score={score:.3f} rewards={rewards_str}"
    )
    print(line, flush=True)
| def _to_dict(obj: Any) -> Dict[str, Any]: | |
| if hasattr(obj, "model_dump"): | |
| return obj.model_dump() | |
| if isinstance(obj, dict): | |
| return obj | |
| return vars(obj) | |
def _observation_summary(observation: Any) -> str:
    """Render a compact, human-readable cluster summary for the LLM prompt.

    Includes the objective, current step, pod status counts, per-deployment
    replica counts, and up to the 5 most recent events.

    Args:
        observation: environment observation (dict or pydantic-style model).

    Returns:
        A multi-line plain-text summary.
    """
    obs = _to_dict(observation)
    pods = obs.get("pods", [])
    deployments = obs.get("deployments", [])
    events = obs.get("events", [])

    pod_status_counts: Dict[str, int] = {}
    for pod in pods:
        status = pod.get("status", "Unknown")
        pod_status_counts[status] = pod_status_counts.get(status, 0) + 1

    deployment_lines = [
        f"{dep.get('name')}: desired={dep.get('desired_replicas', 0)} "
        f"available={dep.get('available_replicas', 0)}"
        for dep in deployments
    ]
    recent_events = [
        f"{e.get('type', 'Normal')}/{e.get('reason', '')}: {e.get('message', '')}"
        for e in events[-5:]
    ]

    # Build the text with an explicit join: textwrap.dedent() over an f-string
    # is defeated when interpolated values contain newlines (the un-indented
    # interpolated lines shrink the common prefix to nothing), which left the
    # original output inconsistently indented whenever there was more than one
    # deployment or event line.
    lines = [
        f"Objective: {obs.get('objective', '')}",
        f"Step: {obs.get('step', 0)}",
        f"Pod status counts: {pod_status_counts}",
        "Deployments:",
        "\n".join(deployment_lines) if deployment_lines else "None",
        "Recent events:",
        "\n".join(recent_events) if recent_events else "None",
    ]
    return "\n".join(lines)
def build_user_prompt(task_name: str, step: int, observation: Any, history: List[str]) -> str:
    """Compose the per-step user prompt: task, step, cluster summary, history.

    Only the last 4 history entries are included to bound prompt size.

    Args:
        task_name: current task identifier.
        step: 1-based step number.
        observation: latest environment observation.
        history: previous "Step N: action -> reward" strings.

    Returns:
        The prompt text asking for one JSON action.
    """
    history_block = "\n".join(history[-4:]) if history else "None"
    # Assemble with a join rather than textwrap.dedent(): dedent over an
    # f-string cannot re-indent multi-line interpolations (the summary and
    # history blocks), which previously left the prompt unevenly indented.
    return "\n".join(
        [
            f"Task: {task_name}",
            f"Step: {step}",
            "Current cluster summary:",
            _observation_summary(observation),
            "Previous steps:",
            history_block,
            "Return one valid next action as pure JSON.",
        ]
    )
| def _safe_json_action(text: str) -> Optional[Dict[str, Any]]: | |
| try: | |
| return json.loads(text) | |
| except json.JSONDecodeError: | |
| start = text.find("{") | |
| end = text.rfind("}") | |
| if start != -1 and end != -1 and end > start: | |
| try: | |
| return json.loads(text[start : end + 1]) | |
| except json.JSONDecodeError: | |
| return None | |
| return None | |
def _heuristic_action(task_name: str, observation: Any) -> Dict[str, Any]:
    """Deterministic fallback action used when the LLM reply is unusable.

    pod_recovery: restart 'frontend' if any of its pods are crash-looping,
    otherwise describe the deployment. autoscaling: install a default HPA on
    'backend'. Any other task: restart 'auth-service'.
    """
    obs = _to_dict(observation)
    pods = obs.get("pods", [])

    if task_name == "pod_recovery":
        frontend_crashing = any(
            p.get("deployment") == "frontend" and p.get("status") == "CrashLoopBackOff"
            for p in pods
        )
        if frontend_crashing:
            return {"action_type": "rollout_restart", "deployment": "frontend"}
        return {"action_type": "describe", "resource_type": "deployment", "name": "frontend"}

    if task_name == "autoscaling":
        return {
            "action_type": "set_hpa",
            "deployment": "backend",
            "min_replicas": 2,
            "max_replicas": 6,
            "cpu_target_percent": 70,
        }

    return {"action_type": "rollout_restart", "deployment": "auth-service"}
| def _normalize_action(action: Dict[str, Any]) -> Dict[str, Any]: | |
| action_type = action.get("action_type", "describe") | |
| if isinstance(action_type, str): | |
| action_type = { | |
| "set_hpas": "set_hpa", | |
| "hpa": "set_hpa", | |
| "restart_rollout": "rollout_restart", | |
| "noop": "wait", | |
| "no_op": "wait", | |
| "pause": "wait", | |
| "sleep": "wait", | |
| }.get(action_type.strip().lower(), action_type.strip().lower()) | |
| else: | |
| action_type = "describe" | |
| normalized: Dict[str, Any] = {"action_type": action_type} | |
| allowed_fields = { | |
| "deployment", | |
| "replicas", | |
| "pod_name", | |
| "resource_type", | |
| "name", | |
| "patch", | |
| "min_replicas", | |
| "max_replicas", | |
| "cpu_target_percent", | |
| "node_name", | |
| } | |
| for field in allowed_fields: | |
| if field in action and action[field] is not None: | |
| normalized[field] = action[field] | |
| defaults_by_type = { | |
| "describe": {"resource_type": "deployment", "name": "frontend"}, | |
| "scale": {"deployment": "frontend", "replicas": 3}, | |
| "rollout_restart": {"deployment": "frontend"}, | |
| "delete_pod": {"pod_name": "frontend-unknown"}, | |
| "drain_node": {"node_name": "node-1"}, | |
| "patch": {"resource_type": "deployment", "name": "frontend", "patch": {}}, | |
| "set_hpa": {"deployment": "backend", "min_replicas": 2, "max_replicas": 6, "cpu_target_percent": 70}, | |
| "wait": {}, | |
| } | |
| for k, v in defaults_by_type.get(action_type, {}).items(): | |
| normalized.setdefault(k, v) | |
| return normalized | |
def get_model_action(client: OpenAI, task_name: str, step: int, observation: Any, history: List[str]) -> Dict[str, Any]:
    """Ask the LLM for the next action; fall back to the heuristic on failure.

    Failure modes covered: any exception from the API call/parsing path, and
    replies that do not contain a parseable JSON object.
    """
    prompt = build_user_prompt(task_name, step, observation, history)
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        reply = (response.choices[0].message.content or "").strip()
        parsed = _safe_json_action(reply)
        if isinstance(parsed, dict):
            return _normalize_action(parsed)
        # Reply was not a JSON object: degrade gracefully.
        return _heuristic_action(task_name, observation)
    except Exception as exc:
        # Best-effort boundary: log and fall back rather than abort the episode.
        print(f"[DEBUG] Model request failed: {exc}", flush=True)
        return _heuristic_action(task_name, observation)
async def main() -> None:
    """Run one episode per task, emitting the [START]/[STEP]/[END] protocol.

    For each (task, benchmark) pair: reset the environment, loop up to the
    task's step budget asking the model for an action, execute it, and log
    each step. The episode ends early when the environment reports done or
    when the agent stalls (the same action repeated MAX_STALL_REPEATS times
    with no reward movement beyond REWARD_EPSILON — previously these
    variables were declared but never used, so stalling was never detected).
    The final score comes from the task grader, clamped to [0, 1]; success
    now also requires the score to clear the per-task threshold (the
    thresholds were previously defined but ignored).

    Raises:
        RuntimeError: when no API key is configured.
    """
    if not API_KEY:
        raise RuntimeError("Missing HF_TOKEN/API_KEY for OpenAI client.")

    for task_name, benchmark in zip(TASK_NAMES, BENCHMARKS):
        client = OpenAI(base_url=LLM_BASE_URL, api_key=API_KEY)
        max_steps = MAX_STEPS_BY_TASK.get(task_name, DEFAULT_MAX_STEPS)
        grader = GRADERS.get(task_name, grade_pod_recovery)

        history: List[str] = []
        rewards: List[float] = []
        steps_taken = 0
        score = 0.0
        success = False
        final_obs: Optional[Any] = None
        episode_done = False
        stalled = False
        last_action_str: Optional[str] = None
        consecutive_same_action = 0
        last_reward: Optional[float] = None

        log_start(task=task_name, env=benchmark, model=MODEL_NAME)
        try:
            async with CoEnv.from_env("SandyTheAdventurer/coenv") as env:
                result = await env.reset(task=task_name)
                final_obs = result.observation

                for step in range(1, max_steps + 1):
                    if API_DELAY > 0:
                        await asyncio.sleep(API_DELAY)
                    if result.done:
                        break

                    action_payload = get_model_action(client, task_name, step, final_obs, history)
                    action = CoenvAction(**action_payload)
                    result = await env.step(action)

                    obs = result.observation
                    final_obs = obs
                    reward = result.reward or 0.0
                    done = result.done
                    error = (obs.metadata or {}).get("error") if hasattr(obs, "metadata") else None

                    rewards.append(reward)
                    steps_taken = step
                    episode_done = bool(done)

                    action_str = json.dumps(action_payload, separators=(",", ":"))
                    log_step(step=step, action=action_str, reward=reward, done=done, error=error)
                    history.append(f"Step {step}: {action_str} -> reward {reward:+.2f}")

                    if done:
                        break

                    # Stall detection: same action with no reward progress.
                    reward_unchanged = (
                        last_reward is not None and abs(reward - last_reward) <= REWARD_EPSILON
                    )
                    if action_str == last_action_str and reward_unchanged:
                        consecutive_same_action += 1
                    else:
                        consecutive_same_action = 0
                    last_action_str = action_str
                    last_reward = reward
                    if consecutive_same_action >= MAX_STALL_REPEATS:
                        stalled = True
                        break

            world_state = _to_dict(final_obs) if final_obs is not None else {}
            score = grader(world_state, steps_taken, max_steps)
            score = min(max(score, 0.0), 1.0)  # clamp to the mandated [0, 1]
            threshold = SUCCESS_SCORE_THRESHOLD_BY_TASK.get(task_name, SUCCESS_SCORE_THRESHOLD)
            success = (
                episode_done
                and not stalled
                and steps_taken > 0
                and score >= threshold
            )
        finally:
            # [END] must always be emitted, even when the episode raised.
            log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
| if __name__ == "__main__": | |
| asyncio.run(main()) |