"""
Strategy Analytics Engine.

Cross-strategy analysis including:
- Performance comparison tables
- Return correlation matrices
- Diversification scoring
- Strategy clustering (hierarchical)
"""

from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
from scipy.cluster.hierarchy import fcluster, linkage
from scipy.spatial.distance import squareform

logger = logging.getLogger(__name__)


class AnalyticsEngine:
    """Cross-strategy analytics and comparison."""

    # Metric keys copied from each result's "metrics" dict into its comparison
    # row, in output order. A metric missing from the input is reported as None.
    _METRIC_KEYS = (
        "total_return",
        "annualized_return",
        "sharpe_ratio",
        "sortino_ratio",
        "max_drawdown",
        "volatility",
        "win_rate",
        "calmar_ratio",
        "total_trades",
        "alpha",
        "beta",
    )

    def compare_strategies(
        self, backtest_results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Compare multiple backtest results side by side.

        Args:
            backtest_results: List of backtest result dicts. Each dict is read
                through the keys "metrics", "strategy_name", "strategy_id" and
                "equity_curve"; every one of them is optional.

        Returns:
            Dict with the keys "comparisons", "correlation_matrix",
            "diversification_score" and "clusters". The same four keys are
            present for empty input. The correlation matrix and the
            diversification score need at least two results with a non-empty
            equity curve, clustering needs at least three; otherwise the
            corresponding value is None.
        """
        if not backtest_results:
            return {
                "comparisons": [],
                "correlation_matrix": None,
                "diversification_score": None,
                "clusters": None,
            }

        # 1. Build comparison table and collect the per-strategy return series.
        comparisons: List[Dict[str, Any]] = []
        return_series: Dict[str, List[float]] = {}

        for index, result in enumerate(backtest_results):
            # `or {}` also covers a key that is present but explicitly None.
            metrics = result.get("metrics") or {}
            strategy_name = result.get("strategy_name", f"Strategy {index + 1}")

            row: Dict[str, Any] = {
                "strategy_id": result.get("strategy_id", index),
                "strategy_name": strategy_name,
            }
            row.update((key, metrics.get(key)) for key in self._METRIC_KEYS)
            comparisons.append(row)

            # Extract daily returns for correlation.
            curve = result.get("equity_curve") or []
            if curve:
                # Two results may share a name; without a distinct label the
                # later series would silently replace the earlier one.
                label = self._unique_label(strategy_name, return_series)
                return_series[label] = [
                    point.get("daily_return", 0) for point in curve
                ]

        # 2. Correlation matrix
        corr_matrix: Optional[Dict[str, Any]] = None
        if len(return_series) >= 2:
            corr_matrix = self._compute_correlation(return_series)

        # 3. Diversification score
        div_score: Optional[float] = None
        if corr_matrix is not None:
            div_score = self._diversification_score(corr_matrix)

        # 4. Clustering
        clusters: Optional[Dict[str, List[str]]] = None
        if corr_matrix is not None and len(return_series) >= 3:
            clusters = self._cluster_strategies(corr_matrix)

        return {
            "comparisons": comparisons,
            "correlation_matrix": corr_matrix,
            "diversification_score": div_score,
            "clusters": clusters,
        }

    @staticmethod
    def _unique_label(name: Any, taken: Dict[str, Any]) -> Any:
        """Return *name*, or "name (k)" with the smallest k >= 2 not in *taken*."""
        label = name
        suffix = 2
        while label in taken:
            label = f"{name} ({suffix})"
            suffix += 1
        return label

    @staticmethod
    def _compute_correlation(
        return_series: Dict[str, List[float]],
    ) -> Dict[str, Any]:
        """
        Compute return correlation matrix between strategies.

        Series of unequal length are aligned by keeping the first ``min_len``
        entries of each, where ``min_len`` is the shortest series length.

        Returns:
            Dict with "strategy_names" (column order of the matrix) and
            "matrix" (nested lists rounded to 4 decimals). Entries that pandas
            reports as NaN (e.g. a series with zero variance) are passed
            through unchanged.
        """
        # Align lengths
        min_len = min(len(values) for values in return_series.values())
        aligned = {name: values[:min_len] for name, values in return_series.items()}

        corr = pd.DataFrame(aligned).corr()

        return {
            "strategy_names": list(corr.columns),
            "matrix": [
                [round(value, 4) for value in row] for row in corr.values.tolist()
            ],
        }

    @staticmethod
    def _diversification_score(corr_data: Dict[str, Any]) -> float:
        """
        Compute diversification score (0-1).

        Lower average absolute correlation = higher diversification. The score
        is ``1 - mean(|off-diagonal correlations|)``, rounded to 4 decimals.
        Undefined (NaN) correlations are counted as 0, i.e. uncorrelated, so a
        zero-variance series cannot turn the whole score into NaN.

        Returns:
            1.0 when fewer than two strategies are present, otherwise the score.
        """
        matrix = np.array(corr_data["matrix"], dtype=float)
        n = len(matrix)
        if n < 2:
            return 1.0

        # Average off-diagonal correlation
        off_diagonal = matrix[~np.eye(n, dtype=bool)]
        off_diagonal = np.abs(np.where(np.isnan(off_diagonal), 0.0, off_diagonal))
        return round(float(1.0 - off_diagonal.mean()), 4)

    @staticmethod
    def _cluster_strategies(
        corr_data: Dict[str, Any], max_clusters: int = 3
    ) -> Dict[str, List[str]]:
        """
        Hierarchical clustering of strategies by return correlation.

        Args:
            corr_data: Output of ``_compute_correlation``.
            max_clusters: Upper bound on the number of clusters; clamped to the
                range 1..number of strategies.

        Returns:
            Mapping "cluster_<label>" -> strategy names. With fewer than three
            strategies, or when clustering fails, every strategy is placed in
            "cluster_1".
        """
        # Copy so the result never aliases the list stored in corr_data.
        names = list(corr_data["strategy_names"])
        matrix = np.array(corr_data["matrix"], dtype=float)
        n = len(matrix)
        if n < 3:
            return {"cluster_1": names}

        # Convert correlation to distance. NaN correlations count as 0
        # (distance 1); left in place they would make linkage() reject the
        # whole matrix as non-finite.
        distance = 1.0 - np.abs(np.where(np.isnan(matrix), 0.0, matrix))
        np.fill_diagonal(distance, 0)
        # Ensure symmetry
        distance = (distance + distance.T) / 2

        cluster_cap = max(1, min(n, max_clusters))

        # Best-effort: any failure inside SciPy degrades to a single cluster
        # instead of failing the whole comparison.
        # NOTE(review): SciPy documents Ward linkage as correctly defined only
        # for Euclidean distances, which 1 - |corr| is not. The method is kept
        # so existing cluster assignments do not change.
        try:
            condensed = squareform(distance, checks=False)
            tree = linkage(condensed, method="ward")
            labels = fcluster(tree, t=cluster_cap, criterion="maxclust")
        except Exception as exc:
            logger.warning("Clustering failed: %s", exc)
            return {"cluster_1": names}

        clusters: Dict[str, List[str]] = {}
        for name, label in zip(names, labels):
            clusters.setdefault(f"cluster_{label}", []).append(name)
        return clusters

    @staticmethod
    def _is_rankable(value: Any) -> bool:
        """Return True unless *value* is None or a floating-point NaN."""
        if value is None:
            return False
        if isinstance(value, (float, np.floating)) and np.isnan(value):
            return False
        return True

    @staticmethod
    def rank_strategies(
        comparisons: List[Dict[str, Any]],
        sort_by: str = "sharpe_ratio",
        ascending: bool = False,
    ) -> List[Dict[str, Any]]:
        """
        Rank strategies by a specific metric.

        Rows whose *sort_by* value is missing, None or NaN are left out of the
        ranking (NaN does not compare consistently, so it cannot be ordered).
        The input rows are not modified: each returned row is a shallow copy
        with an added 1-based "rank" key.

        Args:
            comparisons: Comparison rows, e.g. from ``compare_strategies``.
            sort_by: Key of the metric to order by.
            ascending: Sort smallest-first when True; default is largest-first.

        Returns:
            New list of ranked row copies, best rank first.
        """
        rankable = [
            row
            for row in comparisons
            if AnalyticsEngine._is_rankable(row.get(sort_by))
        ]
        ordered = sorted(
            rankable, key=lambda row: row[sort_by], reverse=not ascending
        )
        return [
            {**row, "rank": position} for position, row in enumerate(ordered, 1)
        ]


analytics_engine = AnalyticsEngine()