| """
|
| Energy & Memory RAM Optimization Inference Script
|
| =================================================
|
| This script demonstrates how an AI agent can learn to optimize energy consumption
|
| and RAM usage through reinforcement learning in the Energy Optimization Environment.
|
|
|
| The agent uses an LLM to make strategic decisions about resource optimization actions.
|
|
|
| Required Environment Variables:
|
| - API_BASE_URL: The API endpoint for the LLM (for Hugging Face router, use https://router.huggingface.co/v1)
|
| - MODEL_NAME: The model identifier to use for inference
|
| - HF_TOKEN: Your Hugging Face API key with inference permissions
|
| - LOCAL_IMAGE_NAME: The name of the local image to use for the environment (optional)
|
|
|
| Example setup:
|
| export API_BASE_URL="https://router.huggingface.co/v1"
|
| export MODEL_NAME="OpenAssistant/oasst-sft-1-pythia-12b"
|
| export HF_TOKEN="hf_..."
|
| export LOCAL_IMAGE_NAME="your-docker-image" # Optional
|
| """
|
|
|
| import asyncio
|
| import os
|
| import subprocess
|
| import textwrap
|
| from typing import List, Optional
|
|
|
| from openai import OpenAI, OpenAIError
|
|
|
| from he_demo.client import EnergyOptimizationEnv
|
| from he_demo.models import EnergyOptimizationAction
|
|
|
|
|
|
|
# --- LLM endpoint configuration (all overridable via environment variables) ---
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
HF_TOKEN = os.getenv("HF_TOKEN")  # required; presence validated in main()
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")  # optional Docker image for the env
LOCAL_SERVER_URL = os.getenv("LOCAL_SERVER_URL", "http://localhost:8000")


# Alias used where an "API key" is expected by the OpenAI-compatible client.
API_KEY = HF_TOKEN

# --- Episode configuration ---
TASK_NAME = os.getenv("ENERGY_TASK", "energy_optimization")
BENCHMARK = os.getenv("ENERGY_BENCHMARK", "energy_optimization")
MAX_STEPS = 50  # hard cap on environment steps per episode
TEMPERATURE = 0.3  # low temperature: favor deterministic action choices
MAX_TOKENS = 100  # reply is a single "action_type,intensity" line
SUCCESS_SCORE_THRESHOLD = 0.5  # episode counts as a success at/above this score


# NOTE(review): MAX_TOTAL_REWARD appears unreferenced in this file — confirm
# whether it is still needed or can be removed.
MAX_TOTAL_REWARD = 100.0

# System prompt sent on every model call; it defines the action space and the
# exact one-line reply format that parse_action() expects.
SYSTEM_PROMPT = textwrap.dedent(
    """
You are an AI system optimization agent. Your goal is to optimize computer system resources:
- Reduce RAM usage (target: below 40%)
- Minimize energy consumption (target: below 3 kWh)
- Complete optimization tasks efficiently

Available actions:
- reduce_ram: Focus on RAM optimization (intensity 0.0-1.0)
- optimize_energy: Focus on energy reduction (intensity 0.0-1.0)
- balance_resources: Balanced approach to both resources
- monitor_system: Gather system information

Action format: action_type,intensity
Example: reduce_ram,0.8

Consider current system state, task requirements, and potential trade-offs.
Reply with exactly one action in the format: action_type,intensity
"""
).strip()
|
|
|
|
|
def log_start(task: str, env: str, model: str) -> None:
    """Emit the run-start marker line consumed by downstream log parsers."""
    banner = f"[START] task={task} env={env} model={model}"
    print(banner, flush=True)
|
|
|
|
|
def log_step(
    step: int, action: str, reward: float, done: bool, error: Optional[str]
) -> None:
    """Emit one per-step marker line: action taken, reward, done flag, error."""
    # Normalize optional/boolean fields to the log format's literal tokens.
    fields = " ".join(
        [
            f"step={step}",
            f"action={action}",
            f"reward={reward:.2f}",
            f"done={'true' if done else 'false'}",
            f"error={error or 'null'}",
        ]
    )
    print(f"[STEP] {fields}", flush=True)
|
|
|
|
|
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the run-summary marker line including the full reward history."""
    outcome = "true" if success else "false"
    reward_history = ",".join(format(r, ".2f") for r in rewards)
    summary = f"[END] success={outcome} steps={steps} score={score:.3f} rewards={reward_history}"
    print(summary, flush=True)
|
|
|
|
|
| def build_user_prompt(
|
| step: int, observation, last_reward: float, history: List[str]
|
| ) -> str:
|
| current_task_info = ""
|
| if observation.current_task:
|
| task = observation.current_task
|
| current_task_info = f"""
|
| Current Task: {task.name}
|
| Description: {task.description}
|
| Targets: RAM < {task.ram_target}%, Energy < {task.energy_target} kWh
|
| Max Steps: {task.max_steps}
|
| """
|
|
|
| history_block = "\n".join(history[-3:]) if history else "None"
|
|
|
| return textwrap.dedent(
|
| f"""
|
| Step: {step}
|
| System State:
|
| - RAM Usage: {observation.ram_usage:.1f}%
|
| - Energy Consumption: {observation.energy_consumption:.1f} kWh
|
| - System Load: {observation.system_load:.2f}
|
| - Efficiency Score: {observation.efficiency_score:.2f}
|
| - Task Progress: {observation.task_progress:.2f}
|
| - Steps Taken: {observation.steps_taken}
|
|
|
| {current_task_info}
|
| Tasks Completed: {', '.join(observation.tasks_completed) if observation.tasks_completed else 'None'}
|
|
|
| Last Reward: {last_reward:.2f}
|
| Recent Actions:
|
| {history_block}
|
|
|
| Choose your next optimization action (action_type,intensity):
|
| """
|
| ).strip()
|
|
|
|
|
def parse_action(action_str: str) -> EnergyOptimizationAction:
    """Parse a model reply of the form ``action_type,intensity`` into an action.

    Tolerates common LLM formatting noise: surrounding backticks or quotes,
    extra whitespace, mixed case in the action name, and a trailing period
    after the number. Unknown action types fall back to ``monitor_system``;
    intensity is clamped to [0.0, 1.0]. Any parse failure yields the safe
    default ``monitor_system`` at 0.5 intensity, so this never raises.
    """
    valid_actions = {"reduce_ram", "optimize_energy", "balance_resources", "monitor_system"}
    try:
        # Strip markdown code/quote fencing the model may wrap the reply in.
        cleaned = action_str.strip().strip("`'\"").strip()
        parts = cleaned.split(",")
        if len(parts) != 2:
            raise ValueError("Invalid action format")

        action_type = parts[0].strip().lower()
        # rstrip(".") forgives a sentence-final period after the number.
        intensity = float(parts[1].strip().rstrip("."))

        if action_type not in valid_actions:
            # Unknown action: observe instead of acting blindly.
            action_type = "monitor_system"

        # Clamp intensity into the documented range.
        intensity = max(0.0, min(1.0, intensity))

        return EnergyOptimizationAction(action_type=action_type, intensity=intensity)
    except Exception:
        # Fail safe: neutral monitoring action at medium intensity.
        return EnergyOptimizationAction(action_type="monitor_system", intensity=0.5)
|
|
|
|
|
def get_model_action(
    client: OpenAI, step: int, observation, last_reward: float, history: List[str]
) -> EnergyOptimizationAction:
    """Ask the LLM for the next optimization action.

    Falls back to a neutral monitoring action on recoverable failures;
    raises RuntimeError on an unrecoverable 403/permission error.
    """
    prompt = build_user_prompt(step, observation, last_reward, history)
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        reply = (response.choices[0].message.content or "").strip()
        return parse_action(reply)
    except OpenAIError as exc:
        error_text = str(exc)
        print(f"[DEBUG] Model request failed: {error_text}", flush=True)
        # Auth/permission failures cannot be retried — surface them loudly.
        is_auth_failure = (
            getattr(exc, 'status_code', None) == 403
            or "403" in error_text
            or "insufficient permissions" in error_text.lower()
        )
        if is_auth_failure:
            raise RuntimeError(
                "Hugging Face authentication failed: your token does not have sufficient inference permissions. "
                "Use a token with inference access or switch to an active model/endpoint you are authorized for. "
                "If you are using the Hugging Face router, ensure HF_TOKEN has the `inference` scope and that MODEL_NAME is accessible."
            ) from exc
        return EnergyOptimizationAction(action_type="monitor_system", intensity=0.5)
    except Exception as exc:
        print(f"[DEBUG] Unexpected model request failure: {exc}", flush=True)
        return EnergyOptimizationAction(action_type="monitor_system", intensity=0.5)
|
|
|
|
|
async def main() -> None:
    """Run one energy-optimization episode end to end.

    Validates configuration, connects to the environment (a local Docker
    image when available, otherwise the local HTTP server), then loops:
    query the LLM for an action, apply it, log the step. Emits
    [START]/[STEP]/[END] marker lines for downstream log parsing.

    Raises:
        ValueError: when required environment variables are missing or
            still set to placeholder values.
    """
    # Fail fast on missing or placeholder configuration.
    if not API_BASE_URL or API_BASE_URL == "<your-active-endpoint>":
        raise ValueError("API_BASE_URL environment variable must be set to your active LLM endpoint")
    if not MODEL_NAME or MODEL_NAME == "<your-active-model>":
        raise ValueError("MODEL_NAME environment variable must be set to your active model identifier")
    if not HF_TOKEN:
        raise ValueError("HF_TOKEN environment variable must be set to your Hugging Face API key")

    # API_KEY is the module-level alias for HF_TOKEN.
    client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)

    def local_image_exists(image_name: str) -> bool:
        # Plain (sync) helper: `docker images` is a blocking subprocess call,
        # so there is nothing to await — the previous async wrapper was a no-op.
        try:
            result = subprocess.run(
                ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"],
                capture_output=True,
                text=True,
                check=True,
            )
        except Exception:
            # docker missing/not running counts as "image not available".
            return False
        return image_name in result.stdout.splitlines()

    # Prefer a local Docker image when configured and present; otherwise fall
    # back to an already-running local server.
    if LOCAL_IMAGE_NAME and local_image_exists(LOCAL_IMAGE_NAME):
        env = await EnergyOptimizationEnv.from_docker_image(LOCAL_IMAGE_NAME)
    else:
        if LOCAL_IMAGE_NAME:
            print(
                f"[WARN] Docker image '{LOCAL_IMAGE_NAME}' not found locally. Falling back to local server at {LOCAL_SERVER_URL}",
                flush=True,
            )
        env = EnergyOptimizationEnv(base_url=LOCAL_SERVER_URL)

    history: List[str] = []
    rewards: List[float] = []
    steps_taken = 0
    score = 0.0
    success = False

    log_start(task=TASK_NAME, env=BENCHMARK, model=MODEL_NAME)

    try:
        result = await env.reset()
        last_reward = 0.0

        for step in range(1, MAX_STEPS + 1):
            if result.done:
                break

            action = get_model_action(client, step, result.observation, last_reward, history)
            result = await env.step(action)

            reward = result.reward or 0.0  # treat a missing reward as zero
            done = result.done

            action_str = f"{action.action_type},{action.intensity:.1f}"
            rewards.append(reward)
            steps_taken = step
            last_reward = reward

            log_step(step=step, action=action_str, reward=reward, done=done, error=None)
            history.append(f"Step {step}: {action_str} -> reward {reward:+.2f}")

            if done:
                break

        # Final score: 60% weight on task completion (out of 5 tasks),
        # 40% on the environment's efficiency score, clamped to [0, 1].
        tasks_completed = len(result.observation.tasks_completed) if result.observation.tasks_completed else 0
        efficiency_score = result.observation.efficiency_score
        score = (tasks_completed / 5.0) * 0.6 + efficiency_score * 0.4
        score = min(max(score, 0.0), 1.0)
        success = score >= SUCCESS_SCORE_THRESHOLD

    finally:
        # Always attempt cleanup and emit the END marker, even on failure.
        try:
            await env.close()
        except Exception as e:
            print(f"[DEBUG] env.close() error (container cleanup): {e}", flush=True)
        log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
|
|
|
|
|
# Script entry point: drive the async episode runner.
if __name__ == "__main__":
    asyncio.run(main())
|
|
|