Spaces:
Sleeping
Sleeping
| """ | |
| Inference Script – Cloud GPU+CPU Resource Management OpenEnv Environment | |
| ======================================================================== | |
| Mandatory variables (set in environment configuration): | |
| API_BASE_URL The API endpoint for the LLM. | |
| MODEL_NAME The model identifier to use for inference. | |
| HF_TOKEN Your Hugging Face / API key. | |
| STDOUT FORMAT: | |
| [START] task=<task_name> env=<benchmark> model=<model_name> | |
| [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null> | |
| [END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn> | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import textwrap | |
| from typing import Any, Dict, List, Optional | |
| from openai import OpenAI | |
| from cloud_resource_env import CloudResourceClient | |
| # --------------------------------------------------------------------------- | |
| # Configuration | |
| # --------------------------------------------------------------------------- | |
| API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") | |
| BENCHMARK = "cloud_resource_env" | |
| ENV_URL = os.getenv("ENV_URL", "http://localhost:8000") | |
| TASKS = [ | |
| {"name": "gpu_cpu_allocation", "max_steps": 2 }, | |
| {"name": "thermal_management", "max_steps": 2 }, | |
| {"name": "heuristic_fragmentation", "max_steps": 2}, | |
| ] | |
| TEMPERATURE = 0.4 | |
| MAX_TOKENS = 500 | |
| SUCCESS_THRESHOLD = 0.3 | |
| # --------------------------------------------------------------------------- | |
| # Task-specific system prompts | |
| # --------------------------------------------------------------------------- | |
| SYSTEM_PROMPTS = { | |
| "gpu_cpu_allocation": textwrap.dedent("""\ | |
| You are an expert cloud GPU+CPU infrastructure manager. You observe | |
| cluster node metrics (GPU utilisation, CPU utilisation, VRAM, memory, | |
| cost) and decide allocation actions to optimise throughput and cost. | |
| Rules: | |
| - For each node, choose one action: | |
| "allocate_high" — increase GPU+CPU capacity by 50% (costs more) | |
| "allocate_low" — decrease capacity by 33% (saves money) | |
| "maintain" — no change | |
| "migrate" — move 30% of this node's load to other nodes | |
| - Target GPU utilisation: ~70%. | |
| - Target CPU utilisation: ~70%. | |
| - Avoid overloads (utilisation > 100%). | |
| - Stay within budget if one is specified. | |
| Respond with ONLY a valid JSON object mapping node_id to action. Example: | |
| {"node_0": "maintain", "node_1": "allocate_high", "node_2": "migrate"} | |
| No explanation, no markdown, just pure JSON. | |
| """), | |
| "thermal_management": textwrap.dedent("""\ | |
| You are an expert cloud thermal management engineer. You monitor GPU | |
| temperatures and ambient temperature, and decide cooling and load | |
| redistribution actions to prevent thermal throttling. | |
| Rules: | |
| - For each node, choose one action: | |
| "increase_cooling" — increase cooling level (costs energy) | |
| "decrease_cooling" — decrease cooling level (saves energy) | |
| "migrate_load" — move 40% of load to the coolest node | |
| "maintain" — no change | |
| - Safe temperature zone: 55°C – 75°C. | |
| - CRITICAL: If GPU temperature exceeds max threshold → thermal throttle! | |
| - Balance: keep temps safe WITHOUT excessive cooling cost. | |
| - When ambient temperature is high, proactively increase cooling. | |
| Respond with ONLY a valid JSON object mapping node_id to action. Example: | |
| {"node_0": "increase_cooling", "node_1": "maintain", "node_2": "migrate_load", "node_3": "maintain"} | |
| No explanation, no markdown, just pure JSON. | |
| """), | |
| "heuristic_fragmentation": textwrap.dedent("""\ | |
| You are an expert GPU cluster scheduler. You manage a fragmented GPU | |
| cluster where nodes have 8 GPU slots each and workloads need contiguous | |
| blocks. Choose allocation and defragmentation strategies. | |
| Rules: | |
| - For each node, choose one strategy: | |
| "best_fit" — place workload in node with smallest sufficient free block | |
| "first_fit" — place workload in first node with free space | |
| "compact" — defragment first (move allocated to front), then best-fit | |
| "split_workload" — if no contiguous block, split across nodes | |
| - All nodes use the majority-vote strategy for this step. | |
| - The pending workloads have varying GPU requirements (1, 2, 4, or 8 slots). | |
| - Goal: place all pending workloads, minimise fragmentation. | |
| - Compaction has a 10% overhead penalty. | |
| Respond with ONLY a valid JSON object mapping node_id to strategy. Example: | |
| {"node_0": "best_fit", "node_1": "best_fit", "node_2": "compact", "node_3": "first_fit", "node_4": "best_fit"} | |
| No explanation, no markdown, just pure JSON. | |
| """), | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Logging helpers | |
| # --------------------------------------------------------------------------- | |
| def log_start(task: str, env: str, model: str) -> None: | |
| print(f"[START] task={task} env={env} model={model}", flush=True) | |
| def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None: | |
| error_val = error if error else "null" | |
| done_val = str(done).lower() | |
| action_clean = action.replace("\n", " ").strip() | |
| print( | |
| f"[STEP] step={step} action={action_clean} reward={reward:.2f} done={done_val} error={error_val}", | |
| flush=True, | |
| ) | |
| def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: | |
| rewards_str = ",".join(f"{r:.2f}" for r in rewards) | |
| print( | |
| f"[END] success={str(success).lower()} steps={steps} score={score:.2f} rewards={rewards_str}", | |
| flush=True, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # LLM interaction | |
| # --------------------------------------------------------------------------- | |
| def build_user_prompt( | |
| cluster_state: Dict[str, Any], | |
| task_info: Dict[str, Any], | |
| step_num: int, | |
| history: List[str], | |
| ) -> str: | |
| state_json = json.dumps(cluster_state, indent=2) | |
| hist_block = "\n".join(history[-4:]) if history else "None" | |
| # Build task-specific context | |
| extra_context = "" | |
| task_name = task_info.get("task_name", "") | |
| if task_name == "thermal_management": | |
| nodes = cluster_state.get("nodes", []) | |
| hot_nodes = [n for n in nodes if n.get("gpu_temp_celsius", 0) > 75] | |
| if hot_nodes: | |
| hot_list = ", ".join( | |
| f"{n['node_id']}={n['gpu_temp_celsius']:.1f}°C" for n in hot_nodes | |
| ) | |
| extra_context += f"\n⚠️ HOT NODES: {hot_list}\n" | |
| ambient = cluster_state.get("ambient_temp_celsius", 25) | |
| extra_context += f"Ambient temperature: {ambient:.1f}°C\n" | |
| elif task_name == "heuristic_fragmentation": | |
| pending = cluster_state.get("pending_workloads", []) | |
| frag = cluster_state.get("cluster_fragmentation", 0) | |
| extra_context += f"\nPending workloads (GPU slots needed): {pending}\n" | |
| extra_context += f"Cluster fragmentation: {frag:.3f}\n" | |
| return textwrap.dedent(f"""\ | |
| Task: {task_info.get('task_name', 'unknown')} ({task_info.get('difficulty', '?')}) | |
| Objective: {task_info.get('description', '')} | |
| Valid actions: {task_info.get('valid_actions', [])} | |
| Target GPU utilisation: {task_info.get('target_gpu_utilization_pct', 70)}% | |
| Target CPU utilisation: {task_info.get('target_cpu_utilization_pct', 70)}% | |
| Budget per step: {task_info.get('budget_per_step', 'N/A')} | |
| Step {step_num} of {task_info.get('max_steps', '?')} | |
| {extra_context} | |
| Current cluster state: | |
| {state_json} | |
| Recent history: | |
| {hist_block} | |
| Decide actions for each node. Respond with JSON only. | |
| """) | |
| def get_llm_decision( | |
| client: OpenAI, | |
| cluster_state: Dict[str, Any], | |
| task_info: Dict[str, Any], | |
| step_num: int, | |
| history: List[str], | |
| node_ids: List[str], | |
| ) -> str: | |
| task_name = task_info.get("task_name", "gpu_cpu_allocation") | |
| system_prompt = SYSTEM_PROMPTS.get(task_name, SYSTEM_PROMPTS["gpu_cpu_allocation"]) | |
| user_prompt = build_user_prompt(cluster_state, task_info, step_num, history) | |
| try: | |
| completion = client.chat.completions.create( | |
| model=MODEL_NAME, | |
| messages=[ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_prompt}, | |
| ], | |
| temperature=TEMPERATURE, | |
| max_tokens=MAX_TOKENS, | |
| stream=False, | |
| ) | |
| text = (completion.choices[0].message.content or "").strip() | |
| # Strip markdown fences if present | |
| if text.startswith("```"): | |
| text = text.split("\n", 1)[-1].rsplit("```", 1)[0].strip() | |
| # Validate JSON | |
| parsed = json.loads(text) | |
| # Ensure all node_ids are present | |
| valid_actions = task_info.get("valid_actions", ["maintain"]) | |
| default_action = valid_actions[0] if valid_actions else "maintain" | |
| for nid in node_ids: | |
| if nid not in parsed: | |
| parsed[nid] = default_action | |
| return json.dumps(parsed) | |
| except Exception as exc: | |
| print(f"[DEBUG] LLM error: {exc}", flush=True) | |
| valid_actions = task_info.get("valid_actions", ["maintain"]) | |
| default_action = valid_actions[0] if valid_actions else "maintain" | |
| fallback = {nid: default_action for nid in node_ids} | |
| return json.dumps(fallback) | |
| # --------------------------------------------------------------------------- | |
| # Main loop | |
| # --------------------------------------------------------------------------- | |
| async def run_task( | |
| llm_client: OpenAI, | |
| env: Any, | |
| task_name: str, | |
| max_steps: int, | |
| ) -> None: | |
| """Run a single task episode and emit structured logs.""" | |
| rewards: List[float] = [] | |
| steps_taken = 0 | |
| score = 0.0 | |
| success = False | |
| log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME) | |
| try: | |
| # Reset environment with task selection | |
| await env.reset(task=task_name) | |
| # Get initial state and task info | |
| cluster_state = await env.call_tool("get_cluster_state") | |
| task_info = await env.call_tool("get_task_info") | |
| # Parse cluster_state if it's a string | |
| if isinstance(cluster_state, str): | |
| cluster_state = json.loads(cluster_state) | |
| if isinstance(task_info, str): | |
| task_info = json.loads(task_info) | |
| node_ids = [n["node_id"] for n in cluster_state.get("nodes", [])] | |
| history: List[str] = [] | |
| for step in range(1, max_steps + 1): | |
| # Get LLM decision | |
| decisions = get_llm_decision( | |
| llm_client, cluster_state, task_info, step, history, node_ids | |
| ) | |
| # Take action | |
| result = await env.call_tool("take_action", decisions=decisions) | |
| if isinstance(result, str): | |
| result = json.loads(result) | |
| reward = float(result.get("reward", 0.0)) | |
| done = bool(result.get("done", False)) | |
| error_msg = result.get("error") | |
| rewards.append(reward) | |
| steps_taken = step | |
| score = float(result.get("score", 0.0)) | |
| log_step(step=step, action=decisions, reward=reward, done=done, error=error_msg) | |
| history.append( | |
| f"Step {step}: {decisions} -> reward {reward:+.2f}" | |
| ) | |
| if done: | |
| break | |
| # Update cluster state for next iteration | |
| cluster_state = result.get("cluster_state", cluster_state) | |
| if isinstance(cluster_state, str): | |
| cluster_state = json.loads(cluster_state) | |
| score = max(0.0, min(1.0, score)) | |
| success = score >= SUCCESS_THRESHOLD | |
| except Exception as exc: | |
| print(f"[DEBUG] Task {task_name} error: {exc}", flush=True) | |
| finally: | |
| log_end(success=success, steps=steps_taken, score=score, rewards=rewards) | |
| async def main() -> None: | |
| llm_client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN) | |
| # Connect to environment (Docker or URL) | |
| if LOCAL_IMAGE_NAME: | |
| env = await CloudResourceClient.from_docker_image(LOCAL_IMAGE_NAME) | |
| else: | |
| env = CloudResourceClient(base_url=ENV_URL) | |
| try: | |
| async with env: | |
| for task_cfg in TASKS: | |
| await run_task( | |
| llm_client=llm_client, | |
| env=env, | |
| task_name=task_cfg["name"], | |
| max_steps=task_cfg["max_steps"], | |
| ) | |
| except Exception as e: | |
| print(f"[DEBUG] env error: {e}", flush=True) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |