Spaces:
Sleeping
Sleeping
| """ | |
| Strategy Analytics Engine. | |
| Cross-strategy analysis including: | |
| - Performance comparison tables | |
| - Return correlation matrices | |
| - Diversification scoring | |
| - Strategy clustering (hierarchical) | |
| """ | |
| from __future__ import annotations | |
| import logging | |
| from typing import Any, Dict, List, Optional | |
| import numpy as np | |
| import pandas as pd | |
| from scipy.cluster.hierarchy import fcluster, linkage | |
| from scipy.spatial.distance import squareform | |
| logger = logging.getLogger(__name__) | |
| class AnalyticsEngine: | |
| """Cross-strategy analytics and comparison.""" | |
| def compare_strategies( | |
| self, backtest_results: List[Dict[str, Any]] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Compare multiple backtest results side by side. | |
| Args: | |
| backtest_results: List of backtest result dicts (from BacktestEngine). | |
| Returns: | |
| Comparison table, correlation matrix, diversification score, clusters. | |
| """ | |
| if not backtest_results: | |
| return {"comparisons": [], "correlation_matrix": None} | |
| # 1. Build comparison table | |
| comparisons = [] | |
| equity_curves: Dict[str, List[float]] = {} | |
| for i, result in enumerate(backtest_results): | |
| metrics = result.get("metrics", {}) | |
| strategy_name = result.get("strategy_name", f"Strategy {i + 1}") | |
| strategy_id = result.get("strategy_id", i) | |
| comparisons.append({ | |
| "strategy_id": strategy_id, | |
| "strategy_name": strategy_name, | |
| "total_return": metrics.get("total_return"), | |
| "annualized_return": metrics.get("annualized_return"), | |
| "sharpe_ratio": metrics.get("sharpe_ratio"), | |
| "sortino_ratio": metrics.get("sortino_ratio"), | |
| "max_drawdown": metrics.get("max_drawdown"), | |
| "volatility": metrics.get("volatility"), | |
| "win_rate": metrics.get("win_rate"), | |
| "calmar_ratio": metrics.get("calmar_ratio"), | |
| "total_trades": metrics.get("total_trades"), | |
| "alpha": metrics.get("alpha"), | |
| "beta": metrics.get("beta"), | |
| }) | |
| # Extract daily returns for correlation | |
| curve = result.get("equity_curve", []) | |
| if curve: | |
| daily_rets = [p.get("daily_return", 0) for p in curve] | |
| equity_curves[strategy_name] = daily_rets | |
| # 2. Correlation matrix | |
| corr_matrix = None | |
| if len(equity_curves) >= 2: | |
| corr_matrix = self._compute_correlation(equity_curves) | |
| # 3. Diversification score | |
| div_score = None | |
| if corr_matrix: | |
| div_score = self._diversification_score(corr_matrix) | |
| # 4. Clustering | |
| clusters = None | |
| if corr_matrix and len(equity_curves) >= 3: | |
| clusters = self._cluster_strategies(corr_matrix) | |
| return { | |
| "comparisons": comparisons, | |
| "correlation_matrix": corr_matrix, | |
| "diversification_score": div_score, | |
| "clusters": clusters, | |
| } | |
| def _compute_correlation( | |
| return_series: Dict[str, List[float]], | |
| ) -> Dict[str, Any]: | |
| """Compute return correlation matrix between strategies.""" | |
| # Align lengths | |
| min_len = min(len(v) for v in return_series.values()) | |
| aligned = {k: v[:min_len] for k, v in return_series.items()} | |
| df = pd.DataFrame(aligned) | |
| corr = df.corr() | |
| return { | |
| "strategy_names": list(corr.columns), | |
| "matrix": [[round(v, 4) for v in row] for row in corr.values.tolist()], | |
| } | |
| def _diversification_score(corr_data: Dict[str, Any]) -> float: | |
| """ | |
| Compute diversification score (0-1). | |
| Lower average correlation = higher diversification. | |
| """ | |
| matrix = np.array(corr_data["matrix"]) | |
| n = len(matrix) | |
| if n < 2: | |
| return 1.0 | |
| # Average off-diagonal correlation | |
| mask = ~np.eye(n, dtype=bool) | |
| avg_corr = np.mean(np.abs(matrix[mask])) | |
| return round(float(1.0 - avg_corr), 4) | |
| def _cluster_strategies(corr_data: Dict[str, Any]) -> Dict[str, List[str]]: | |
| """Hierarchical clustering of strategies by return correlation.""" | |
| names = corr_data["strategy_names"] | |
| matrix = np.array(corr_data["matrix"]) | |
| n = len(matrix) | |
| if n < 3: | |
| return {"cluster_1": names} | |
| # Convert correlation to distance | |
| distance = 1.0 - np.abs(matrix) | |
| np.fill_diagonal(distance, 0) | |
| # Ensure symmetry | |
| distance = (distance + distance.T) / 2 | |
| try: | |
| condensed = squareform(distance, checks=False) | |
| Z = linkage(condensed, method="ward") | |
| max_clusters = min(n, 3) | |
| labels = fcluster(Z, t=max_clusters, criterion="maxclust") | |
| clusters: Dict[str, List[str]] = {} | |
| for name, label in zip(names, labels): | |
| key = f"cluster_{label}" | |
| clusters.setdefault(key, []).append(name) | |
| return clusters | |
| except Exception as e: | |
| logger.warning("Clustering failed: %s", e) | |
| return {"cluster_1": names} | |
| def rank_strategies( | |
| comparisons: List[Dict[str, Any]], | |
| sort_by: str = "sharpe_ratio", | |
| ascending: bool = False, | |
| ) -> List[Dict[str, Any]]: | |
| """Rank strategies by a specific metric.""" | |
| valid = [c for c in comparisons if c.get(sort_by) is not None] | |
| sorted_list = sorted( | |
| valid, key=lambda x: x.get(sort_by, 0), reverse=not ascending | |
| ) | |
| for i, item in enumerate(sorted_list, 1): | |
| item["rank"] = i | |
| return sorted_list | |
| analytics_engine = AnalyticsEngine() | |