| """ |
| Metrics Engine. |
| |
| Computes price series, spread, volatility, crash detection, and per-agent PnL. |
| Used by the simulation loop to track market health and by the dashboard for display. |
| """ |
|
|
| import math |
| import numpy as np |
| from dataclasses import dataclass, field |
|
|
|
|
| def calculate_sharpe_ratio(pnl_series: list[float], risk_free_rate: float = 0.0) -> float: |
| """Calculate the Sharpe Ratio of a PnL series.""" |
| if len(pnl_series) < 5: |
| return 0.0 |
| returns = np.diff(pnl_series) |
| if len(returns) == 0 or np.std(returns) == 0: |
| return 0.0 |
| return float(np.mean(returns - risk_free_rate) / np.std(returns) * np.sqrt(252)) |
|
|
|
|
| def calculate_max_drawdown(pnl_series: list[float]) -> float: |
| """Calculate the maximum drawdown percentage from a PnL series.""" |
| if not pnl_series: |
| return 0.0 |
| |
| capital = 10_000.0 |
| equity = [capital + p for p in pnl_series] |
| peak = equity[0] |
| max_dd = 0.0 |
| for value in equity: |
| if value > peak: |
| peak = value |
| dd = (peak - value) / peak if peak > 0 else 0 |
| if dd > max_dd: |
| max_dd = dd |
| return float(max_dd) |
|
|
|
|
| def calculate_win_rate(pnl_series: list[float]) -> float: |
| """Calculate the percentage of profitable ticks.""" |
| if len(pnl_series) < 2: |
| return 0.0 |
| returns = np.diff(pnl_series) |
| wins = len([r for r in returns if r > 0]) |
| return float(wins / len(returns)) if len(returns) > 0 else 0.0 |
|
|
|
|
| @dataclass |
| class TickMetrics: |
| """Metrics snapshot for a single tick.""" |
| tick: int |
| mid_price: float | None |
| best_bid: float | None |
| best_ask: float | None |
| spread: float | None |
| trade_count: int |
| volume: int |
|
|
|
|
| class MetricsEngine: |
| """ |
| Accumulates per-tick metrics and computes derived signals. |
| """ |
|
|
| def __init__(self, crash_threshold: float = 0.05, crash_window: int = 5): |
| self.tick_history: list[TickMetrics] = [] |
| self.price_series: list[float] = [] |
| self.crash_threshold = crash_threshold |
| self.crash_window = crash_window |
| self.crash_events: list[dict] = [] |
|
|
| def record_tick(self, metrics: TickMetrics): |
| """Record metrics for one tick.""" |
| self.tick_history.append(metrics) |
| if metrics.mid_price is not None: |
| self.price_series.append(metrics.mid_price) |
| self._check_crash() |
|
|
| def _check_crash(self): |
| """Detect crash: >threshold drop over crash_window ticks.""" |
| if len(self.price_series) < self.crash_window + 1: |
| return |
| recent = self.price_series[-(self.crash_window + 1):] |
| pct_change = (recent[-1] - recent[0]) / recent[0] |
| if pct_change < -self.crash_threshold: |
| self.crash_events.append({ |
| "tick": len(self.tick_history), |
| "drop_pct": pct_change * 100, |
| "from_price": recent[0], |
| "to_price": recent[-1], |
| }) |
|
|
| def rolling_volatility(self, window: int = 10) -> float | None: |
| """Rolling standard deviation of mid prices over last `window` ticks.""" |
| if len(self.price_series) < window: |
| return None |
| recent = self.price_series[-window:] |
| mean = sum(recent) / len(recent) |
| variance = sum((p - mean) ** 2 for p in recent) / len(recent) |
| return math.sqrt(variance) |
|
|
| def rolling_mean(self, window: int = 10) -> float | None: |
| """Rolling mean of mid prices.""" |
| if len(self.price_series) < window: |
| return None |
| return sum(self.price_series[-window:]) / window |
|
|
| def classify_regime(self) -> str: |
| """ |
| Simple regime classifier based on current market conditions. |
| Returns: "Efficient" | "Trending" | "Volatile" | "Crashed" |
| """ |
| if self.crash_events and self.crash_events[-1]["tick"] >= len(self.tick_history) - 5: |
| return "Crashed" |
|
|
| vol = self.rolling_volatility() |
| if vol is None: |
| return "Efficient" |
|
|
| mean = self.rolling_mean() |
| if mean is None or mean == 0: |
| return "Efficient" |
|
|
| |
| cv = vol / mean |
|
|
| if cv > 0.02: |
| return "Volatile" |
|
|
| |
| if len(self.price_series) >= 10: |
| recent = self.price_series[-10:] |
| ups = sum(1 for i in range(1, len(recent)) if recent[i] > recent[i - 1]) |
| downs = sum(1 for i in range(1, len(recent)) if recent[i] < recent[i - 1]) |
| if ups >= 7 or downs >= 7: |
| return "Trending" |
|
|
| return "Efficient" |
|
|
| def summary(self) -> dict: |
| """Summary stats for reporting.""" |
| return { |
| "total_ticks": len(self.tick_history), |
| "total_trades": sum(t.trade_count for t in self.tick_history), |
| "total_volume": sum(t.volume for t in self.tick_history), |
| "crash_events": len(self.crash_events), |
| "current_regime": self.classify_regime(), |
| "current_volatility": self.rolling_volatility(), |
| "price_range": ( |
| (min(self.price_series), max(self.price_series)) |
| if self.price_series else None |
| ), |
| } |
|
|