File size: 6,122 Bytes
ac5cfba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.

"""
Green Agent compute efficiency tracker for Pokemon Red OpenEnv.

This module implements compute efficiency tracking as required by
The OpenEnv Challenge evaluation criteria. It measures wall-clock time,
memory usage, and provides transparency metrics for sustainability analysis.
"""

from __future__ import annotations

import os
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional, TypeVar

try:
    import psutil
    HAS_PSUTIL = True
except ImportError:
    HAS_PSUTIL = False

T = TypeVar("T")


@dataclass
class GreenMetrics:
    """
    Compute efficiency metrics for a single step or episode.
    
    Attributes:
        step_time_ms: Wall-clock time for last step in milliseconds.
        avg_step_time_ms: Running average step time in milliseconds.
        total_steps: Total steps executed.
        total_time_s: Total wall-clock time in seconds.
        peak_memory_mb: Peak memory usage in megabytes.
        current_memory_mb: Current memory usage in megabytes.
        cpu_percent: CPU utilization percentage (if available).
    """
    step_time_ms: float = 0.0
    avg_step_time_ms: float = 0.0
    total_steps: int = 0
    total_time_s: float = 0.0
    peak_memory_mb: float = 0.0
    current_memory_mb: float = 0.0
    cpu_percent: float = 0.0
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert metrics to dictionary for JSON serialization."""
        return {
            "green_step_time_ms": round(self.step_time_ms, 3),
            "green_avg_step_time_ms": round(self.avg_step_time_ms, 3),
            "green_total_steps": self.total_steps,
            "green_total_time_s": round(self.total_time_s, 3),
            "green_peak_memory_mb": round(self.peak_memory_mb, 2),
            "green_current_memory_mb": round(self.current_memory_mb, 2),
            "green_cpu_percent": round(self.cpu_percent, 1),
        }


class GreenAgentTracker:
    """
    Tracks compute efficiency metrics for RL environment steps.
    
    Designed for the OpenEnv Challenge "Green Agent" evaluation criteria,
    measuring the computational cost of environment interactions to
    promote sustainable AI development.
    
    Example:
        >>> tracker = GreenAgentTracker()
        >>> 
        >>> # Track a step
        >>> result, metrics = tracker.track_step(lambda: env.step(action))
        >>> print(f"Step took {metrics['green_step_time_ms']:.2f}ms")
        >>> 
        >>> # Get summary
        >>> summary = tracker.get_summary()
        >>> print(f"Avg step: {summary['green_avg_step_time_ms']:.2f}ms")
    
    Attributes:
        enabled: Whether tracking is active.
        metrics: Current aggregated metrics.
    """
    
    def __init__(self, enabled: bool = True):
        """
        Initialize Green Agent tracker.
        
        Args:
            enabled: Whether to enable tracking (default True).
                    Disable for benchmarking without overhead.
        """
        self.enabled = enabled
        self._process: Optional[Any] = None
        if HAS_PSUTIL:
            self._process = psutil.Process(os.getpid())
        
        self.reset()
    
    def reset(self) -> None:
        """Reset tracking metrics for new episode."""
        self._total_steps = 0
        self._total_time = 0.0
        self._peak_memory = 0.0
        self._last_step_time = 0.0
    
    def track_step(self, step_fn: Callable[[], T]) -> tuple[T, Dict[str, Any]]:
        """
        Execute and track a step function.
        
        Args:
            step_fn: Callable that executes the environment step.
        
        Returns:
            Tuple of (step result, metrics dictionary).
        """
        if not self.enabled:
            return step_fn(), {}
        
        # Measure memory before
        start_memory = self._get_memory_mb()
        
        # Time the step
        start_time = time.perf_counter()
        result = step_fn()
        elapsed = time.perf_counter() - start_time
        
        # Update tracking
        self._last_step_time = elapsed
        self._total_time += elapsed
        self._total_steps += 1
        
        # Update peak memory
        current_memory = self._get_memory_mb()
        self._peak_memory = max(self._peak_memory, current_memory)
        
        # Build metrics
        metrics = self.get_metrics()
        return result, metrics.to_dict()
    
    def track_reset(self, reset_fn: Callable[[], T]) -> tuple[T, Dict[str, Any]]:
        """
        Execute and track a reset function.
        
        Args:
            reset_fn: Callable that executes the environment reset.
        
        Returns:
            Tuple of (reset result, metrics dictionary).
        """
        self.reset()
        return self.track_step(reset_fn)
    
    def get_metrics(self) -> GreenMetrics:
        """Get current aggregated metrics."""
        avg_step = self._total_time / max(self._total_steps, 1)
        
        return GreenMetrics(
            step_time_ms=self._last_step_time * 1000,
            avg_step_time_ms=avg_step * 1000,
            total_steps=self._total_steps,
            total_time_s=self._total_time,
            peak_memory_mb=self._peak_memory,
            current_memory_mb=self._get_memory_mb(),
            cpu_percent=self._get_cpu_percent(),
        )
    
    def get_summary(self) -> Dict[str, Any]:
        """Get metrics summary as dictionary."""
        return self.get_metrics().to_dict()
    
    def _get_memory_mb(self) -> float:
        """Get current process memory in MB."""
        if self._process is not None:
            try:
                return self._process.memory_info().rss / (1024 * 1024)
            except Exception:
                pass
        return 0.0
    
    def _get_cpu_percent(self) -> float:
        """Get current CPU utilization percentage."""
        if self._process is not None:
            try:
                return self._process.cpu_percent()
            except Exception:
                pass
        return 0.0