Spaces:
Sleeping
Sleeping
| """ | |
| Energy & Memory RAM Optimization Inference Script | |
| ================================================= | |
| This script demonstrates how an AI agent can learn to optimize energy consumption | |
| and RAM usage through reinforcement learning in the Energy Optimization Environment. | |
| The agent uses an LLM to make strategic decisions about resource optimization actions. | |
| Required Environment Variables: | |
| - API_BASE_URL: The API endpoint for the LLM (for Hugging Face router, use https://router.huggingface.co/v1) | |
| - MODEL_NAME: The model identifier to use for inference | |
| - HF_TOKEN: Your Hugging Face API key with inference permissions | |
| - LOCAL_IMAGE_NAME: The name of the local image to use for the environment (optional) | |
| Example setup: | |
| export API_BASE_URL="https://router.huggingface.co/v1" | |
| export MODEL_NAME="OpenAssistant/oasst-sft-1-pythia-12b" | |
| export HF_TOKEN="hf_..." | |
| export LOCAL_IMAGE_NAME="your-docker-image" # Optional | |
| """ | |
# Standard library.
import asyncio
import os
import subprocess
import textwrap
from typing import List, Optional

# Third-party.
from openai import OpenAI, OpenAIError

# Project-local environment client and action model.
from he_demo.client import EnergyOptimizationEnv
from he_demo.models import EnergyOptimizationAction
# --- Environment configuration -------------------------------------------
# Default endpoint uses Hugging Face's router; set API_BASE_URL explicitly
# if you are targeting a different OpenAI-compatible endpoint.
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")  # required at runtime; validated in main()
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")  # optional local Docker image
LOCAL_SERVER_URL = os.getenv("LOCAL_SERVER_URL", "http://localhost:8000")

# The Hugging Face token doubles as the OpenAI-client API key.
API_KEY = HF_TOKEN

TASK_NAME = os.getenv("ENERGY_TASK", "energy_optimization")
BENCHMARK = os.getenv("ENERGY_BENCHMARK", "energy_optimization")

# --- Inference / scoring knobs -------------------------------------------
MAX_STEPS = 50  # more steps for complex optimization tasks
TEMPERATURE = 0.3  # lower temperature for more consistent optimization decisions
MAX_TOKENS = 100
SUCCESS_SCORE_THRESHOLD = 0.5  # higher threshold for meaningful optimization
# Max possible reward: task completion bonuses + efficiency improvements.
MAX_TOTAL_REWARD = 100.0  # estimated maximum possible reward
# Instructions sent to the LLM on every request. The model must reply with
# exactly one "action_type,intensity" pair, which parse_action understands.
SYSTEM_PROMPT = textwrap.dedent(
    """
    You are an AI system optimization agent. Your goal is to optimize computer system resources:
    - Reduce RAM usage (target: below 40%)
    - Minimize energy consumption (target: below 3 kWh)
    - Complete optimization tasks efficiently
    Available actions:
    - reduce_ram: Focus on RAM optimization (intensity 0.0-1.0)
    - optimize_energy: Focus on energy reduction (intensity 0.0-1.0)
    - balance_resources: Balanced approach to both resources
    - monitor_system: Gather system information
    Action format: action_type,intensity
    Example: reduce_ram,0.8
    Consider current system state, task requirements, and potential trade-offs.
    Reply with exactly one action in the format: action_type,intensity
    """
).strip()
def log_start(task: str, env: str, model: str) -> None:
    """Emit the [START] marker line consumed by the run-log parser."""
    print(f"[START] task={task} env={env} model={model}", flush=True)
def log_step(
    step: int, action: str, reward: float, done: bool, error: Optional[str]
) -> None:
    """Emit one [STEP] marker line; "null" stands in for a missing error."""
    error_val = error if error else "null"
    done_val = str(done).lower()  # lowercase true/false for the log parser
    print(
        f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
        flush=True,
    )
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the final [END] marker line with a comma-joined reward trace."""
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}",
        flush=True,
    )
def build_user_prompt(
    step: int, observation, last_reward: float, history: List[str]
) -> str:
    """Render the per-step user message summarizing system state for the LLM.

    ``observation`` is duck-typed: it must expose ram_usage,
    energy_consumption, system_load, efficiency_score, task_progress,
    steps_taken, tasks_completed, and current_task (which, when truthy,
    exposes name, description, ram_target, energy_target, max_steps).
    Only the last three history entries are included to bound prompt size.

    NOTE: explicit line construction is used instead of textwrap.dedent on an
    f-string — dedent's common-prefix rule breaks once a multi-line
    current_task_info is interpolated, leaving stray indentation.
    """
    current_task_info = ""
    if observation.current_task:
        task = observation.current_task
        current_task_info = (
            f"\nCurrent Task: {task.name}"
            f"\nDescription: {task.description}"
            f"\nTargets: RAM < {task.ram_target}%, Energy < {task.energy_target} kWh"
            f"\nMax Steps: {task.max_steps}\n"
        )
    history_block = "\n".join(history[-3:]) if history else "None"
    tasks_done = (
        ", ".join(observation.tasks_completed)
        if observation.tasks_completed
        else "None"
    )
    prompt = (
        f"Step: {step}\n"
        "System State:\n"
        f"- RAM Usage: {observation.ram_usage:.1f}%\n"
        f"- Energy Consumption: {observation.energy_consumption:.1f} kWh\n"
        f"- System Load: {observation.system_load:.2f}\n"
        f"- Efficiency Score: {observation.efficiency_score:.2f}\n"
        f"- Task Progress: {observation.task_progress:.2f}\n"
        f"- Steps Taken: {observation.steps_taken}\n"
        f"{current_task_info}\n"
        f"Tasks Completed: {tasks_done}\n"
        f"Last Reward: {last_reward:.2f}\n"
        "Recent Actions:\n"
        f"{history_block}\n"
        "Choose your next optimization action (action_type,intensity):"
    )
    return prompt.strip()
def parse_action(action_str: str) -> EnergyOptimizationAction:
    """Parse "action_type,intensity" text into an EnergyOptimizationAction.

    Unknown action types fall back to "monitor_system"; intensity is clamped
    to [0.0, 1.0]. Any parse failure yields a safe default action so a single
    malformed model reply never aborts the episode.
    """
    try:
        parts = action_str.strip().split(",")
        if len(parts) != 2:
            raise ValueError("Invalid action format")
        action_type = parts[0].strip()
        intensity = float(parts[1].strip())
        valid_actions = [
            "reduce_ram",
            "optimize_energy",
            "balance_resources",
            "monitor_system",
        ]
        if action_type not in valid_actions:
            action_type = "monitor_system"  # default fallback
        # Clamp intensity to the valid range expected by the environment.
        intensity = max(0.0, min(1.0, intensity))
        return EnergyOptimizationAction(action_type=action_type, intensity=intensity)
    except Exception:
        # Broad on purpose: any malformed reply becomes "monitor" at half power.
        return EnergyOptimizationAction(action_type="monitor_system", intensity=0.5)
def get_model_action(
    client: OpenAI, step: int, observation, last_reward: float, history: List[str]
) -> EnergyOptimizationAction:
    """Query the LLM for the next optimization action.

    Any transient failure degrades to a safe "monitor_system" action.
    Raises RuntimeError only on a 403/permission error, which is fatal:
    retrying cannot succeed without a properly-scoped token.
    """
    user_prompt = build_user_prompt(step, observation, last_reward, history)
    try:
        completion = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        action_text = (completion.choices[0].message.content or "").strip()
        return parse_action(action_text)
    except OpenAIError as exc:
        error_text = str(exc)
        print(f"[DEBUG] Model request failed: {error_text}", flush=True)
        # Permission failures are detected both via the structured status code
        # and via message text, since router errors are not always structured.
        status_code = getattr(exc, "status_code", None)
        if (
            status_code == 403
            or "403" in error_text
            or "insufficient permissions" in error_text.lower()
        ):
            raise RuntimeError(
                "Hugging Face authentication failed: your token does not have sufficient inference permissions. "
                "Use a token with inference access or switch to an active model/endpoint you are authorized for. "
                "If you are using the Hugging Face router, ensure HF_TOKEN has the `inference` scope and that MODEL_NAME is accessible."
            ) from exc
        return EnergyOptimizationAction(action_type="monitor_system", intensity=0.5)
    except Exception as exc:
        print(f"[DEBUG] Unexpected model request failure: {exc}", flush=True)
        return EnergyOptimizationAction(action_type="monitor_system", intensity=0.5)
async def main() -> None:
    """Run one optimization episode: env setup, LLM-driven loop, scoring."""
    # Fail fast on missing or placeholder credentials/configuration.
    if not API_BASE_URL or API_BASE_URL == "<your-active-endpoint>":
        raise ValueError("API_BASE_URL environment variable must be set to your active LLM endpoint")
    if not MODEL_NAME or MODEL_NAME == "<your-active-model>":
        raise ValueError("MODEL_NAME environment variable must be set to your active model identifier")
    if not HF_TOKEN:
        raise ValueError("HF_TOKEN environment variable must be set to your Hugging Face API key")
    client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)

    def local_image_exists(image_name: str) -> bool:
        """Return True if `image_name` appears in the local docker image list.

        Plain (non-async) on purpose: it was previously declared async despite
        containing no awaits and a blocking subprocess call.
        """
        try:
            result = subprocess.run(
                ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"],
                capture_output=True,
                text=True,
                check=True,
            )
            return image_name in result.stdout.splitlines()
        except Exception:
            # Missing docker binary or CLI failure -> treat as "not present".
            return False

    # Prefer a local Docker image when configured and present; otherwise fall
    # back to an already-running local server.
    if LOCAL_IMAGE_NAME and local_image_exists(LOCAL_IMAGE_NAME):
        env = await EnergyOptimizationEnv.from_docker_image(LOCAL_IMAGE_NAME)
    else:
        if LOCAL_IMAGE_NAME:
            print(
                f"[WARN] Docker image '{LOCAL_IMAGE_NAME}' not found locally. Falling back to local server at {LOCAL_SERVER_URL}",
                flush=True,
            )
        env = EnergyOptimizationEnv(base_url=LOCAL_SERVER_URL)

    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False
    log_start(task=TASK_NAME, env=BENCHMARK, model=MODEL_NAME)
    try:
        result = await env.reset()
        last_reward = 0.0
        for step in range(1, MAX_STEPS + 1):
            if result.done:
                break
            action = get_model_action(
                client, step, result.observation, last_reward, history
            )
            result = await env.step(action)
            reward = result.reward or 0.0
            done = result.done
            action_str = f"{action.action_type},{action.intensity:.1f}"
            rewards.append(reward)
            steps_taken = step
            last_reward = reward
            log_step(step=step, action=action_str, reward=reward, done=done, error=None)
            history.append(f"Step {step}: {action_str} -> reward {reward:+.2f}")
            if done:
                break
        # Final score: 60% weight on task completion (out of 5 tasks),
        # 40% on the environment's efficiency score, clamped to [0, 1].
        tasks_completed = (
            len(result.observation.tasks_completed)
            if result.observation.tasks_completed
            else 0
        )
        efficiency_score = result.observation.efficiency_score
        score = (tasks_completed / 5.0) * 0.6 + (efficiency_score / 1.0) * 0.4
        score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD
    finally:
        # Always attempt container/server cleanup, but never mask the
        # original exception with a cleanup failure.
        try:
            await env.close()
        except Exception as e:
            print(f"[DEBUG] env.close() error (container cleanup): {e}", flush=True)
    log_end(success=success, steps=steps_taken, score=score, rewards=rewards)


if __name__ == "__main__":
    asyncio.run(main())