""" Energy & Memory RAM Optimization Inference Script ================================================= This script demonstrates how an AI agent can learn to optimize energy consumption and RAM usage through reinforcement learning in the Energy Optimization Environment. The agent uses an LLM to make strategic decisions about resource optimization actions. Required Environment Variables: - API_BASE_URL: The API endpoint for the LLM (for Hugging Face router, use https://router.huggingface.co/v1) - MODEL_NAME: The model identifier to use for inference - HF_TOKEN: Your Hugging Face API key with inference permissions - LOCAL_IMAGE_NAME: The name of the local image to use for the environment (optional) Example setup: export API_BASE_URL="https://router.huggingface.co/v1" export MODEL_NAME="OpenAssistant/oasst-sft-1-pythia-12b" export HF_TOKEN="hf_..." export LOCAL_IMAGE_NAME="your-docker-image" # Optional """ import asyncio import os import subprocess import textwrap from typing import List, Optional from openai import OpenAI, OpenAIError from he_demo.client import EnergyOptimizationEnv from he_demo.models import EnergyOptimizationAction # Environment configuration variables # Default endpoint uses Hugging Face's router; set API_BASE_URL explicitly if needed. API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") HF_TOKEN = os.getenv("HF_TOKEN") LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") LOCAL_SERVER_URL = os.getenv("LOCAL_SERVER_URL", "http://localhost:8000") # Use HF_TOKEN as API key for OpenAI client API_KEY = HF_TOKEN TASK_NAME = os.getenv("ENERGY_TASK", "energy_optimization") BENCHMARK = os.getenv("ENERGY_BENCHMARK", "energy_optimization") MAX_STEPS = 50 # More steps for complex optimization tasks TEMPERATURE = 0.3 # Lower temperature for more consistent optimization decisions MAX_TOKENS = 100 SUCCESS_SCORE_THRESHOLD = 0.5 # Higher threshold for meaningful optimization # Max possible reward: task completion bonuses + efficiency improvements MAX_TOTAL_REWARD = 100.0 # Estimated maximum possible reward SYSTEM_PROMPT = textwrap.dedent( """ You are an AI system optimization agent. Your goal is to optimize computer system resources: - Reduce RAM usage (target: below 40%) - Minimize energy consumption (target: below 3 kWh) - Complete optimization tasks efficiently Available actions: - reduce_ram: Focus on RAM optimization (intensity 0.0-1.0) - optimize_energy: Focus on energy reduction (intensity 0.0-1.0) - balance_resources: Balanced approach to both resources - monitor_system: Gather system information Action format: action_type,intensity Example: reduce_ram,0.8 Consider current system state, task requirements, and potential trade-offs. Reply with exactly one action in the format: action_type,intensity """ ).strip() def log_start(task: str, env: str, model: str) -> None: print(f"[START] task={task} env={env} model={model}", flush=True) def log_step( step: int, action: str, reward: float, done: bool, error: Optional[str] ) -> None: error_val = error if error else "null" done_val = str(done).lower() print( f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}", flush=True, ) def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None: rewards_str = ",".join(f"{r:.2f}" for r in rewards) print( f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}", flush=True, ) def build_user_prompt( step: int, observation, last_reward: float, history: List[str] ) -> str: current_task_info = "" if observation.current_task: task = observation.current_task current_task_info = f""" Current Task: {task.name} Description: {task.description} Targets: RAM < {task.ram_target}%, Energy < {task.energy_target} kWh Max Steps: {task.max_steps} """ history_block = "\n".join(history[-3:]) if history else "None" return textwrap.dedent( f""" Step: {step} System State: - RAM Usage: {observation.ram_usage:.1f}% - Energy Consumption: {observation.energy_consumption:.1f} kWh - System Load: {observation.system_load:.2f} - Efficiency Score: {observation.efficiency_score:.2f} - Task Progress: {observation.task_progress:.2f} - Steps Taken: {observation.steps_taken} {current_task_info} Tasks Completed: {', '.join(observation.tasks_completed) if observation.tasks_completed else 'None'} Last Reward: {last_reward:.2f} Recent Actions: {history_block} Choose your next optimization action (action_type,intensity): """ ).strip() def parse_action(action_str: str) -> EnergyOptimizationAction: """Parse action string into EnergyOptimizationAction.""" try: parts = action_str.strip().split(',') if len(parts) != 2: raise ValueError("Invalid action format") action_type = parts[0].strip() intensity = float(parts[1].strip()) # Validate action type valid_actions = ["reduce_ram", "optimize_energy", "balance_resources", "monitor_system"] if action_type not in valid_actions: action_type = "monitor_system" # Default fallback # Clamp intensity to valid range intensity = max(0.0, min(1.0, intensity)) return EnergyOptimizationAction(action_type=action_type, intensity=intensity) except Exception: # Return safe default action return EnergyOptimizationAction(action_type="monitor_system", intensity=0.5) def get_model_action( client: OpenAI, step: int, observation, last_reward: float, history: List[str] ) -> EnergyOptimizationAction: """Get optimization action from the language model.""" user_prompt = build_user_prompt(step, observation, last_reward, history) try: completion = client.chat.completions.create( model=MODEL_NAME, messages=[ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_prompt}, ], temperature=TEMPERATURE, max_tokens=MAX_TOKENS, stream=False, ) action_text = (completion.choices[0].message.content or "").strip() return parse_action(action_text) except OpenAIError as exc: error_text = str(exc) print(f"[DEBUG] Model request failed: {error_text}", flush=True) status_code = getattr(exc, 'status_code', None) if status_code == 403 or "403" in error_text or "insufficient permissions" in error_text.lower(): raise RuntimeError( "Hugging Face authentication failed: your token does not have sufficient inference permissions. " "Use a token with inference access or switch to an active model/endpoint you are authorized for. " "If you are using the Hugging Face router, ensure HF_TOKEN has the `inference` scope and that MODEL_NAME is accessible." ) from exc return EnergyOptimizationAction(action_type="monitor_system", intensity=0.5) except Exception as exc: print(f"[DEBUG] Unexpected model request failure: {exc}", flush=True) return EnergyOptimizationAction(action_type="monitor_system", intensity=0.5) async def main() -> None: # Validate required environment variables if not API_BASE_URL or API_BASE_URL == "": raise ValueError("API_BASE_URL environment variable must be set to your active LLM endpoint") if not MODEL_NAME or MODEL_NAME == "": raise ValueError("MODEL_NAME environment variable must be set to your active model identifier") if not HF_TOKEN: raise ValueError("HF_TOKEN environment variable must be set to your Hugging Face API key") client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN) async def local_image_exists(image_name: str) -> bool: try: result = subprocess.run( ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"], capture_output=True, text=True, check=True, ) return image_name in result.stdout.splitlines() except Exception: return False if LOCAL_IMAGE_NAME: if await local_image_exists(LOCAL_IMAGE_NAME): env = await EnergyOptimizationEnv.from_docker_image(LOCAL_IMAGE_NAME) else: print( f"[WARN] Docker image '{LOCAL_IMAGE_NAME}' not found locally. Falling back to local server at {LOCAL_SERVER_URL}", flush=True, ) env = EnergyOptimizationEnv(base_url=LOCAL_SERVER_URL) else: env = EnergyOptimizationEnv(base_url=LOCAL_SERVER_URL) history: List[str] = [] rewards: List[float] = [] steps_taken = 0 score = 0.0 success = False log_start(task=TASK_NAME, env=BENCHMARK, model=MODEL_NAME) try: result = await env.reset() last_reward = 0.0 for step in range(1, MAX_STEPS + 1): if result.done: break # Get action from model action = get_model_action(client, step, result.observation, last_reward, history) # Execute action result = await env.step(action) obs = result.observation reward = result.reward or 0.0 done = result.done error = None # Format action for logging action_str = f"{action.action_type},{action.intensity:.1f}" rewards.append(reward) steps_taken = step last_reward = reward log_step(step=step, action=action_str, reward=reward, done=done, error=error) # Update history history.append(f"Step {step}: {action_str} -> reward {reward:+.2f}") if done: break # Calculate final score based on tasks completed and efficiency total_reward = sum(rewards) tasks_completed = len(result.observation.tasks_completed) if result.observation.tasks_completed else 0 efficiency_score = result.observation.efficiency_score # Score combines task completion and efficiency score = (tasks_completed / 5.0) * 0.6 + (efficiency_score / 1.0) * 0.4 score = min(max(score, 0.0), 1.0) # clamp to [0, 1] success = score >= SUCCESS_SCORE_THRESHOLD finally: try: await env.close() except Exception as e: print(f"[DEBUG] env.close() error (container cleanup): {e}", flush=True) log_end(success=success, steps=steps_taken, score=score, rewards=rewards) if __name__ == "__main__": asyncio.run(main())