#!/usr/bin/env python3
"""CloudSense inference script — runs an LLM agent through all tasks.

Reads environment variables:
    API_BASE_URL: LLM API endpoint (default: https://router.huggingface.co/v1)
    MODEL_NAME: Model to use (default: Qwen/Qwen2.5-72B-Instruct)
    HF_TOKEN: Authentication token (no default)
    LOCAL_IMAGE_NAME: Optional — used when launching env via from_docker_image()
    ENV_URL: Base URL of the CloudSense environment server
        (default: http://localhost:7860)

For each task the script prints one ``[START]`` line, one ``[STEP]`` line per
environment step and a final ``[END]`` line to stdout. Diagnostics (HTTP
retry notices, tracebacks) go to stderr so they cannot be mistaken for step
records.
"""

import json
import os
import sys
import time
import traceback

import requests
from openai import OpenAI

# ─── Environment Variables ───────────────────────────────────────────
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")

# Optional — if you use from_docker_image():
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")

ENV_URL = os.getenv("ENV_URL", "http://localhost:7860")

if not HF_TOKEN:
    sys.exit("ERROR: HF_TOKEN environment variable is required")

# ─── LLM Client ─────────────────────────────────────────────────────
client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)

SYSTEM_PROMPT = """You are a FinOps AI agent optimizing cloud infrastructure costs.

You will be shown AWS resources with their configurations, utilization metrics, costs, and dependencies.
For each resource, choose the best action:

Actions:
- rightsize_resource: Change to a smaller/cheaper instance type. Provide new_config with instance_type (and optionally storage_gb for RDS, node_count for K8s/ES).
- terminate_resource: Remove unused or unnecessary resources.
- add_lifecycle_policy: For S3 buckets with rarely-accessed data — transitions old data to cheaper storage tiers.
- enable_autoscaling: Enable auto-scaling for EC2/K8s resources.
- purchase_reservation: Buy reserved instances for steady-state workloads (or recommend spot for fault-tolerant).
- change_storage_class: Change S3 storage class (e.g., GLACIER_DEEP_ARCHIVE, INFREQUENT_ACCESS).
- schedule_uptime: Schedule non-prod resources to run only during business hours or weekdays.
- request_more_info: When you need more information before deciding.
- skip_resource: Leave the resource unchanged (use for critical prod resources).

CRITICAL RULES:
1. NEVER terminate or modify critical production resources. Use skip_resource for these.
2. Check dependencies before terminating — don't break other resources.
3. Pay attention to blast radius warnings — they show cascading impacts of your actions. Reference affected resources in your reasoning to demonstrate infrastructure awareness.
4. For dev/staging resources with very low utilization, rightsize aggressively.
5. For S3 buckets with rare access patterns, add lifecycle policies.
6. For unused load balancers (0 targets), terminate them.

Respond with ONLY a JSON object (no markdown, no explanation outside JSON):
{
  "action_type": "one_of_the_actions_above",
  "resource_id": "the_resource_id",
  "new_config": {"instance_type": "t3.small"},
  "reasoning": "Brief explanation of why this action"
}
"""

TASKS = ["startup-cleanup", "mid-size-audit", "enterprise-finops"]

# Action types accepted by the environment; anything else is coerced to skip.
VALID_ACTION_TYPES = frozenset({
    "rightsize_resource",
    "terminate_resource",
    "add_lifecycle_policy",
    "enable_autoscaling",
    "purchase_reservation",
    "change_storage_class",
    "schedule_uptime",
    "request_more_info",
    "skip_resource",
})


def format_resources(resources: list[dict]) -> str:
    """Format resources into a readable text block for the LLM.

    Each resource dict must provide ``resource_id``, ``name``,
    ``resource_type``, ``environment`` and a numeric ``monthly_cost``; the
    remaining fields are optional and shown with defaults when absent.
    """
    lines = []
    for r in resources:
        config = r.get("current_config", {})
        util = r.get("utilization", {})
        deps = r.get("dependencies", [])
        lines.append(f"─── {r['resource_id']} ───")
        lines.append(f"  Name: {r['name']}")
        lines.append(f"  Type: {r['resource_type']} | Env: {r['environment']} | Region: {r.get('region', 'us-east-1')}")
        lines.append(f"  Config: {json.dumps(config)}")
        lines.append(f"  Utilization: {json.dumps(util)}")
        lines.append(f"  Monthly Cost: ${r['monthly_cost']:.2f}")
        lines.append(f"  Tags: {json.dumps(r.get('tags', {}))}")
        lines.append(f"  Critical: {r.get('is_critical', False)} | Backups: {r.get('has_backups', False)}")
        lines.append(f"  Dependencies: {deps if deps else 'none'}")
        lines.append(f"  Usage Pattern: {r.get('usage_pattern', 'unknown')}")
        if r.get("subnet"):
            lines.append(f"  Subnet: {r['subnet']}")
        lines.append("")
    return "\n".join(lines)


def _fallback_skip(reasoning: str) -> dict:
    """Return a safe ``skip_resource`` action used when LLM output is unusable."""
    return {
        "action_type": "skip_resource",
        "resource_id": "unknown",
        "new_config": None,
        "reasoning": reasoning,
    }


def parse_action(response_text: str) -> dict:
    """Parse an LLM response into a normalized action dict.

    Any line starting with a Markdown code fence (```) is dropped. The text is
    then parsed as JSON; if that fails, the span from the first ``{`` to the
    last ``}`` is tried. Unparseable or non-object output yields a
    ``skip_resource`` action whose reasoning quotes the first 200 characters.

    Returns:
        A dict with exactly the keys ``action_type``, ``resource_id``,
        ``new_config`` and ``reasoning``. An unknown ``action_type`` is
        replaced by ``skip_resource``.
    """
    text = response_text.strip()

    # Strip markdown code fences (```json ... ```)
    if text.startswith("```"):
        kept = [line for line in text.split("\n") if not line.strip().startswith("```")]
        text = "\n".join(kept).strip()

    try:
        action = json.loads(text)
    except json.JSONDecodeError:
        # Try to extract JSON from the response
        start = text.find("{")
        end = text.rfind("}") + 1
        if not (start >= 0 and end > start):
            return _fallback_skip(f"No JSON found in LLM response: {text[:200]}")
        try:
            action = json.loads(text[start:end])
        except json.JSONDecodeError:
            return _fallback_skip(f"Failed to parse LLM response: {text[:200]}")

    # Valid JSON can still be a list/number/string; .get() below needs a dict.
    if not isinstance(action, dict):
        return _fallback_skip(f"LLM response is not a JSON object: {text[:200]}")

    if action.get("action_type") not in VALID_ACTION_TYPES:
        action["action_type"] = "skip_resource"
        action["reasoning"] = f"Invalid action_type corrected to skip: {action.get('reasoning', '')}"

    return {
        "action_type": action.get("action_type", "skip_resource"),
        "resource_id": action.get("resource_id", "unknown"),
        "new_config": action.get("new_config"),
        "reasoning": action.get("reasoning", ""),
    }


def call_llm(messages: list[dict], max_retries: int = 3) -> str:
    """Call the chat-completions API, retrying failures with exponential backoff.

    Makes ``max_retries`` attempts (at least one), sleeping ``2 ** attempt``
    seconds between them — with the default of 3 that is 1s, then 2s; there is
    no sleep after the final attempt.

    Returns:
        The message content (``""`` if the API returned none). After the last
        failed attempt, a JSON string encoding a ``skip_resource`` action is
        returned instead, so this function does not raise on API errors.
    """
    attempts = max(1, max_retries)  # max_retries <= 0 would otherwise return None
    for attempt in range(attempts):
        try:
            response = client.chat.completions.create(
                model=MODEL_NAME,
                messages=messages,
                max_tokens=512,
                temperature=0.1,
            )
            return response.choices[0].message.content or ""
        except Exception as e:
            if attempt < attempts - 1:
                time.sleep(2 ** attempt)
            else:
                return json.dumps({
                    "action_type": "skip_resource",
                    "resource_id": "unknown",
                    "reasoning": f"LLM API failed after {max_retries} retries: {str(e)[:200]}",
                })
    return ""  # unreachable: the final attempt always returns above


def _build_user_prompt(
    obs: dict,
    target: dict,
    remaining_count: int,
    step: int,
    last_blast_radius: "dict | None",
) -> str:
    """Build the per-step user message describing the task state and ``target``.

    ``remaining_count`` includes ``target`` itself. A blast-radius warning is
    added when the previous step reported a ``risk_level`` other than "none".
    """
    parts = [
        f"Task: {obs['goal']}\n",
        f"Step {step + 1}/{obs['max_steps']} | ",
        f"Current cost: ${obs['monthly_cost_current']:.2f}/mo | ",
        f"Possible savings: ${obs['total_possible_savings']:.2f}\n\n",
    ]

    # Add blast radius warning if applicable
    if last_blast_radius and last_blast_radius.get("risk_level", "none") != "none":
        parts += [
            "\n⚠️ BLAST RADIUS from last action:\n",
            f"Risk: {last_blast_radius['risk_level']}\n",
            f"Affected resources: {last_blast_radius.get('affected_resources', [])}\n",
            f"Impact: {last_blast_radius.get('explanation', '')}\n",
            "Consider this when choosing your next action.\n\n",
        ]

    parts.append("Analyze this resource and choose an action:\n\n")
    parts.append(format_resources([target]))
    if remaining_count > 1:
        parts.append(f"\n({remaining_count - 1} more resources remaining after this one)")
    return "".join(parts)


def _post_step(action: dict, max_attempts: int = 3) -> "tuple[dict | None, str | None]":
    """POST ``action`` to the env's ``/step`` endpoint with retry (1s, 2s, ...).

    Retry notices are written to stderr so stdout only carries step records.

    Returns:
        ``(result_json, None)`` on success, or ``(None, error_message)`` with
        the message truncated to 100 characters once all attempts failed.
    """
    attempts = max(1, max_attempts)
    for attempt in range(attempts):
        try:
            resp = requests.post(f"{ENV_URL}/step", json=action, timeout=30)
            resp.raise_for_status()
            return resp.json(), None
        except Exception as e:
            if attempt < attempts - 1:
                print(
                    f"[STEP] retry {attempt + 1}/{attempts} after error: {str(e)[:80]}",
                    file=sys.stderr,
                )
                time.sleep(2 ** attempt)
            else:
                return None, str(e)[:100]
    return None, "no attempts made"  # unreachable: the final attempt returns above


def run_task(task_id: str) -> None:
    """Run a single task and print [START], [STEP], [END] to stdout.

    The agent resets the environment, then visits each not-yet-handled
    resource in turn: it asks the LLM for an action, forces the action's
    ``resource_id`` to the visited resource and submits it to ``/step``. The
    episode ends when the env reports ``done``, every resource has been
    handled, ``max_steps`` is reached, or an unexpected error occurs.
    ``/close`` is always attempted and ``[END]`` is always printed.

    The reported score is the env's ``task_score`` when one was returned with
    ``done``; otherwise the mean step reward (rounded to 2 places). It is
    clamped to [0, 1].
    """
    step = 0
    rewards: list[float] = []
    success = False
    task_score = None  # stays None unless the env reports a final score

    print(f"[START] task={task_id} env=cloudsense model={MODEL_NAME}")

    try:
        # Reset environment
        resp = requests.post(f"{ENV_URL}/reset", params={"task_id": task_id}, timeout=30)
        resp.raise_for_status()
        obs = resp.json()
        resources = obs["resources"]
        actioned_ids = set()
        last_blast_radius = None

        while True:
            # Pick next resource to act on
            remaining = [r for r in resources if r["resource_id"] not in actioned_ids]
            if not remaining:
                break
            target = remaining[0]

            messages = [
                {"role": "system", "content": SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": _build_user_prompt(
                        obs, target, len(remaining), step, last_blast_radius
                    ),
                },
            ]
            action = parse_action(call_llm(messages))
            # Always act on the resource we showed the LLM, whatever id it echoed.
            action["resource_id"] = target["resource_id"]

            result, step_error = _post_step(action)
            if result is None:
                step += 1
                rewards.append(0.0)
                print(f"[STEP] step={step} action=error({step_error}) reward=0.00 done=false error={step_error}")
                # Give up on this resource; otherwise the same target is picked
                # again forever while the env stays unreachable.
                actioned_ids.add(target["resource_id"])
                if step >= obs.get("max_steps", 999):
                    break
                continue

            step += 1
            reward = result.get("reward", 0.0)
            done = result.get("done", False)
            rewards.append(reward)

            # Update state; tolerate explicit nulls from the env.
            info = result.get("info") or {}
            obs = result.get("observation") or obs
            resources = obs.get("resources", resources)
            last_blast_radius = info.get("blast_radius")
            error = obs.get("last_action_error")
            actioned_ids.add(action["resource_id"])

            done_str = "true" if done else "false"
            error_str = error if error else "null"
            print(f"[STEP] step={step} action={action['action_type']}({action['resource_id']}) reward={reward:.2f} done={done_str} error={error_str}")

            if done:
                task_score = info.get("task_score")
                success = True
                break

            if step >= obs.get("max_steps", 999):
                break

    except Exception as e:
        step += 1
        rewards.append(0.0)
        error_str = str(e)[:100]
        print(f"[STEP] step={step} action=error({error_str}) reward=0.00 done=true error={error_str}")
        traceback.print_exc(file=sys.stderr)  # full detail for debugging, off stdout

    finally:
        # Always close and emit [END]
        try:
            requests.post(f"{ENV_URL}/close", timeout=10)
        except Exception:
            pass  # best-effort cleanup; [END] must still be printed

        # A reported task_score of 0.0 is a real result, not "missing".
        if task_score is not None:
            score = task_score
        elif rewards:
            score = round(sum(rewards) / len(rewards), 2)
        else:
            score = 0.0
        score = max(0.0, min(1.0, score))

        success_str = "true" if success else "false"
        rewards_str = ",".join(f"{r:.2f}" for r in rewards) if rewards else "0.00"
        print(f"[END] success={success_str} steps={step} score={score:.2f} rewards={rewards_str}")


def main():
    """Run every task in ``TASKS`` sequentially."""
    for task_id in TASKS:
        run_task(task_id)


if __name__ == "__main__":
    main()