Spaces:
Running
Running
File size: 6,122 Bytes
ac5cfba |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
"""
Green Agent compute efficiency tracker for Pokemon Red OpenEnv.
This module implements compute efficiency tracking as required by
the OpenEnv Challenge evaluation criteria. It measures wall-clock time,
memory usage, and provides transparency metrics for sustainability analysis.
"""
from __future__ import annotations
import os
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional, TypeVar
try:
import psutil
HAS_PSUTIL = True
except ImportError:
HAS_PSUTIL = False
T = TypeVar("T")
@dataclass
class GreenMetrics:
    """
    Snapshot of compute-efficiency figures for one step or a whole episode.

    Attributes:
        step_time_ms: Wall-clock duration of the most recent step (ms).
        avg_step_time_ms: Mean wall-clock duration per step so far (ms).
        total_steps: Number of steps executed.
        total_time_s: Accumulated wall-clock time (seconds).
        peak_memory_mb: Highest memory reading recorded (MB).
        current_memory_mb: Memory reading at snapshot time (MB).
        cpu_percent: CPU utilization percentage (0.0 when unavailable).
    """

    step_time_ms: float = 0.0
    avg_step_time_ms: float = 0.0
    total_steps: int = 0
    total_time_s: float = 0.0
    peak_memory_mb: float = 0.0
    current_memory_mb: float = 0.0
    cpu_percent: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """
        Render the snapshot as a flat, JSON-friendly dictionary.

        Every key carries a ``green_`` prefix. Timings are rounded to three
        decimals, memory figures to two, CPU percentage to one; the step
        count is emitted unchanged.

        Returns:
            Dictionary mapping prefixed metric names to their values, in
            field-declaration order.
        """
        # Ordered (key, value) pairs; dict() preserves this order.
        entries = (
            ("green_step_time_ms", round(self.step_time_ms, 3)),
            ("green_avg_step_time_ms", round(self.avg_step_time_ms, 3)),
            ("green_total_steps", self.total_steps),
            ("green_total_time_s", round(self.total_time_s, 3)),
            ("green_peak_memory_mb", round(self.peak_memory_mb, 2)),
            ("green_current_memory_mb", round(self.current_memory_mb, 2)),
            ("green_cpu_percent", round(self.cpu_percent, 1)),
        )
        return dict(entries)
class GreenAgentTracker:
"""
Tracks compute efficiency metrics for RL environment steps.
Designed for the OpenEnv Challenge "Green Agent" evaluation criteria,
measuring the computational cost of environment interactions to
promote sustainable AI development.
Example:
>>> tracker = GreenAgentTracker()
>>>
>>> # Track a step
>>> result, metrics = tracker.track_step(lambda: env.step(action))
>>> print(f"Step took {metrics['green_step_time_ms']:.2f}ms")
>>>
>>> # Get summary
>>> summary = tracker.get_summary()
>>> print(f"Avg step: {summary['green_avg_step_time_ms']:.2f}ms")
Attributes:
enabled: Whether tracking is active.
metrics: Current aggregated metrics.
"""
def __init__(self, enabled: bool = True):
"""
Initialize Green Agent tracker.
Args:
enabled: Whether to enable tracking (default True).
Disable for benchmarking without overhead.
"""
self.enabled = enabled
self._process: Optional[Any] = None
if HAS_PSUTIL:
self._process = psutil.Process(os.getpid())
self.reset()
def reset(self) -> None:
"""Reset tracking metrics for new episode."""
self._total_steps = 0
self._total_time = 0.0
self._peak_memory = 0.0
self._last_step_time = 0.0
def track_step(self, step_fn: Callable[[], T]) -> tuple[T, Dict[str, Any]]:
"""
Execute and track a step function.
Args:
step_fn: Callable that executes the environment step.
Returns:
Tuple of (step result, metrics dictionary).
"""
if not self.enabled:
return step_fn(), {}
# Measure memory before
start_memory = self._get_memory_mb()
# Time the step
start_time = time.perf_counter()
result = step_fn()
elapsed = time.perf_counter() - start_time
# Update tracking
self._last_step_time = elapsed
self._total_time += elapsed
self._total_steps += 1
# Update peak memory
current_memory = self._get_memory_mb()
self._peak_memory = max(self._peak_memory, current_memory)
# Build metrics
metrics = self.get_metrics()
return result, metrics.to_dict()
def track_reset(self, reset_fn: Callable[[], T]) -> tuple[T, Dict[str, Any]]:
"""
Execute and track a reset function.
Args:
reset_fn: Callable that executes the environment reset.
Returns:
Tuple of (reset result, metrics dictionary).
"""
self.reset()
return self.track_step(reset_fn)
def get_metrics(self) -> GreenMetrics:
"""Get current aggregated metrics."""
avg_step = self._total_time / max(self._total_steps, 1)
return GreenMetrics(
step_time_ms=self._last_step_time * 1000,
avg_step_time_ms=avg_step * 1000,
total_steps=self._total_steps,
total_time_s=self._total_time,
peak_memory_mb=self._peak_memory,
current_memory_mb=self._get_memory_mb(),
cpu_percent=self._get_cpu_percent(),
)
def get_summary(self) -> Dict[str, Any]:
"""Get metrics summary as dictionary."""
return self.get_metrics().to_dict()
def _get_memory_mb(self) -> float:
"""Get current process memory in MB."""
if self._process is not None:
try:
return self._process.memory_info().rss / (1024 * 1024)
except Exception:
pass
return 0.0
def _get_cpu_percent(self) -> float:
"""Get current CPU utilization percentage."""
if self._process is not None:
try:
return self._process.cpu_percent()
except Exception:
pass
return 0.0
|